##// END OF EJS Templates
celery: update how reqquest object is passed arround....
super-admin -
r4878:c5087cb0 default
parent child Browse files
Show More
@@ -1,330 +1,329 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 from __future__ import unicode_literals
22 22 import logging
23 23
24 24 import colander
25 25 import deform.widget
26 26 from mako.template import Template
27 27
28 28 from rhodecode import events
29 29 from rhodecode.model.validation_schema.widgets import CheckboxChoiceWidgetDesc
30 30 from rhodecode.translation import _
31 31 from rhodecode.lib.celerylib import run_task
32 32 from rhodecode.lib.celerylib import tasks
33 33 from rhodecode.integrations.types.base import (
34 34 IntegrationTypeBase, render_with_traceback)
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39 REPO_PUSH_TEMPLATE_PLAINTEXT = Template('''
40 40 Commits:
41 41
42 42 % for commit in data['push']['commits']:
43 43 ${commit['url']} by ${commit['author']} at ${commit['date']}
44 44 ${commit['message']}
45 45 ----
46 46
47 47 % endfor
48 48 ''')
49 49
50 50 REPO_PUSH_TEMPLATE_HTML = Template('''
51 51 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
52 52 <html xmlns="http://www.w3.org/1999/xhtml">
53 53 <head>
54 54 <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
55 55 <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
56 56 <title>${subject}</title>
57 57 <style type="text/css">
58 58 /* Based on The MailChimp Reset INLINE: Yes. */
59 59 #outlook a {padding:0;} /* Force Outlook to provide a "view in browser" menu link. */
60 60 body{width:100% !important; -webkit-text-size-adjust:100%; -ms-text-size-adjust:100%; margin:0; padding:0;}
61 61 /* Prevent Webkit and Windows Mobile platforms from changing default font sizes.*/
62 62 .ExternalClass {width:100%;} /* Force Hotmail to display emails at full width */
63 63 .ExternalClass, .ExternalClass p, .ExternalClass span, .ExternalClass font, .ExternalClass td, .ExternalClass div {line-height: 100%;}
64 64 /* Forces Hotmail to display normal line spacing. More on that: http://www.emailonacid.com/forum/viewthread/43/ */
65 65 #backgroundTable {margin:0; padding:0; line-height: 100% !important;}
66 66 /* End reset */
67 67
68 68 /* defaults for images*/
69 69 img {outline:none; text-decoration:none; -ms-interpolation-mode: bicubic;}
70 70 a img {border:none;}
71 71 .image_fix {display:block;}
72 72
73 73 body {line-height:1.2em;}
74 74 p {margin: 0 0 20px;}
75 75 h1, h2, h3, h4, h5, h6 {color:#323232!important;}
76 76 a {color:#427cc9;text-decoration:none;outline:none;cursor:pointer;}
77 77 a:focus {outline:none;}
78 78 a:hover {color: #305b91;}
79 79 h1 a, h2 a, h3 a, h4 a, h5 a, h6 a {color:#427cc9!important;text-decoration:none!important;}
80 80 h1 a:active, h2 a:active, h3 a:active, h4 a:active, h5 a:active, h6 a:active {color: #305b91!important;}
81 81 h1 a:visited, h2 a:visited, h3 a:visited, h4 a:visited, h5 a:visited, h6 a:visited {color: #305b91!important;}
82 82 table {font-size:13px;border-collapse:collapse;mso-table-lspace:0pt;mso-table-rspace:0pt;}
83 83 table td {padding:.65em 1em .65em 0;border-collapse:collapse;vertical-align:top;text-align:left;}
84 84 input {display:inline;border-radius:2px;border-style:solid;border: 1px solid #dbd9da;padding:.5em;}
85 85 input:focus {outline: 1px solid #979797}
86 86 @media only screen and (-webkit-min-device-pixel-ratio: 2) {
87 87 /* Put your iPhone 4g styles in here */
88 88 }
89 89
90 90 /* Android targeting */
91 91 @media only screen and (-webkit-device-pixel-ratio:.75){
92 92 /* Put CSS for low density (ldpi) Android layouts in here */
93 93 }
94 94 @media only screen and (-webkit-device-pixel-ratio:1){
95 95 /* Put CSS for medium density (mdpi) Android layouts in here */
96 96 }
97 97 @media only screen and (-webkit-device-pixel-ratio:1.5){
98 98 /* Put CSS for high density (hdpi) Android layouts in here */
99 99 }
100 100 /* end Android targeting */
101 101
102 102 </style>
103 103
104 104 <!-- Targeting Windows Mobile -->
105 105 <!--[if IEMobile 7]>
106 106 <style type="text/css">
107 107
108 108 </style>
109 109 <![endif]-->
110 110
111 111 <!--[if gte mso 9]>
112 112 <style>
113 113 /* Target Outlook 2007 and 2010 */
114 114 </style>
115 115 <![endif]-->
116 116 </head>
117 117 <body>
118 118 <!-- Wrapper/Container Table: Use a wrapper table to control the width and the background color consistently of your email. Use this approach instead of setting attributes on the body tag. -->
119 119 <table cellpadding="0" cellspacing="0" border="0" id="backgroundTable" align="left" style="margin:1%;width:97%;padding:0;font-family:sans-serif;font-weight:100;border:1px solid #dbd9da">
120 120 <tr>
121 121 <td valign="top" style="padding:0;">
122 122 <table cellpadding="0" cellspacing="0" border="0" align="left" width="100%">
123 123 <tr><td style="width:100%;padding:7px;background-color:#202020" valign="top">
124 124 <a style="color:#eeeeee;text-decoration:none;" href="${instance_url}">
125 125 ${'RhodeCode'}
126 126 </a>
127 127 </td></tr>
128 128 <tr>
129 129 <td style="padding:15px;" valign="top">
130 130 % if data['push']['commits']:
131 131 % for commit in data['push']['commits']:
132 132 <a href="${commit['url']}">${commit['short_id']}</a> by ${commit['author']} at ${commit['date']} <br/>
133 133 ${commit['message_html']} <br/>
134 134 <br/>
135 135 % endfor
136 136 % else:
137 137 No commit data
138 138 % endif
139 139 </td>
140 140 </tr>
141 141 </table>
142 142 </td>
143 143 </tr>
144 144 </table>
145 145 <!-- End of wrapper table -->
146 146 <p><a style="margin-top:15px;margin-left:1%;font-family:sans-serif;font-weight:100;font-size:11px;color:#666666;text-decoration:none;" href="${instance_url}">
147 147 ${'This is a notification from RhodeCode. %(instance_url)s' % {'instance_url': instance_url}}
148 148 </a></p>
149 149 </body>
150 150 </html>
151 151 ''')
152 152
153 153
154 154 class EmailSettingsSchema(colander.Schema):
155 155 @colander.instantiate(validator=colander.Length(min=1))
156 156 class recipients(colander.SequenceSchema):
157 157 title = _('Recipients')
158 158 description = _('Email addresses to send push events to')
159 159 widget = deform.widget.SequenceWidget(min_len=1)
160 160
161 161 recipient = colander.SchemaNode(
162 162 colander.String(),
163 163 title=_('Email address'),
164 164 description=_('Email address'),
165 165 default='',
166 166 validator=colander.Email(),
167 167 widget=deform.widget.TextInputWidget(
168 168 placeholder='user@domain.com',
169 169 ),
170 170 )
171 171
172 172
173 173 class EmailIntegrationType(IntegrationTypeBase):
174 174 key = 'email'
175 175 display_name = _('Email')
176 176 description = _('Send repo push summaries to a list of recipients via email')
177 177
178 178 valid_events = [
179 179 events.RepoPushEvent
180 180 ]
181 181
182 182 @classmethod
183 183 def icon(cls):
184 184 return '''
185 185 <?xml version="1.0" encoding="UTF-8" standalone="no"?>
186 186 <svg
187 187 xmlns:dc="http://purl.org/dc/elements/1.1/"
188 188 xmlns:cc="http://creativecommons.org/ns#"
189 189 xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
190 190 xmlns:svg="http://www.w3.org/2000/svg"
191 191 xmlns="http://www.w3.org/2000/svg"
192 192 xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
193 193 xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
194 194 viewBox="0 -256 1850 1850"
195 195 id="svg2989"
196 196 version="1.1"
197 197 inkscape:version="0.48.3.1 r9886"
198 198 width="100%"
199 199 height="100%"
200 200 sodipodi:docname="envelope_font_awesome.svg">
201 201 <metadata
202 202 id="metadata2999">
203 203 <rdf:RDF>
204 204 <cc:Work
205 205 rdf:about="">
206 206 <dc:format>image/svg+xml</dc:format>
207 207 <dc:type
208 208 rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
209 209 </cc:Work>
210 210 </rdf:RDF>
211 211 </metadata>
212 212 <defs
213 213 id="defs2997" />
214 214 <sodipodi:namedview
215 215 pagecolor="#ffffff"
216 216 bordercolor="#666666"
217 217 borderopacity="1"
218 218 objecttolerance="10"
219 219 gridtolerance="10"
220 220 guidetolerance="10"
221 221 inkscape:pageopacity="0"
222 222 inkscape:pageshadow="2"
223 223 inkscape:window-width="640"
224 224 inkscape:window-height="480"
225 225 id="namedview2995"
226 226 showgrid="false"
227 227 inkscape:zoom="0.13169643"
228 228 inkscape:cx="896"
229 229 inkscape:cy="896"
230 230 inkscape:window-x="0"
231 231 inkscape:window-y="25"
232 232 inkscape:window-maximized="0"
233 233 inkscape:current-layer="svg2989" />
234 234 <g
235 235 transform="matrix(1,0,0,-1,37.966102,1282.678)"
236 236 id="g2991">
237 237 <path
238 238 d="m 1664,32 v 768 q -32,-36 -69,-66 -268,-206 -426,-338 -51,-43 -83,-67 -32,-24 -86.5,-48.5 Q 945,256 897,256 h -1 -1 Q 847,256 792.5,280.5 738,305 706,329 674,353 623,396 465,528 197,734 160,764 128,800 V 32 Q 128,19 137.5,9.5 147,0 160,0 h 1472 q 13,0 22.5,9.5 9.5,9.5 9.5,22.5 z m 0,1051 v 11 13.5 q 0,0 -0.5,13 -0.5,13 -3,12.5 -2.5,-0.5 -5.5,9 -3,9.5 -9,7.5 -6,-2 -14,2.5 H 160 q -13,0 -22.5,-9.5 Q 128,1133 128,1120 128,952 275,836 468,684 676,519 682,514 711,489.5 740,465 757,452 774,439 801.5,420.5 829,402 852,393 q 23,-9 43,-9 h 1 1 q 20,0 43,9 23,9 50.5,27.5 27.5,18.5 44.5,31.5 17,13 46,37.5 29,24.5 35,29.5 208,165 401,317 54,43 100.5,115.5 46.5,72.5 46.5,131.5 z m 128,37 V 32 q 0,-66 -47,-113 -47,-47 -113,-47 H 160 Q 94,-128 47,-81 0,-34 0,32 v 1088 q 0,66 47,113 47,47 113,47 h 1472 q 66,0 113,-47 47,-47 47,-113 z"
239 239 id="path2993"
240 240 inkscape:connector-curvature="0"
241 241 style="fill:currentColor" />
242 242 </g>
243 243 </svg>
244 244 '''
245 245
246 246 def settings_schema(self):
247 247 schema = EmailSettingsSchema()
248 248 schema.add(colander.SchemaNode(
249 249 colander.Set(),
250 250 widget=CheckboxChoiceWidgetDesc(
251 251 values=sorted(
252 252 [(e.name, e.display_name, e.description) for e in self.valid_events]
253 253 ),
254 254 ),
255 255 description="List of events activated for this integration",
256 256 name='events'
257 257 ))
258 258 return schema
259 259
260 260 def send_event(self, event):
261 261 log.debug('handling event %s with integration %s', event.name, self)
262 262
263 263 if event.__class__ not in self.valid_events:
264 264 log.debug('event %r not present in valid event list (%s)', event, self.valid_events)
265 265 return
266 266
267 267 if not self.event_enabled(event):
268 268 # NOTE(marcink): for legacy reasons we're skipping this check...
269 269 # since the email event haven't had any settings...
270 270 pass
271 271
272 272 handler = EmailEventHandler(self.settings)
273 273 handler(event, event_data=event.as_dict())
274 274
275 275
276 276 class EmailEventHandler(object):
277 277 def __init__(self, integration_settings):
278 278 self.integration_settings = integration_settings
279 279
280 280 def __call__(self, event, event_data):
281 281 if isinstance(event, events.RepoPushEvent):
282 282 self.repo_push_handler(event, event_data)
283 283 else:
284 284 log.debug('ignoring event: %r', event)
285 285
286 286 def repo_push_handler(self, event, data):
287 287 commit_num = len(data['push']['commits'])
288 288 server_url = data['server_url']
289 289
290 290 if commit_num == 1:
291 291 if data['push']['branches']:
292 292 _subject = '[{repo_name}] {author} pushed {commit_num} commit on branches: {branches}'
293 293 else:
294 294 _subject = '[{repo_name}] {author} pushed {commit_num} commit'
295 295 subject = _subject.format(
296 296 author=data['actor']['username'],
297 297 repo_name=data['repo']['repo_name'],
298 298 commit_num=commit_num,
299 299 branches=', '.join(
300 300 branch['name'] for branch in data['push']['branches'])
301 301 )
302 302 else:
303 303 if data['push']['branches']:
304 304 _subject = '[{repo_name}] {author} pushed {commit_num} commits on branches: {branches}'
305 305 else:
306 306 _subject = '[{repo_name}] {author} pushed {commit_num} commits'
307 307 subject = _subject.format(
308 308 author=data['actor']['username'],
309 309 repo_name=data['repo']['repo_name'],
310 310 commit_num=commit_num,
311 311 branches=', '.join(
312 312 branch['name'] for branch in data['push']['branches']))
313 313
314 314 email_body_plaintext = render_with_traceback(
315 315 REPO_PUSH_TEMPLATE_PLAINTEXT,
316 316 data=data,
317 317 subject=subject,
318 318 instance_url=server_url)
319 319
320 320 email_body_html = render_with_traceback(
321 321 REPO_PUSH_TEMPLATE_HTML,
322 322 data=data,
323 323 subject=subject,
324 324 instance_url=server_url)
325 325
326 326 recipients = self.integration_settings['recipients']
327 327 for email_address in recipients:
328 run_task(
329 tasks.send_email, email_address, subject,
330 email_body_plaintext, email_body_html)
328 run_task(tasks.send_email, email_address, subject,
329 email_body_plaintext, email_body_html)
@@ -1,324 +1,321 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 23 celery worker \
24 24 --task-events \
25 25 --beat \
26 26 --autoscale=20,2 \
27 27 --max-tasks-per-child 1 \
28 28 --app rhodecode.lib.celerylib.loader \
29 29 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
30 30 --loglevel DEBUG --ini=.dev/dev.ini
31 31 """
32 32 import os
33 33 import logging
34 34 import importlib
35 35
36 36 from celery import Celery
37 37 from celery import signals
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 from pyramid.threadlocal import get_current_request
42 41
43 42 import rhodecode
44 43
45 from rhodecode.lib.auth import AuthUser
46 44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 45 from rhodecode.lib.ext_json import json
48 46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
49 47 from rhodecode.lib.utils2 import str2bool
50 48 from rhodecode.model import meta
51 49
52 50
53 51 register('json_ext', json.dumps, json.loads,
54 52 content_type='application/x-json-ext',
55 53 content_encoding='utf-8')
56 54
57 55 log = logging.getLogger('celery.rhodecode.loader')
58 56
59 57
60 58 def add_preload_arguments(parser):
61 59 parser.add_argument(
62 60 '--ini', default=None,
63 61 help='Path to ini configuration file.'
64 62 )
65 63 parser.add_argument(
66 64 '--ini-var', default=None,
67 65 help='Comma separated list of key=value to pass to ini.'
68 66 )
69 67
70 68
71 69 def get_logger(obj):
72 70 custom_log = logging.getLogger(
73 71 'rhodecode.task.{}'.format(obj.__class__.__name__))
74 72
75 73 if rhodecode.CELERY_ENABLED:
76 74 try:
77 75 custom_log = obj.get_logger()
78 76 except Exception:
79 77 pass
80 78
81 79 return custom_log
82 80
83 81
84 82 imports = ['rhodecode.lib.celerylib.tasks']
85 83
86 84 try:
87 85 # try if we have EE tasks available
88 86 importlib.import_module('rc_ee')
89 87 imports.append('rc_ee.lib.celerylib.tasks')
90 88 except ImportError:
91 89 pass
92 90
93 91
94 92 base_celery_config = {
95 93 'result_backend': 'rpc://',
96 94 'result_expires': 60 * 60 * 24,
97 95 'result_persistent': True,
98 96 'imports': imports,
99 97 'worker_max_tasks_per_child': 100,
100 98 'accept_content': ['json_ext'],
101 99 'task_serializer': 'json_ext',
102 100 'result_serializer': 'json_ext',
103 101 'worker_hijack_root_logger': False,
104 102 'database_table_names': {
105 103 'task': 'beat_taskmeta',
106 104 'group': 'beat_groupmeta',
107 105 }
108 106 }
109 107 # init main celery app
110 108 celery_app = Celery()
111 109 celery_app.user_options['preload'].add(add_preload_arguments)
112 110 ini_file_glob = None
113 111
114 112
115 113 @signals.setup_logging.connect
116 114 def setup_logging_callback(**kwargs):
117 115 setup_logging(ini_file_glob)
118 116
119 117
120 118 @signals.user_preload_options.connect
121 119 def on_preload_parsed(options, **kwargs):
122 120 from rhodecode.config.middleware import get_celery_config
123 121
124 122 ini_location = options['ini']
125 123 ini_vars = options['ini_var']
126 124 celery_app.conf['INI_PYRAMID'] = options['ini']
127 125
128 126 if ini_location is None:
129 127 print('You must provide the paste --ini argument')
130 128 exit(-1)
131 129
132 130 options = None
133 131 if ini_vars is not None:
134 132 options = parse_ini_vars(ini_vars)
135 133
136 134 global ini_file_glob
137 135 ini_file_glob = ini_location
138 136
139 137 log.debug('Bootstrapping RhodeCode application...')
140 138
141 139 try:
142 140 env = bootstrap(ini_location, options=options)
143 141 except Exception:
144 142 log.exception('Failed to bootstrap RhodeCode APP')
145 143 raise
146 144
147 145 log.debug('Got Pyramid ENV: %s', env)
148 146
149 147 celery_settings = get_celery_config(env['registry'].settings)
150 148
151 149 setup_celery_app(
152 150 app=env['app'], root=env['root'], request=env['request'],
153 151 registry=env['registry'], closer=env['closer'],
154 152 celery_settings=celery_settings)
155 153
156 154 # fix the global flag even if it's disabled via .ini file because this
157 155 # is a worker code that doesn't need this to be disabled.
158 156 rhodecode.CELERY_ENABLED = True
159 157
160 158
161 159 @signals.task_prerun.connect
162 160 def task_prerun_signal(task_id, task, args, **kwargs):
163 161 ping_db()
164 162
165 163
166 164 @signals.task_success.connect
167 165 def task_success_signal(result, **kwargs):
168 166 meta.Session.commit()
169 167 closer = celery_app.conf['PYRAMID_CLOSER']
170 168 if closer:
171 169 closer()
172 170
173 171
174 172 @signals.task_retry.connect
175 173 def task_retry_signal(
176 174 request, reason, einfo, **kwargs):
177 175 meta.Session.remove()
178 176 closer = celery_app.conf['PYRAMID_CLOSER']
179 177 if closer:
180 178 closer()
181 179
182 180
183 181 @signals.task_failure.connect
184 182 def task_failure_signal(
185 183 task_id, exception, args, kwargs, traceback, einfo, **kargs):
186 184
187 185 log.error('Task: %s failed !! exc_info: %s', task_id, einfo)
188 186 from rhodecode.lib.exc_tracking import store_exception
189 187 from rhodecode.lib.statsd_client import StatsdClient
190 188
191 189 meta.Session.remove()
192 190
193 191 # simulate sys.exc_info()
194 192 exc_info = (einfo.type, einfo.exception, einfo.tb)
195 193 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
196 194 statsd = StatsdClient.statsd
197 195 if statsd:
198 196 exc_type = "{}.{}".format(einfo.__class__.__module__, einfo.__class__.__name__)
199 197 statsd.incr('rhodecode_exception_total',
200 198 tags=["exc_source:celery", "type:{}".format(exc_type)])
201 199
202 200 closer = celery_app.conf['PYRAMID_CLOSER']
203 201 if closer:
204 202 closer()
205 203
206 204
207 205 @signals.task_revoked.connect
208 206 def task_revoked_signal(
209 207 request, terminated, signum, expired, **kwargs):
210 208 closer = celery_app.conf['PYRAMID_CLOSER']
211 209 if closer:
212 210 closer()
213 211
214 212
213 class UNSET(object):
214 pass
215
216
217 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
218
219 if request is not UNSET:
220 celery_app.conf.update({'PYRAMID_REQUEST': request})
221
222 if registry is not UNSET:
223 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
224
225
215 226 def setup_celery_app(app, root, request, registry, closer, celery_settings):
216 227 log.debug('Got custom celery conf: %s', celery_settings)
217 228 celery_config = base_celery_config
218 229 celery_config.update({
219 230 # store celerybeat scheduler db where the .ini file is
220 231 'beat_schedule_filename': registry.settings['celerybeat-schedule.path'],
221 232 })
222 233
223 234 celery_config.update(celery_settings)
224 235 celery_app.config_from_object(celery_config)
225 236
226 237 celery_app.conf.update({'PYRAMID_APP': app})
227 238 celery_app.conf.update({'PYRAMID_ROOT': root})
228 239 celery_app.conf.update({'PYRAMID_REQUEST': request})
229 240 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
230 241 celery_app.conf.update({'PYRAMID_CLOSER': closer})
231 242
232 243
233 244 def configure_celery(config, celery_settings):
234 245 """
235 246 Helper that is called from our application creation logic. It gives
236 247 connection info into running webapp and allows execution of tasks from
237 248 RhodeCode itself
238 249 """
239 250 # store some globals into rhodecode
240 251 rhodecode.CELERY_ENABLED = str2bool(
241 252 config.registry.settings.get('use_celery'))
242 253 if rhodecode.CELERY_ENABLED:
243 254 log.info('Configuring celery based on `%s` settings', celery_settings)
244 255 setup_celery_app(
245 256 app=None, root=None, request=None, registry=config.registry,
246 257 closer=None, celery_settings=celery_settings)
247 258
248 259
249 260 def maybe_prepare_env(req):
250 261 environ = {}
251 262 try:
252 263 environ.update({
253 264 'PATH_INFO': req.environ['PATH_INFO'],
254 265 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
255 266 'HTTP_HOST': req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
256 267 'SERVER_NAME': req.environ['SERVER_NAME'],
257 268 'SERVER_PORT': req.environ['SERVER_PORT'],
258 269 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
259 270 })
260 271 except Exception:
261 272 pass
262 273
263 274 return environ
264 275
265 276
266 277 class RequestContextTask(Task):
267 278 """
268 279 This is a celery task which will create a rhodecode app instance context
269 280 for the task, patch pyramid with the original request
270 281 that created the task and also add the user to the context.
271 282 """
272 283
273 284 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
274 285 link=None, link_error=None, shadow=None, **options):
275 286 """ queue the job to run (we are in web request context here) """
287 from rhodecode.lib.base import get_ip_addr
276 288
277 req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()
289 req = self.app.conf['PYRAMID_REQUEST']
290 if not req:
291 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
278 292
279 293 log.debug('Running Task with class: %s. Request Class: %s',
280 294 self.__class__, req.__class__)
281 295
282 user_id = None
283 ip_addr = None
296 user_id = 0
284 297
285 298 # web case
286 299 if hasattr(req, 'user'):
287 ip_addr = req.user.ip_addr
288 300 user_id = req.user.user_id
289 301
290 302 # api case
291 303 elif hasattr(req, 'rpc_user'):
292 ip_addr = req.rpc_user.ip_addr
293 304 user_id = req.rpc_user.user_id
294 else:
295 if user_id and ip_addr:
296 log.debug('Using data from celery proxy user')
297 305
298 else:
299 raise Exception(
300 'Unable to fetch required data from request: {}. \n'
301 'This task is required to be executed from context of '
302 'request in a webapp. Task: {}'.format(
303 repr(req),
304 self.__class__
305 )
306 )
307
308 if req:
309 # we hook into kwargs since it is the only way to pass our data to
310 # the celery worker
311 environ = maybe_prepare_env(req)
312 options['headers'] = options.get('headers', {})
313 options['headers'].update({
314 'rhodecode_proxy_data': {
315 'environ': environ,
316 'auth_user': {
317 'ip_addr': ip_addr,
318 'user_id': user_id
319 },
320 }
321 })
306 # we hook into kwargs since it is the only way to pass our data to
307 # the celery worker
308 environ = maybe_prepare_env(req)
309 options['headers'] = options.get('headers', {})
310 options['headers'].update({
311 'rhodecode_proxy_data': {
312 'environ': environ,
313 'auth_user': {
314 'ip_addr': get_ip_addr(req.environ),
315 'user_id': user_id
316 },
317 }
318 })
322 319
323 320 return super(RequestContextTask, self).apply_async(
324 321 args, kwargs, task_id, producer, link, link_error, shadow, **options)
@@ -1,347 +1,352 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27 import socket
28 28 import random
29 29
30 30 from BaseHTTPServer import BaseHTTPRequestHandler
31 31 from SocketServer import TCPServer
32 32
33 33 import rhodecode
34 34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
35 35 from rhodecode.model import meta
36 36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
37 37 from rhodecode.lib import hooks_base
38 38 from rhodecode.lib.utils2 import AttributeDict
39 39 from rhodecode.lib.ext_json import json
40 40 from rhodecode.lib import rc_cache
41 41
42 42 log = logging.getLogger(__name__)
43 43
44 44
45 45 class HooksHttpHandler(BaseHTTPRequestHandler):
46 46
47 47 def do_POST(self):
48 48 method, extras = self._read_request()
49 49 txn_id = getattr(self.server, 'txn_id', None)
50 50 if txn_id:
51 51 log.debug('Computing TXN_ID based on `%s`:`%s`',
52 52 extras['repository'], extras['txn_id'])
53 53 computed_txn_id = rc_cache.utils.compute_key_from_params(
54 54 extras['repository'], extras['txn_id'])
55 55 if txn_id != computed_txn_id:
56 56 raise Exception(
57 57 'TXN ID fail: expected {} got {} instead'.format(
58 58 txn_id, computed_txn_id))
59 59
60 request = getattr(self.server, 'request', None)
60 61 try:
61 result = self._call_hook(method, extras)
62 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
63 result = self._call_hook_method(hooks, method, extras)
62 64 except Exception as e:
63 65 exc_tb = traceback.format_exc()
64 66 result = {
65 67 'exception': e.__class__.__name__,
66 68 'exception_traceback': exc_tb,
67 69 'exception_args': e.args
68 70 }
69 71 self._write_response(result)
70 72
71 73 def _read_request(self):
72 74 length = int(self.headers['Content-Length'])
73 75 body = self.rfile.read(length).decode('utf-8')
74 76 data = json.loads(body)
75 77 return data['method'], data['extras']
76 78
77 79 def _write_response(self, result):
78 80 self.send_response(200)
79 81 self.send_header("Content-type", "text/json")
80 82 self.end_headers()
81 83 self.wfile.write(json.dumps(result))
82 84
83 def _call_hook(self, method, extras):
84 hooks = Hooks()
85 def _call_hook_method(self, hooks, method, extras):
85 86 try:
86 87 result = getattr(hooks, method)(extras)
87 88 finally:
88 89 meta.Session.remove()
89 90 return result
90 91
91 92 def log_message(self, format, *args):
92 93 """
93 94 This is an overridden method of BaseHTTPRequestHandler which logs using
94 95 logging library instead of writing directly to stderr.
95 96 """
96 97
97 98 message = format % args
98 99
99 100 log.debug(
100 "%s - - [%s] %s", self.client_address[0],
101 "HOOKS: %s - - [%s] %s", self.client_address,
101 102 self.log_date_time_string(), message)
102 103
103 104
104 105 class DummyHooksCallbackDaemon(object):
105 106 hooks_uri = ''
106 107
107 108 def __init__(self):
108 109 self.hooks_module = Hooks.__module__
109 110
110 111 def __enter__(self):
111 112 log.debug('Running `%s` callback daemon', self.__class__.__name__)
112 113 return self
113 114
114 115 def __exit__(self, exc_type, exc_val, exc_tb):
115 116 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
116 117
117 118
118 119 class ThreadedHookCallbackDaemon(object):
119 120
120 121 _callback_thread = None
121 122 _daemon = None
122 123 _done = False
123 124
124 125 def __init__(self, txn_id=None, host=None, port=None):
125 126 self._prepare(txn_id=txn_id, host=host, port=port)
126 127
127 128 def __enter__(self):
128 129 log.debug('Running `%s` callback daemon', self.__class__.__name__)
129 130 self._run()
130 131 return self
131 132
132 133 def __exit__(self, exc_type, exc_val, exc_tb):
133 134 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
134 135 self._stop()
135 136
136 137 def _prepare(self, txn_id=None, host=None, port=None):
137 138 raise NotImplementedError()
138 139
139 140 def _run(self):
140 141 raise NotImplementedError()
141 142
142 143 def _stop(self):
143 144 raise NotImplementedError()
144 145
145 146
146 147 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
147 148 """
148 149 Context manager which will run a callback daemon in a background thread.
149 150 """
150 151
151 152 hooks_uri = None
152 153
153 154 # From Python docs: Polling reduces our responsiveness to a shutdown
154 155 # request and wastes cpu at all other times.
155 156 POLL_INTERVAL = 0.01
156 157
158 @property
159 def _hook_prefix(self):
160 return 'HOOKS: {} '.format(self.hooks_uri)
161
157 162 def get_hostname(self):
158 163 return socket.gethostname() or '127.0.0.1'
159 164
160 165 def get_available_port(self, min_port=20000, max_port=65535):
161 166 from rhodecode.lib.utils2 import get_available_port as _get_port
162 167 return _get_port(min_port, max_port)
163 168
164 169 def _prepare(self, txn_id=None, host=None, port=None):
170 from pyramid.threadlocal import get_current_request
171
165 172 if not host or host == "*":
166 173 host = self.get_hostname()
167 174 if not port:
168 175 port = self.get_available_port()
169 176
170 177 server_address = (host, port)
171 178 self.hooks_uri = '{}:{}'.format(host, port)
172 179 self.txn_id = txn_id
173 180 self._done = False
174 181
175 182 log.debug(
176 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
177 self.hooks_uri, HooksHttpHandler)
183 "%s Preparing HTTP callback daemon registering hook object: %s",
184 self._hook_prefix, HooksHttpHandler)
178 185
179 186 self._daemon = TCPServer(server_address, HooksHttpHandler)
180 187 # inject transaction_id for later verification
181 188 self._daemon.txn_id = self.txn_id
182 189
190 # pass the WEB app request into daemon
191 self._daemon.request = get_current_request()
192
183 193 def _run(self):
184 194 log.debug("Running event loop of callback daemon in background thread")
185 195 callback_thread = threading.Thread(
186 196 target=self._daemon.serve_forever,
187 197 kwargs={'poll_interval': self.POLL_INTERVAL})
188 198 callback_thread.daemon = True
189 199 callback_thread.start()
190 200 self._callback_thread = callback_thread
191 201
192 202 def _stop(self):
193 203 log.debug("Waiting for background thread to finish.")
194 204 self._daemon.shutdown()
195 205 self._callback_thread.join()
196 206 self._daemon = None
197 207 self._callback_thread = None
198 208 if self.txn_id:
199 209 txn_id_file = get_txn_id_data_path(self.txn_id)
200 210 log.debug('Cleaning up TXN ID %s', txn_id_file)
201 211 if os.path.isfile(txn_id_file):
202 212 os.remove(txn_id_file)
203 213
204 214 log.debug("Background thread done.")
205 215
206 216
207 217 def get_txn_id_data_path(txn_id):
208 218 import rhodecode
209 219
210 220 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
211 221 final_dir = os.path.join(root, 'svn_txn_id')
212 222
213 223 if not os.path.isdir(final_dir):
214 224 os.makedirs(final_dir)
215 225 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
216 226
217 227
218 228 def store_txn_id_data(txn_id, data_dict):
219 229 if not txn_id:
220 230 log.warning('Cannot store txn_id because it is empty')
221 231 return
222 232
223 233 path = get_txn_id_data_path(txn_id)
224 234 try:
225 235 with open(path, 'wb') as f:
226 236 f.write(json.dumps(data_dict))
227 237 except Exception:
228 238 log.exception('Failed to write txn_id metadata')
229 239
230 240
231 241 def get_txn_id_from_store(txn_id):
232 242 """
233 243 Reads txn_id from store and if present returns the data for callback manager
234 244 """
235 245 path = get_txn_id_data_path(txn_id)
236 246 try:
237 247 with open(path, 'rb') as f:
238 248 return json.loads(f.read())
239 249 except Exception:
240 250 return {}
241 251
242 252
243 253 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
244 254 txn_details = get_txn_id_from_store(txn_id)
245 255 port = txn_details.get('port', 0)
246 256 if use_direct_calls:
247 257 callback_daemon = DummyHooksCallbackDaemon()
248 258 extras['hooks_module'] = callback_daemon.hooks_module
249 259 else:
250 260 if protocol == 'http':
251 261 callback_daemon = HttpHooksCallbackDaemon(
252 262 txn_id=txn_id, host=host, port=port)
253 263 else:
254 264 log.error('Unsupported callback daemon protocol "%s"', protocol)
255 265 raise Exception('Unsupported callback daemon protocol.')
256 266
257 267 extras['hooks_uri'] = callback_daemon.hooks_uri
258 268 extras['hooks_protocol'] = protocol
259 269 extras['time'] = time.time()
260 270
261 271 # register txn_id
262 272 extras['txn_id'] = txn_id
263 273 log.debug('Prepared a callback daemon: %s at url `%s`',
264 274 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
265 275 return callback_daemon, extras
266 276
267 277
268 278 class Hooks(object):
269 279 """
270 280 Exposes the hooks for remote call backs
271 281 """
282 def __init__(self, request=None, log_prefix=''):
283 self.log_prefix = log_prefix
284 self.request = request
272 285
273 286 def repo_size(self, extras):
274 log.debug("Called repo_size of %s object", self)
287 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
275 288 return self._call_hook(hooks_base.repo_size, extras)
276 289
277 290 def pre_pull(self, extras):
278 log.debug("Called pre_pull of %s object", self)
291 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
279 292 return self._call_hook(hooks_base.pre_pull, extras)
280 293
281 294 def post_pull(self, extras):
282 log.debug("Called post_pull of %s object", self)
295 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
283 296 return self._call_hook(hooks_base.post_pull, extras)
284 297
285 298 def pre_push(self, extras):
286 log.debug("Called pre_push of %s object", self)
299 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
287 300 return self._call_hook(hooks_base.pre_push, extras)
288 301
289 302 def post_push(self, extras):
290 log.debug("Called post_push of %s object", self)
303 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
291 304 return self._call_hook(hooks_base.post_push, extras)
292 305
293 306 def _call_hook(self, hook, extras):
294 307 extras = AttributeDict(extras)
295 308 server_url = extras['server_url']
296 request = bootstrap_request(application_url=server_url)
297 309
298 bootstrap_config(request) # inject routes and other interfaces
299
300 # inject the user for usage in hooks
301 request.user = AttributeDict({'username': extras.username,
302 'ip_addr': extras.ip,
303 'user_id': extras.user_id})
304
305 extras.request = request
310 extras.request = self.request
306 311
307 312 try:
308 313 result = hook(extras)
309 314 if result is None:
310 315 raise Exception(
311 316 'Failed to obtain hook result from func: {}'.format(hook))
312 317 except HTTPBranchProtected as handled_error:
313 318 # Those special cases doesn't need error reporting. It's a case of
314 319 # locked repo or protected branch
315 320 result = AttributeDict({
316 321 'status': handled_error.code,
317 322 'output': handled_error.explanation
318 323 })
319 324 except (HTTPLockedRC, Exception) as error:
320 325 # locked needs different handling since we need to also
321 326 # handle PULL operations
322 327 exc_tb = ''
323 328 if not isinstance(error, HTTPLockedRC):
324 329 exc_tb = traceback.format_exc()
325 log.exception('Exception when handling hook %s', hook)
330 log.exception('%sException when handling hook %s', self.log_prefix, hook)
326 331 error_args = error.args
327 332 return {
328 333 'status': 128,
329 334 'output': '',
330 335 'exception': type(error).__name__,
331 336 'exception_traceback': exc_tb,
332 337 'exception_args': error_args,
333 338 }
334 339 finally:
335 340 meta.Session.remove()
336 341
337 log.debug('Got hook call response %s', result)
342 log.debug('%sGot hook call response %s', self.log_prefix, result)
338 343 return {
339 344 'status': result.status,
340 345 'output': result.output,
341 346 }
342 347
343 348 def __enter__(self):
344 349 return self
345 350
346 351 def __exit__(self, exc_type, exc_val, exc_tb):
347 352 pass
@@ -1,454 +1,453 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Model for notifications
24 24 """
25 25
26 26 import logging
27 27 import traceback
28 28
29 29 import premailer
30 30 from pyramid.threadlocal import get_current_request
31 31 from sqlalchemy.sql.expression import false, true
32 32
33 33 import rhodecode
34 34 from rhodecode.lib import helpers as h
35 35 from rhodecode.model import BaseModel
36 36 from rhodecode.model.db import Notification, User, UserNotification
37 37 from rhodecode.model.meta import Session
38 38 from rhodecode.translation import TranslationString
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class NotificationModel(BaseModel):
44 44
45 45 cls = Notification
46 46
47 47 def __get_notification(self, notification):
48 48 if isinstance(notification, Notification):
49 49 return notification
50 50 elif isinstance(notification, (int, long)):
51 51 return Notification.get(notification)
52 52 else:
53 53 if notification:
54 54 raise Exception('notification must be int, long or Instance'
55 55 ' of Notification got %s' % type(notification))
56 56
57 57 def create(
58 58 self, created_by, notification_subject='', notification_body='',
59 59 notification_type=Notification.TYPE_MESSAGE, recipients=None,
60 60 mention_recipients=None, with_email=True, email_kwargs=None):
61 61 """
62 62
63 63 Creates notification of given type
64 64
65 65 :param created_by: int, str or User instance. User who created this
66 66 notification
67 67 :param notification_subject: subject of notification itself,
68 68 it will be generated automatically from notification_type if not specified
69 69 :param notification_body: body of notification text
70 70 it will be generated automatically from notification_type if not specified
71 71 :param notification_type: type of notification, based on that we
72 72 pick templates
73 73 :param recipients: list of int, str or User objects, when None
74 74 is given send to all admins
75 75 :param mention_recipients: list of int, str or User objects,
76 76 that were mentioned
77 77 :param with_email: send email with this notification
78 78 :param email_kwargs: dict with arguments to generate email
79 79 """
80 80
81 81 from rhodecode.lib.celerylib import tasks, run_task
82 82
83 83 if recipients and not getattr(recipients, '__iter__', False):
84 84 raise Exception('recipients must be an iterable object')
85 85
86 86 if not (notification_subject and notification_body) and not notification_type:
87 87 raise ValueError('notification_subject, and notification_body '
88 88 'cannot be empty when notification_type is not specified')
89 89
90 90 created_by_obj = self._get_user(created_by)
91 91
92 92 if not created_by_obj:
93 93 raise Exception('unknown user %s' % created_by)
94 94
95 95 # default MAIN body if not given
96 96 email_kwargs = email_kwargs or {'body': notification_body}
97 97 mention_recipients = mention_recipients or set()
98 98
99 99 if recipients is None:
100 100 # recipients is None means to all admins
101 101 recipients_objs = User.query().filter(User.admin == true()).all()
102 102 log.debug('sending notifications %s to admins: %s',
103 103 notification_type, recipients_objs)
104 104 else:
105 105 recipients_objs = set()
106 106 for u in recipients:
107 107 obj = self._get_user(u)
108 108 if obj:
109 109 recipients_objs.add(obj)
110 110 else: # we didn't find this user, log the error and carry on
111 111 log.error('cannot notify unknown user %r', u)
112 112
113 113 if not recipients_objs:
114 114 raise Exception('no valid recipients specified')
115 115
116 116 log.debug('sending notifications %s to %s',
117 117 notification_type, recipients_objs)
118 118
119 119 # add mentioned users into recipients
120 120 final_recipients = set(recipients_objs).union(mention_recipients)
121 121
122 122 (subject, email_body, email_body_plaintext) = \
123 123 EmailNotificationModel().render_email(notification_type, **email_kwargs)
124 124
125 125 if not notification_subject:
126 126 notification_subject = subject
127 127
128 128 if not notification_body:
129 129 notification_body = email_body_plaintext
130 130
131 131 notification = Notification.create(
132 132 created_by=created_by_obj, subject=notification_subject,
133 133 body=notification_body, recipients=final_recipients,
134 134 type_=notification_type
135 135 )
136 136
137 137 if not with_email: # skip sending email, and just create notification
138 138 return notification
139 139
140 140 # don't send email to person who created this comment
141 141 rec_objs = set(recipients_objs).difference({created_by_obj})
142 142
143 143 # now notify all recipients in question
144 144
145 145 for recipient in rec_objs.union(mention_recipients):
146 146 # inject current recipient
147 147 email_kwargs['recipient'] = recipient
148 148 email_kwargs['mention'] = recipient in mention_recipients
149 149 (subject, email_body, email_body_plaintext) = EmailNotificationModel().render_email(
150 150 notification_type, **email_kwargs)
151 151
152 152 extra_headers = None
153 153 if 'thread_ids' in email_kwargs:
154 154 extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')}
155 155
156 156 log.debug('Creating notification email task for user:`%s`', recipient)
157 task = run_task(
158 tasks.send_email, recipient.email, subject,
159 email_body_plaintext, email_body, extra_headers=extra_headers)
157 task = run_task(tasks.send_email, recipient.email, subject,
158 email_body_plaintext, email_body, extra_headers=extra_headers)
160 159 log.debug('Created email task: %s', task)
161 160
162 161 return notification
163 162
164 163 def delete(self, user, notification):
165 164 # we don't want to remove actual notification just the assignment
166 165 try:
167 166 notification = self.__get_notification(notification)
168 167 user = self._get_user(user)
169 168 if notification and user:
170 169 obj = UserNotification.query()\
171 170 .filter(UserNotification.user == user)\
172 171 .filter(UserNotification.notification == notification)\
173 172 .one()
174 173 Session().delete(obj)
175 174 return True
176 175 except Exception:
177 176 log.error(traceback.format_exc())
178 177 raise
179 178
180 179 def get_for_user(self, user, filter_=None):
181 180 """
182 181 Get mentions for given user, filter them if filter dict is given
183 182 """
184 183 user = self._get_user(user)
185 184
186 185 q = UserNotification.query()\
187 186 .filter(UserNotification.user == user)\
188 187 .join((
189 188 Notification, UserNotification.notification_id ==
190 189 Notification.notification_id))
191 190 if filter_ == ['all']:
192 191 q = q # no filter
193 192 elif filter_ == ['unread']:
194 193 q = q.filter(UserNotification.read == false())
195 194 elif filter_:
196 195 q = q.filter(Notification.type_.in_(filter_))
197 196
198 197 return q
199 198
200 199 def mark_read(self, user, notification):
201 200 try:
202 201 notification = self.__get_notification(notification)
203 202 user = self._get_user(user)
204 203 if notification and user:
205 204 obj = UserNotification.query()\
206 205 .filter(UserNotification.user == user)\
207 206 .filter(UserNotification.notification == notification)\
208 207 .one()
209 208 obj.read = True
210 209 Session().add(obj)
211 210 return True
212 211 except Exception:
213 212 log.error(traceback.format_exc())
214 213 raise
215 214
216 215 def mark_all_read_for_user(self, user, filter_=None):
217 216 user = self._get_user(user)
218 217 q = UserNotification.query()\
219 218 .filter(UserNotification.user == user)\
220 219 .filter(UserNotification.read == false())\
221 220 .join((
222 221 Notification, UserNotification.notification_id ==
223 222 Notification.notification_id))
224 223 if filter_ == ['unread']:
225 224 q = q.filter(UserNotification.read == false())
226 225 elif filter_:
227 226 q = q.filter(Notification.type_.in_(filter_))
228 227
229 228 # this is a little inefficient but sqlalchemy doesn't support
230 229 # update on joined tables :(
231 230 for obj in q.all():
232 231 obj.read = True
233 232 Session().add(obj)
234 233
235 234 def get_unread_cnt_for_user(self, user):
236 235 user = self._get_user(user)
237 236 return UserNotification.query()\
238 237 .filter(UserNotification.read == false())\
239 238 .filter(UserNotification.user == user).count()
240 239
241 240 def get_unread_for_user(self, user):
242 241 user = self._get_user(user)
243 242 return [x.notification for x in UserNotification.query()
244 243 .filter(UserNotification.read == false())
245 244 .filter(UserNotification.user == user).all()]
246 245
247 246 def get_user_notification(self, user, notification):
248 247 user = self._get_user(user)
249 248 notification = self.__get_notification(notification)
250 249
251 250 return UserNotification.query()\
252 251 .filter(UserNotification.notification == notification)\
253 252 .filter(UserNotification.user == user).scalar()
254 253
255 254 def make_description(self, notification, translate, show_age=True):
256 255 """
257 256 Creates a human readable description based on properties
258 257 of notification object
259 258 """
260 259 _ = translate
261 260 _map = {
262 261 notification.TYPE_CHANGESET_COMMENT: [
263 262 _('%(user)s commented on commit %(date_or_age)s'),
264 263 _('%(user)s commented on commit at %(date_or_age)s'),
265 264 ],
266 265 notification.TYPE_MESSAGE: [
267 266 _('%(user)s sent message %(date_or_age)s'),
268 267 _('%(user)s sent message at %(date_or_age)s'),
269 268 ],
270 269 notification.TYPE_MENTION: [
271 270 _('%(user)s mentioned you %(date_or_age)s'),
272 271 _('%(user)s mentioned you at %(date_or_age)s'),
273 272 ],
274 273 notification.TYPE_REGISTRATION: [
275 274 _('%(user)s registered in RhodeCode %(date_or_age)s'),
276 275 _('%(user)s registered in RhodeCode at %(date_or_age)s'),
277 276 ],
278 277 notification.TYPE_PULL_REQUEST: [
279 278 _('%(user)s opened new pull request %(date_or_age)s'),
280 279 _('%(user)s opened new pull request at %(date_or_age)s'),
281 280 ],
282 281 notification.TYPE_PULL_REQUEST_UPDATE: [
283 282 _('%(user)s updated pull request %(date_or_age)s'),
284 283 _('%(user)s updated pull request at %(date_or_age)s'),
285 284 ],
286 285 notification.TYPE_PULL_REQUEST_COMMENT: [
287 286 _('%(user)s commented on pull request %(date_or_age)s'),
288 287 _('%(user)s commented on pull request at %(date_or_age)s'),
289 288 ],
290 289 }
291 290
292 291 templates = _map[notification.type_]
293 292
294 293 if show_age:
295 294 template = templates[0]
296 295 date_or_age = h.age(notification.created_on)
297 296 if translate:
298 297 date_or_age = translate(date_or_age)
299 298
300 299 if isinstance(date_or_age, TranslationString):
301 300 date_or_age = date_or_age.interpolate()
302 301
303 302 else:
304 303 template = templates[1]
305 304 date_or_age = h.format_date(notification.created_on)
306 305
307 306 return template % {
308 307 'user': notification.created_by_user.username,
309 308 'date_or_age': date_or_age,
310 309 }
311 310
312 311
313 312 # Templates for Titles, that could be overwritten by rcextensions
314 313 # Title of email for pull-request update
315 314 EMAIL_PR_UPDATE_SUBJECT_TEMPLATE = ''
316 315 # Title of email for request for pull request review
317 316 EMAIL_PR_REVIEW_SUBJECT_TEMPLATE = ''
318 317
319 318 # Title of email for general comment on pull request
320 319 EMAIL_PR_COMMENT_SUBJECT_TEMPLATE = ''
321 320 # Title of email for general comment which includes status change on pull request
322 321 EMAIL_PR_COMMENT_STATUS_CHANGE_SUBJECT_TEMPLATE = ''
323 322 # Title of email for inline comment on a file in pull request
324 323 EMAIL_PR_COMMENT_FILE_SUBJECT_TEMPLATE = ''
325 324
326 325 # Title of email for general comment on commit
327 326 EMAIL_COMMENT_SUBJECT_TEMPLATE = ''
328 327 # Title of email for general comment which includes status change on commit
329 328 EMAIL_COMMENT_STATUS_CHANGE_SUBJECT_TEMPLATE = ''
330 329 # Title of email for inline comment on a file in commit
331 330 EMAIL_COMMENT_FILE_SUBJECT_TEMPLATE = ''
332 331
333 332 import cssutils
334 333 # hijack css utils logger and replace with ours
335 334 log = logging.getLogger('rhodecode.cssutils.premailer')
336 335 cssutils.log.setLog(log)
337 336
338 337
339 338 class EmailNotificationModel(BaseModel):
340 339 TYPE_COMMIT_COMMENT = Notification.TYPE_CHANGESET_COMMENT
341 340 TYPE_REGISTRATION = Notification.TYPE_REGISTRATION
342 341 TYPE_PULL_REQUEST = Notification.TYPE_PULL_REQUEST
343 342 TYPE_PULL_REQUEST_COMMENT = Notification.TYPE_PULL_REQUEST_COMMENT
344 343 TYPE_PULL_REQUEST_UPDATE = Notification.TYPE_PULL_REQUEST_UPDATE
345 344 TYPE_MAIN = Notification.TYPE_MESSAGE
346 345
347 346 TYPE_PASSWORD_RESET = 'password_reset'
348 347 TYPE_PASSWORD_RESET_CONFIRMATION = 'password_reset_confirmation'
349 348 TYPE_EMAIL_TEST = 'email_test'
350 349 TYPE_EMAIL_EXCEPTION = 'exception'
351 350 TYPE_UPDATE_AVAILABLE = 'update_available'
352 351 TYPE_TEST = 'test'
353 352
354 353 email_types = {
355 354 TYPE_MAIN:
356 355 'rhodecode:templates/email_templates/main.mako',
357 356 TYPE_TEST:
358 357 'rhodecode:templates/email_templates/test.mako',
359 358 TYPE_EMAIL_EXCEPTION:
360 359 'rhodecode:templates/email_templates/exception_tracker.mako',
361 360 TYPE_UPDATE_AVAILABLE:
362 361 'rhodecode:templates/email_templates/update_available.mako',
363 362 TYPE_EMAIL_TEST:
364 363 'rhodecode:templates/email_templates/email_test.mako',
365 364 TYPE_REGISTRATION:
366 365 'rhodecode:templates/email_templates/user_registration.mako',
367 366 TYPE_PASSWORD_RESET:
368 367 'rhodecode:templates/email_templates/password_reset.mako',
369 368 TYPE_PASSWORD_RESET_CONFIRMATION:
370 369 'rhodecode:templates/email_templates/password_reset_confirmation.mako',
371 370 TYPE_COMMIT_COMMENT:
372 371 'rhodecode:templates/email_templates/commit_comment.mako',
373 372 TYPE_PULL_REQUEST:
374 373 'rhodecode:templates/email_templates/pull_request_review.mako',
375 374 TYPE_PULL_REQUEST_COMMENT:
376 375 'rhodecode:templates/email_templates/pull_request_comment.mako',
377 376 TYPE_PULL_REQUEST_UPDATE:
378 377 'rhodecode:templates/email_templates/pull_request_update.mako',
379 378 }
380 379
381 380 premailer_instance = premailer.Premailer()
382 381
383 382 def __init__(self):
384 383 """
385 384 Example usage::
386 385
387 386 (subject, email_body, email_body_plaintext) = EmailNotificationModel().render_email(
388 387 EmailNotificationModel.TYPE_TEST, **email_kwargs)
389 388
390 389 """
391 390 super(EmailNotificationModel, self).__init__()
392 391 self.rhodecode_instance_name = rhodecode.CONFIG.get('rhodecode_title')
393 392
394 393 def _update_kwargs_for_render(self, kwargs):
395 394 """
396 395 Inject params required for Mako rendering
397 396
398 397 :param kwargs:
399 398 """
400 399
401 400 kwargs['rhodecode_instance_name'] = self.rhodecode_instance_name
402 401 kwargs['rhodecode_version'] = rhodecode.__version__
403 402 instance_url = h.route_url('home')
404 403 _kwargs = {
405 404 'instance_url': instance_url,
406 405 'whitespace_filter': self.whitespace_filter,
407 406 'email_pr_update_subject_template': EMAIL_PR_UPDATE_SUBJECT_TEMPLATE,
408 407 'email_pr_review_subject_template': EMAIL_PR_REVIEW_SUBJECT_TEMPLATE,
409 408 'email_pr_comment_subject_template': EMAIL_PR_COMMENT_SUBJECT_TEMPLATE,
410 409 'email_pr_comment_status_change_subject_template': EMAIL_PR_COMMENT_STATUS_CHANGE_SUBJECT_TEMPLATE,
411 410 'email_pr_comment_file_subject_template': EMAIL_PR_COMMENT_FILE_SUBJECT_TEMPLATE,
412 411 'email_comment_subject_template': EMAIL_COMMENT_SUBJECT_TEMPLATE,
413 412 'email_comment_status_change_subject_template': EMAIL_COMMENT_STATUS_CHANGE_SUBJECT_TEMPLATE,
414 413 'email_comment_file_subject_template': EMAIL_COMMENT_FILE_SUBJECT_TEMPLATE,
415 414 }
416 415 _kwargs.update(kwargs)
417 416 return _kwargs
418 417
419 418 def whitespace_filter(self, text):
420 419 return text.replace('\n', '').replace('\t', '')
421 420
422 421 def get_renderer(self, type_, request):
423 422 template_name = self.email_types[type_]
424 423 return request.get_partial_renderer(template_name)
425 424
426 425 def render_email(self, type_, **kwargs):
427 426 """
428 427 renders template for email, and returns a tuple of
429 428 (subject, email_headers, email_html_body, email_plaintext_body)
430 429 """
431 430 request = get_current_request()
432 431
433 432 # translator and helpers inject
434 433 _kwargs = self._update_kwargs_for_render(kwargs)
435 434 email_template = self.get_renderer(type_, request=request)
436 435 subject = email_template.render('subject', **_kwargs)
437 436
438 437 try:
439 438 body_plaintext = email_template.render('body_plaintext', **_kwargs)
440 439 except AttributeError:
441 440 # it's not defined in template, ok we can skip it
442 441 body_plaintext = ''
443 442
444 443 # render WHOLE template
445 444 body = email_template.render(None, **_kwargs)
446 445
447 446 try:
448 447 # Inline CSS styles and conversion
449 448 body = self.premailer_instance.transform(body)
450 449 except Exception:
451 450 log.exception('Failed to parse body with premailer')
452 451 pass
453 452
454 453 return subject, body, body_plaintext
@@ -1,392 +1,398 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import io
21 21 import shlex
22 22
23 23 import math
24 24 import re
25 25 import os
26 26 import datetime
27 27 import logging
28 28 import Queue
29 29 import subprocess32
30 30
31 31
32 32 from dateutil.parser import parse
33 33 from pyramid.threadlocal import get_current_request
34 34 from pyramid.interfaces import IRoutesMapper
35 35 from pyramid.settings import asbool
36 36 from pyramid.path import AssetResolver
37 37 from threading import Thread
38 38
39 39 from rhodecode.config.jsroutes import generate_jsroutes_content
40 40 from rhodecode.lib.base import get_auth_user
41 41
42 42 import rhodecode
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 def add_renderer_globals(event):
49 49 from rhodecode.lib import helpers
50 50
51 51 # TODO: When executed in pyramid view context the request is not available
52 52 # in the event. Find a better solution to get the request.
53 53 request = event['request'] or get_current_request()
54 54
55 55 # Add Pyramid translation as '_' to context
56 56 event['_'] = request.translate
57 57 event['_ungettext'] = request.plularize
58 58 event['h'] = helpers
59 59
60 60
61 61 def set_user_lang(event):
62 62 request = event.request
63 63 cur_user = getattr(request, 'user', None)
64 64
65 65 if cur_user:
66 66 user_lang = cur_user.get_instance().user_data.get('language')
67 67 if user_lang:
68 68 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
69 69 event.request._LOCALE_ = user_lang
70 70
71 71
72 def update_celery_conf(event):
73 from rhodecode.lib.celerylib.loader import set_celery_conf
74 log.debug('Setting celery config from new request')
75 set_celery_conf(request=event.request, registry=event.request.registry)
76
77
72 78 def add_request_user_context(event):
73 79 """
74 80 Adds auth user into request context
75 81 """
76 82 request = event.request
77 83 # access req_id as soon as possible
78 84 req_id = request.req_id
79 85
80 86 if hasattr(request, 'vcs_call'):
81 87 # skip vcs calls
82 88 return
83 89
84 90 if hasattr(request, 'rpc_method'):
85 91 # skip api calls
86 92 return
87 93
88 94 auth_user, auth_token = get_auth_user(request)
89 95 request.user = auth_user
90 96 request.user_auth_token = auth_token
91 97 request.environ['rc_auth_user'] = auth_user
92 98 request.environ['rc_auth_user_id'] = auth_user.user_id
93 99 request.environ['rc_req_id'] = req_id
94 100
95 101
96 102 def reset_log_bucket(event):
97 103 """
98 104 reset the log bucket on new request
99 105 """
100 106 request = event.request
101 107 request.req_id_records_init()
102 108
103 109
104 110 def scan_repositories_if_enabled(event):
105 111 """
106 112 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
107 113 does a repository scan if enabled in the settings.
108 114 """
109 115 settings = event.app.registry.settings
110 116 vcs_server_enabled = settings['vcs.server.enable']
111 117 import_on_startup = settings['startup.import_repos']
112 118 if vcs_server_enabled and import_on_startup:
113 119 from rhodecode.model.scm import ScmModel
114 120 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
115 121 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
116 122 repo2db_mapper(repositories, remove_obsolete=False)
117 123
118 124
119 125 def write_metadata_if_needed(event):
120 126 """
121 127 Writes upgrade metadata
122 128 """
123 129 import rhodecode
124 130 from rhodecode.lib import system_info
125 131 from rhodecode.lib import ext_json
126 132
127 133 fname = '.rcmetadata.json'
128 134 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
129 135 metadata_destination = os.path.join(ini_loc, fname)
130 136
131 137 def get_update_age():
132 138 now = datetime.datetime.utcnow()
133 139
134 140 with open(metadata_destination, 'rb') as f:
135 141 data = ext_json.json.loads(f.read())
136 142 if 'created_on' in data:
137 143 update_date = parse(data['created_on'])
138 144 diff = now - update_date
139 145 return diff.total_seconds() / 60.0
140 146
141 147 return 0
142 148
143 149 def write():
144 150 configuration = system_info.SysInfo(
145 151 system_info.rhodecode_config)()['value']
146 152 license_token = configuration['config']['license_token']
147 153
148 154 setup = dict(
149 155 workers=configuration['config']['server:main'].get(
150 156 'workers', '?'),
151 157 worker_type=configuration['config']['server:main'].get(
152 158 'worker_class', 'sync'),
153 159 )
154 160 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
155 161 del dbinfo['url']
156 162
157 163 metadata = dict(
158 164 desc='upgrade metadata info',
159 165 license_token=license_token,
160 166 created_on=datetime.datetime.utcnow().isoformat(),
161 167 usage=system_info.SysInfo(system_info.usage_info)()['value'],
162 168 platform=system_info.SysInfo(system_info.platform_type)()['value'],
163 169 database=dbinfo,
164 170 cpu=system_info.SysInfo(system_info.cpu)()['value'],
165 171 memory=system_info.SysInfo(system_info.memory)()['value'],
166 172 setup=setup
167 173 )
168 174
169 175 with open(metadata_destination, 'wb') as f:
170 176 f.write(ext_json.json.dumps(metadata))
171 177
172 178 settings = event.app.registry.settings
173 179 if settings.get('metadata.skip'):
174 180 return
175 181
176 182 # only write this every 24h, workers restart caused unwanted delays
177 183 try:
178 184 age_in_min = get_update_age()
179 185 except Exception:
180 186 age_in_min = 0
181 187
182 188 if age_in_min > 60 * 60 * 24:
183 189 return
184 190
185 191 try:
186 192 write()
187 193 except Exception:
188 194 pass
189 195
190 196
191 197 def write_usage_data(event):
192 198 import rhodecode
193 199 from rhodecode.lib import system_info
194 200 from rhodecode.lib import ext_json
195 201
196 202 settings = event.app.registry.settings
197 203 instance_tag = settings.get('metadata.write_usage_tag')
198 204 if not settings.get('metadata.write_usage'):
199 205 return
200 206
201 207 def get_update_age(dest_file):
202 208 now = datetime.datetime.utcnow()
203 209
204 210 with open(dest_file, 'rb') as f:
205 211 data = ext_json.json.loads(f.read())
206 212 if 'created_on' in data:
207 213 update_date = parse(data['created_on'])
208 214 diff = now - update_date
209 215 return math.ceil(diff.total_seconds() / 60.0)
210 216
211 217 return 0
212 218
213 219 utc_date = datetime.datetime.utcnow()
214 220 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
215 221 fname = '.rc_usage_{date.year}{date.month:02d}{date.day:02d}_{hour}.json'.format(
216 222 date=utc_date, hour=hour_quarter)
217 223 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
218 224
219 225 usage_dir = os.path.join(ini_loc, '.rcusage')
220 226 if not os.path.isdir(usage_dir):
221 227 os.makedirs(usage_dir)
222 228 usage_metadata_destination = os.path.join(usage_dir, fname)
223 229
224 230 try:
225 231 age_in_min = get_update_age(usage_metadata_destination)
226 232 except Exception:
227 233 age_in_min = 0
228 234
229 235 # write every 6th hour
230 236 if age_in_min and age_in_min < 60 * 6:
231 237 log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
232 238 age_in_min, 60 * 6)
233 239 return
234 240
235 241 def write(dest_file):
236 242 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
237 243 license_token = configuration['config']['license_token']
238 244
239 245 metadata = dict(
240 246 desc='Usage data',
241 247 instance_tag=instance_tag,
242 248 license_token=license_token,
243 249 created_on=datetime.datetime.utcnow().isoformat(),
244 250 usage=system_info.SysInfo(system_info.usage_info)()['value'],
245 251 )
246 252
247 253 with open(dest_file, 'wb') as f:
248 254 f.write(ext_json.json.dumps(metadata, indent=2, sort_keys=True))
249 255
250 256 try:
251 257 log.debug('Writing usage file at: %s', usage_metadata_destination)
252 258 write(usage_metadata_destination)
253 259 except Exception:
254 260 pass
255 261
256 262
257 263 def write_js_routes_if_enabled(event):
258 264 registry = event.app.registry
259 265
260 266 mapper = registry.queryUtility(IRoutesMapper)
261 267 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
262 268
263 269 def _extract_route_information(route):
264 270 """
265 271 Convert a route into tuple(name, path, args), eg:
266 272 ('show_user', '/profile/%(username)s', ['username'])
267 273 """
268 274
269 275 routepath = route.pattern
270 276 pattern = route.pattern
271 277
272 278 def replace(matchobj):
273 279 if matchobj.group(1):
274 280 return "%%(%s)s" % matchobj.group(1).split(':')[0]
275 281 else:
276 282 return "%%(%s)s" % matchobj.group(2)
277 283
278 284 routepath = _argument_prog.sub(replace, routepath)
279 285
280 286 if not routepath.startswith('/'):
281 287 routepath = '/'+routepath
282 288
283 289 return (
284 290 route.name,
285 291 routepath,
286 292 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
287 293 for arg in _argument_prog.findall(pattern)]
288 294 )
289 295
290 296 def get_routes():
291 297 # pyramid routes
292 298 for route in mapper.get_routes():
293 299 if not route.name.startswith('__'):
294 300 yield _extract_route_information(route)
295 301
296 302 if asbool(registry.settings.get('generate_js_files', 'false')):
297 303 static_path = AssetResolver().resolve('rhodecode:public').abspath()
298 304 jsroutes = get_routes()
299 305 jsroutes_file_content = generate_jsroutes_content(jsroutes)
300 306 jsroutes_file_path = os.path.join(
301 307 static_path, 'js', 'rhodecode', 'routes.js')
302 308
303 309 try:
304 310 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
305 311 f.write(jsroutes_file_content)
306 312 except Exception:
307 313 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
308 314
309 315
310 316 class Subscriber(object):
311 317 """
312 318 Base class for subscribers to the pyramid event system.
313 319 """
314 320 def __call__(self, event):
315 321 self.run(event)
316 322
317 323 def run(self, event):
318 324 raise NotImplementedError('Subclass has to implement this.')
319 325
320 326
321 327 class AsyncSubscriber(Subscriber):
322 328 """
323 329 Subscriber that handles the execution of events in a separate task to not
324 330 block the execution of the code which triggers the event. It puts the
325 331 received events into a queue from which the worker process takes them in
326 332 order.
327 333 """
328 334 def __init__(self):
329 335 self._stop = False
330 336 self._eventq = Queue.Queue()
331 337 self._worker = self.create_worker()
332 338 self._worker.start()
333 339
334 340 def __call__(self, event):
335 341 self._eventq.put(event)
336 342
337 343 def create_worker(self):
338 344 worker = Thread(target=self.do_work)
339 345 worker.daemon = True
340 346 return worker
341 347
342 348 def stop_worker(self):
343 349 self._stop = False
344 350 self._eventq.put(None)
345 351 self._worker.join()
346 352
347 353 def do_work(self):
348 354 while not self._stop:
349 355 event = self._eventq.get()
350 356 if event is not None:
351 357 self.run(event)
352 358
353 359
354 360 class AsyncSubprocessSubscriber(AsyncSubscriber):
355 361 """
356 362 Subscriber that uses the subprocess32 module to execute a command if an
357 363 event is received. Events are handled asynchronously::
358 364
359 365 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
360 366 subscriber(dummyEvent) # running __call__(event)
361 367
362 368 """
363 369
364 370 def __init__(self, cmd, timeout=None):
365 371 if not isinstance(cmd, (list, tuple)):
366 372 cmd = shlex.split(cmd)
367 373 super(AsyncSubprocessSubscriber, self).__init__()
368 374 self._cmd = cmd
369 375 self._timeout = timeout
370 376
371 377 def run(self, event):
372 378 cmd = self._cmd
373 379 timeout = self._timeout
374 380 log.debug('Executing command %s.', cmd)
375 381
376 382 try:
377 383 output = subprocess32.check_output(
378 384 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
379 385 log.debug('Command finished %s', cmd)
380 386 if output:
381 387 log.debug('Command output: %s', output)
382 388 except subprocess32.TimeoutExpired as e:
383 389 log.exception('Timeout while executing command.')
384 390 if e.output:
385 391 log.error('Command output: %s', e.output)
386 392 except subprocess32.CalledProcessError as e:
387 393 log.exception('Error while executing command.')
388 394 if e.output:
389 395 log.error('Command output: %s', e.output)
390 396 except Exception:
391 397 log.exception(
392 398 'Exception while executing command %s.', cmd)
@@ -1,122 +1,124 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import logging
23 23 from pyramid.httpexceptions import HTTPException, HTTPBadRequest
24 24
25 25 from rhodecode.lib.middleware.vcs import (
26 26 detect_vcs_request, VCS_TYPE_KEY, VCS_TYPE_SKIP)
27 27
28 28
29 29 log = logging.getLogger(__name__)
30 30
31 31
32 32 def vcs_detection_tween_factory(handler, registry):
33 33
34 34 def vcs_detection_tween(request):
35 35 """
36 36 Do detection of vcs type, and save results for other layers to re-use
37 37 this information
38 38 """
39 39 vcs_server_enabled = request.registry.settings.get('vcs.server.enable')
40 40 vcs_handler = vcs_server_enabled and detect_vcs_request(
41 41 request.environ, request.registry.settings.get('vcs.backends'))
42 42
43 43 if vcs_handler:
44 44 # save detected VCS type for later re-use
45 45 request.environ[VCS_TYPE_KEY] = vcs_handler.SCM
46 46 request.vcs_call = vcs_handler.SCM
47 47
48 48 log.debug('Processing request with `%s` handler', handler)
49 49 return handler(request)
50 50
51 51 # mark that we didn't detect an VCS, and we can skip detection later on
52 52 request.environ[VCS_TYPE_KEY] = VCS_TYPE_SKIP
53 53
54 54 log.debug('Processing request with `%s` handler', handler)
55 55 return handler(request)
56 56
57 57 return vcs_detection_tween
58 58
59 59
60 60 def junk_encoding_detector(request):
61 61 """
62 62 Detect bad encoded GET params, and fail immediately with BadRequest
63 63 """
64 64
65 65 try:
66 66 request.GET.get("", None)
67 67 except UnicodeDecodeError:
68 68 raise HTTPBadRequest("Invalid bytes in query string.")
69 69
70 70
71 71 def bad_url_data_detector(request):
72 72 """
73 73 Detect invalid bytes in a path.
74 74 """
75 75 try:
76 76 request.path_info
77 77 except UnicodeDecodeError:
78 78 raise HTTPBadRequest("Invalid bytes in URL.")
79 79
80 80
81 81 def junk_form_data_detector(request):
82 82 """
83 83 Detect bad encoded POST params, and fail immediately with BadRequest
84 84 """
85 85
86 86 if request.method == "POST":
87 87 try:
88 88 request.POST.get("", None)
89 89 except ValueError:
90 90 raise HTTPBadRequest("Invalid bytes in form data.")
91 91
92 92
93 93 def sanity_check_factory(handler, registry):
94 94 def sanity_check(request):
95 95 log.debug('Checking current URL sanity for bad data')
96 96 try:
97 97 junk_encoding_detector(request)
98 98 bad_url_data_detector(request)
99 99 junk_form_data_detector(request)
100 100 except HTTPException as exc:
101 101 return exc
102 102
103 103 return handler(request)
104 104
105 105 return sanity_check
106 106
107 107
108 108 def includeme(config):
109 109 config.add_subscriber('rhodecode.subscribers.add_renderer_globals',
110 110 'pyramid.events.BeforeRender')
111 config.add_subscriber('rhodecode.subscribers.update_celery_conf',
112 'pyramid.events.NewRequest')
111 113 config.add_subscriber('rhodecode.subscribers.set_user_lang',
112 114 'pyramid.events.NewRequest')
113 115 config.add_subscriber('rhodecode.subscribers.reset_log_bucket',
114 116 'pyramid.events.NewRequest')
115 117 config.add_subscriber('rhodecode.subscribers.add_request_user_context',
116 118 'pyramid.events.ContextFound')
117 119 config.add_tween('rhodecode.tweens.vcs_detection_tween_factory')
118 120 config.add_tween('rhodecode.tweens.sanity_check_factory')
119 121
120 122 # This needs to be the LAST item
121 123 config.add_tween('rhodecode.lib.middleware.request_wrapper.RequestWrapperTween')
122 124 log.debug('configured all tweens')
General Comments 0
You need to be logged in to leave comments. Login now