##// END OF EJS Templates
subscribers: optimized used of threadglobals and fix python3 compat
super-admin -
r5062:0a4c1892 default
parent child Browse files
Show More
@@ -1,398 +1,398 b''
1 # -*- coding: utf-8 -*-
2 1
3 2 # Copyright (C) 2010-2020 RhodeCode GmbH
4 3 #
5 4 # This program is free software: you can redistribute it and/or modify
6 5 # it under the terms of the GNU Affero General Public License, version 3
7 6 # (only), as published by the Free Software Foundation.
8 7 #
9 8 # This program is distributed in the hope that it will be useful,
10 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 11 # GNU General Public License for more details.
13 12 #
14 13 # You should have received a copy of the GNU Affero General Public License
15 14 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 15 #
17 16 # This program is dual-licensed. If you wish to learn more about the
18 17 # RhodeCode Enterprise Edition, including its added features, Support services,
19 18 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 19 import io
21 20 import shlex
22 21
23 22 import math
24 23 import re
25 24 import os
26 25 import datetime
27 26 import logging
28 27 import queue
29 28 import subprocess
30 29
31 30
32 31 from dateutil.parser import parse
33 from pyramid.threadlocal import get_current_request
34 32 from pyramid.interfaces import IRoutesMapper
35 33 from pyramid.settings import asbool
36 34 from pyramid.path import AssetResolver
37 35 from threading import Thread
38 36
39 37 from rhodecode.config.jsroutes import generate_jsroutes_content
40 38 from rhodecode.lib.base import get_auth_user
39 from rhodecode.lib.celerylib.loader import set_celery_conf
41 40
42 41 import rhodecode
43 42
44 43
45 44 log = logging.getLogger(__name__)
46 45
47 46
48 47 def add_renderer_globals(event):
49 48 from rhodecode.lib import helpers
50 49
51 50 # TODO: When executed in pyramid view context the request is not available
52 51 # in the event. Find a better solution to get the request.
52 from pyramid.threadlocal import get_current_request
53 53 request = event['request'] or get_current_request()
54 54
55 55 # Add Pyramid translation as '_' to context
56 56 event['_'] = request.translate
57 57 event['_ungettext'] = request.plularize
58 58 event['h'] = helpers
59 59
60 60
61 61 def set_user_lang(event):
62 62 request = event.request
63 63 cur_user = getattr(request, 'user', None)
64 64
65 65 if cur_user:
66 66 user_lang = cur_user.get_instance().user_data.get('language')
67 67 if user_lang:
68 68 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
69 69 event.request._LOCALE_ = user_lang
70 70
71 71
72 72 def update_celery_conf(event):
73 from rhodecode.lib.celerylib.loader import set_celery_conf
74 73 log.debug('Setting celery config from new request')
75 74 set_celery_conf(request=event.request, registry=event.request.registry)
76 75
77 76
78 77 def add_request_user_context(event):
79 78 """
80 79 Adds auth user into request context
81 80 """
81
82 82 request = event.request
83 83 # access req_id as soon as possible
84 84 req_id = request.req_id
85 85
86 86 if hasattr(request, 'vcs_call'):
87 87 # skip vcs calls
88 88 return
89 89
90 90 if hasattr(request, 'rpc_method'):
91 91 # skip api calls
92 92 return
93 93
94 94 auth_user, auth_token = get_auth_user(request)
95 95 request.user = auth_user
96 96 request.user_auth_token = auth_token
97 97 request.environ['rc_auth_user'] = auth_user
98 98 request.environ['rc_auth_user_id'] = str(auth_user.user_id)
99 99 request.environ['rc_req_id'] = req_id
100 100
101 101
102 102 def reset_log_bucket(event):
103 103 """
104 104 reset the log bucket on new request
105 105 """
106 106 request = event.request
107 107 request.req_id_records_init()
108 108
109 109
110 110 def scan_repositories_if_enabled(event):
111 111 """
112 112 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
113 113 does a repository scan if enabled in the settings.
114 114 """
115 115 settings = event.app.registry.settings
116 116 vcs_server_enabled = settings['vcs.server.enable']
117 117 import_on_startup = settings['startup.import_repos']
118 118 if vcs_server_enabled and import_on_startup:
119 119 from rhodecode.model.scm import ScmModel
120 120 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
121 121 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
122 122 repo2db_mapper(repositories, remove_obsolete=False)
123 123
124 124
125 125 def write_metadata_if_needed(event):
126 126 """
127 127 Writes upgrade metadata
128 128 """
129 129 import rhodecode
130 130 from rhodecode.lib import system_info
131 131 from rhodecode.lib import ext_json
132 132
133 133 fname = '.rcmetadata.json'
134 134 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
135 135 metadata_destination = os.path.join(ini_loc, fname)
136 136
137 137 def get_update_age():
138 138 now = datetime.datetime.utcnow()
139 139
140 140 with open(metadata_destination, 'rb') as f:
141 141 data = ext_json.json.loads(f.read())
142 142 if 'created_on' in data:
143 143 update_date = parse(data['created_on'])
144 144 diff = now - update_date
145 145 return diff.total_seconds() / 60.0
146 146
147 147 return 0
148 148
149 149 def write():
150 150 configuration = system_info.SysInfo(
151 151 system_info.rhodecode_config)()['value']
152 152 license_token = configuration['config']['license_token']
153 153
154 154 setup = dict(
155 155 workers=configuration['config']['server:main'].get(
156 156 'workers', '?'),
157 157 worker_type=configuration['config']['server:main'].get(
158 158 'worker_class', 'sync'),
159 159 )
160 160 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
161 161 del dbinfo['url']
162 162
163 163 metadata = dict(
164 164 desc='upgrade metadata info',
165 165 license_token=license_token,
166 166 created_on=datetime.datetime.utcnow().isoformat(),
167 167 usage=system_info.SysInfo(system_info.usage_info)()['value'],
168 168 platform=system_info.SysInfo(system_info.platform_type)()['value'],
169 169 database=dbinfo,
170 170 cpu=system_info.SysInfo(system_info.cpu)()['value'],
171 171 memory=system_info.SysInfo(system_info.memory)()['value'],
172 172 setup=setup
173 173 )
174 174
175 175 with open(metadata_destination, 'wb') as f:
176 176 f.write(ext_json.json.dumps(metadata))
177 177
178 178 settings = event.app.registry.settings
179 179 if settings.get('metadata.skip'):
180 180 return
181 181
182 182 # only write this every 24h, workers restart caused unwanted delays
183 183 try:
184 184 age_in_min = get_update_age()
185 185 except Exception:
186 186 age_in_min = 0
187 187
188 188 if age_in_min > 60 * 60 * 24:
189 189 return
190 190
191 191 try:
192 192 write()
193 193 except Exception:
194 194 pass
195 195
196 196
197 197 def write_usage_data(event):
198 198 import rhodecode
199 199 from rhodecode.lib import system_info
200 200 from rhodecode.lib import ext_json
201 201
202 202 settings = event.app.registry.settings
203 203 instance_tag = settings.get('metadata.write_usage_tag')
204 204 if not settings.get('metadata.write_usage'):
205 205 return
206 206
207 207 def get_update_age(dest_file):
208 208 now = datetime.datetime.utcnow()
209 209
210 210 with open(dest_file, 'rb') as f:
211 211 data = ext_json.json.loads(f.read())
212 212 if 'created_on' in data:
213 213 update_date = parse(data['created_on'])
214 214 diff = now - update_date
215 215 return math.ceil(diff.total_seconds() / 60.0)
216 216
217 217 return 0
218 218
219 219 utc_date = datetime.datetime.utcnow()
220 220 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
221 221 fname = '.rc_usage_{date.year}{date.month:02d}{date.day:02d}_{hour}.json'.format(
222 222 date=utc_date, hour=hour_quarter)
223 223 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
224 224
225 225 usage_dir = os.path.join(ini_loc, '.rcusage')
226 226 if not os.path.isdir(usage_dir):
227 227 os.makedirs(usage_dir)
228 228 usage_metadata_destination = os.path.join(usage_dir, fname)
229 229
230 230 try:
231 231 age_in_min = get_update_age(usage_metadata_destination)
232 232 except Exception:
233 233 age_in_min = 0
234 234
235 235 # write every 6th hour
236 236 if age_in_min and age_in_min < 60 * 6:
237 237 log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
238 238 age_in_min, 60 * 6)
239 239 return
240 240
241 241 def write(dest_file):
242 242 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
243 243 license_token = configuration['config']['license_token']
244 244
245 245 metadata = dict(
246 246 desc='Usage data',
247 247 instance_tag=instance_tag,
248 248 license_token=license_token,
249 249 created_on=datetime.datetime.utcnow().isoformat(),
250 250 usage=system_info.SysInfo(system_info.usage_info)()['value'],
251 251 )
252 252
253 253 with open(dest_file, 'wb') as f:
254 254 f.write(ext_json.formatted_json(metadata))
255 255
256 256 try:
257 257 log.debug('Writing usage file at: %s', usage_metadata_destination)
258 258 write(usage_metadata_destination)
259 259 except Exception:
260 260 pass
261 261
262 262
263 263 def write_js_routes_if_enabled(event):
264 264 registry = event.app.registry
265 265
266 266 mapper = registry.queryUtility(IRoutesMapper)
267 267 _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
268 268
269 269 def _extract_route_information(route):
270 270 """
271 271 Convert a route into tuple(name, path, args), eg:
272 272 ('show_user', '/profile/%(username)s', ['username'])
273 273 """
274 274
275 275 routepath = route.pattern
276 276 pattern = route.pattern
277 277
278 278 def replace(matchobj):
279 279 if matchobj.group(1):
280 280 return "%%(%s)s" % matchobj.group(1).split(':')[0]
281 281 else:
282 282 return "%%(%s)s" % matchobj.group(2)
283 283
284 284 routepath = _argument_prog.sub(replace, routepath)
285 285
286 286 if not routepath.startswith('/'):
287 287 routepath = '/'+routepath
288 288
289 289 return (
290 290 route.name,
291 291 routepath,
292 292 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
293 293 for arg in _argument_prog.findall(pattern)]
294 294 )
295 295
296 296 def get_routes():
297 297 # pyramid routes
298 298 for route in mapper.get_routes():
299 299 if not route.name.startswith('__'):
300 300 yield _extract_route_information(route)
301 301
302 302 if asbool(registry.settings.get('generate_js_files', 'false')):
303 303 static_path = AssetResolver().resolve('rhodecode:public').abspath()
304 304 jsroutes = get_routes()
305 305 jsroutes_file_content = generate_jsroutes_content(jsroutes)
306 306 jsroutes_file_path = os.path.join(
307 307 static_path, 'js', 'rhodecode', 'routes.js')
308 308
309 309 try:
310 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
310 with open(jsroutes_file_path, 'w', encoding='utf-8') as f:
311 311 f.write(jsroutes_file_content)
312 312 except Exception:
313 313 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
314 314
315 315
316 316 class Subscriber(object):
317 317 """
318 318 Base class for subscribers to the pyramid event system.
319 319 """
320 320 def __call__(self, event):
321 321 self.run(event)
322 322
323 323 def run(self, event):
324 324 raise NotImplementedError('Subclass has to implement this.')
325 325
326 326
327 327 class AsyncSubscriber(Subscriber):
328 328 """
329 329 Subscriber that handles the execution of events in a separate task to not
330 330 block the execution of the code which triggers the event. It puts the
331 331 received events into a queue from which the worker process takes them in
332 332 order.
333 333 """
334 334 def __init__(self):
335 335 self._stop = False
336 336 self._eventq = queue.Queue()
337 337 self._worker = self.create_worker()
338 338 self._worker.start()
339 339
340 340 def __call__(self, event):
341 341 self._eventq.put(event)
342 342
343 343 def create_worker(self):
344 344 worker = Thread(target=self.do_work)
345 345 worker.daemon = True
346 346 return worker
347 347
348 348 def stop_worker(self):
349 349 self._stop = False
350 350 self._eventq.put(None)
351 351 self._worker.join()
352 352
353 353 def do_work(self):
354 354 while not self._stop:
355 355 event = self._eventq.get()
356 356 if event is not None:
357 357 self.run(event)
358 358
359 359
360 360 class AsyncSubprocessSubscriber(AsyncSubscriber):
361 361 """
362 362 Subscriber that uses the subprocess module to execute a command if an
363 363 event is received. Events are handled asynchronously::
364 364
365 365 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
366 366 subscriber(dummyEvent) # running __call__(event)
367 367
368 368 """
369 369
370 370 def __init__(self, cmd, timeout=None):
371 371 if not isinstance(cmd, (list, tuple)):
372 372 cmd = shlex.split(cmd)
373 373 super(AsyncSubprocessSubscriber, self).__init__()
374 374 self._cmd = cmd
375 375 self._timeout = timeout
376 376
377 377 def run(self, event):
378 378 cmd = self._cmd
379 379 timeout = self._timeout
380 380 log.debug('Executing command %s.', cmd)
381 381
382 382 try:
383 383 output = subprocess.check_output(
384 384 cmd, timeout=timeout, stderr=subprocess.STDOUT)
385 385 log.debug('Command finished %s', cmd)
386 386 if output:
387 387 log.debug('Command output: %s', output)
388 388 except subprocess.TimeoutExpired as e:
389 389 log.exception('Timeout while executing command.')
390 390 if e.output:
391 391 log.error('Command output: %s', e.output)
392 392 except subprocess.CalledProcessError as e:
393 393 log.exception('Error while executing command.')
394 394 if e.output:
395 395 log.error('Command output: %s', e.output)
396 396 except Exception:
397 397 log.exception(
398 398 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now