##// END OF EJS Templates
subscribers: use shlex in the async-subscriber directly
milka -
r4582:5c8aeb25 stable
parent child Browse files
Show More
@@ -1,90 +1,90 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import os
21 21 import logging
22 22 import shlex
23 23 from pyramid import compat
24 24
25 25 # Do not use `from rhodecode import events` here, it will be overridden by the
26 26 # events module in this package due to pythons import mechanism.
27 27 from rhodecode.events import RepoGroupEvent
28 28 from rhodecode.subscribers import AsyncSubprocessSubscriber
29 29 from rhodecode.config.middleware import (
30 30 _bool_setting, _string_setting, _int_setting)
31 31
32 32 from .events import ModDavSvnConfigChange
33 33 from .subscribers import generate_config_subscriber
34 34 from . import config_keys
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def includeme(config):
41 41 settings = config.registry.settings
42 42 _sanitize_settings_and_apply_defaults(settings)
43 43
44 44 if settings[config_keys.generate_config]:
45 45 # Add subscriber to generate the Apache mod dav svn configuration on
46 46 # repository group events.
47 47 config.add_subscriber(generate_config_subscriber, RepoGroupEvent)
48 48
49 49 # If a reload command is set add a subscriber to execute it on
50 50 # configuration changes.
51 reload_cmd = shlex.split(settings[config_keys.reload_command])
51 reload_cmd = settings[config_keys.reload_command]
52 52 if reload_cmd:
53 53 reload_timeout = settings[config_keys.reload_timeout] or None
54 54 reload_subscriber = AsyncSubprocessSubscriber(
55 55 cmd=reload_cmd, timeout=reload_timeout)
56 56 config.add_subscriber(reload_subscriber, ModDavSvnConfigChange)
57 57
58 58
59 59 def _sanitize_settings_and_apply_defaults(settings):
60 60 """
61 61 Set defaults, convert to python types and validate settings.
62 62 """
63 63 _bool_setting(settings, config_keys.generate_config, 'false')
64 64 _bool_setting(settings, config_keys.list_parent_path, 'true')
65 65 _int_setting(settings, config_keys.reload_timeout, 10)
66 66 _string_setting(settings, config_keys.config_file_path, '', lower=False)
67 67 _string_setting(settings, config_keys.location_root, '/', lower=False)
68 68 _string_setting(settings, config_keys.reload_command, '', lower=False)
69 69 _string_setting(settings, config_keys.template, '', lower=False)
70 70
71 71 # Convert negative timeout values to zero.
72 72 if settings[config_keys.reload_timeout] < 0:
73 73 settings[config_keys.reload_timeout] = 0
74 74
75 75 # Append path separator to location root.
76 76 settings[config_keys.location_root] = _append_path_sep(
77 77 settings[config_keys.location_root])
78 78
79 79 # Validate settings.
80 80 if settings[config_keys.generate_config]:
81 81 assert len(settings[config_keys.config_file_path]) > 0
82 82
83 83
84 84 def _append_path_sep(path):
85 85 """
86 86 Append the path separator if missing.
87 87 """
88 88 if isinstance(path, compat.string_types) and not path.endswith(os.path.sep):
89 89 path += os.path.sep
90 90 return path
@@ -1,389 +1,397 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import io
21 import shlex
22
21 23 import math
22 24 import re
23 25 import os
24 26 import datetime
25 27 import logging
26 28 import Queue
27 29 import subprocess32
28 30
29 31
30 32 from dateutil.parser import parse
31 33 from pyramid.threadlocal import get_current_request
32 34 from pyramid.interfaces import IRoutesMapper
33 35 from pyramid.settings import asbool
34 36 from pyramid.path import AssetResolver
35 37 from threading import Thread
36 38
37 39 from rhodecode.translation import _ as tsf
38 40 from rhodecode.config.jsroutes import generate_jsroutes_content
39 41 from rhodecode.lib import auth
40 42 from rhodecode.lib.base import get_auth_user
41 43
42 44 import rhodecode
43 45
44 46
45 47 log = logging.getLogger(__name__)
46 48
47 49
48 50 def add_renderer_globals(event):
49 51 from rhodecode.lib import helpers
50 52
51 53 # TODO: When executed in pyramid view context the request is not available
52 54 # in the event. Find a better solution to get the request.
53 55 request = event['request'] or get_current_request()
54 56
55 57 # Add Pyramid translation as '_' to context
56 58 event['_'] = request.translate
57 59 event['_ungettext'] = request.plularize
58 60 event['h'] = helpers
59 61
60 62
61 63 def add_localizer(event):
62 64 request = event.request
63 65 localizer = request.localizer
64 66
65 67 def auto_translate(*args, **kwargs):
66 68 return localizer.translate(tsf(*args, **kwargs))
67 69
68 70 request.translate = auto_translate
69 71 request.plularize = localizer.pluralize
70 72
71 73
72 74 def set_user_lang(event):
73 75 request = event.request
74 76 cur_user = getattr(request, 'user', None)
75 77
76 78 if cur_user:
77 79 user_lang = cur_user.get_instance().user_data.get('language')
78 80 if user_lang:
79 81 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
80 82 event.request._LOCALE_ = user_lang
81 83
82 84
83 85 def add_request_user_context(event):
84 86 """
85 87 Adds auth user into request context
86 88 """
87 89 request = event.request
88 90 # access req_id as soon as possible
89 91 req_id = request.req_id
90 92
91 93 if hasattr(request, 'vcs_call'):
92 94 # skip vcs calls
93 95 return
94 96
95 97 if hasattr(request, 'rpc_method'):
96 98 # skip api calls
97 99 return
98 100
99 101 auth_user, auth_token = get_auth_user(request)
100 102 request.user = auth_user
101 103 request.user_auth_token = auth_token
102 104 request.environ['rc_auth_user'] = auth_user
103 105 request.environ['rc_auth_user_id'] = auth_user.user_id
104 106 request.environ['rc_req_id'] = req_id
105 107
106 108
107 109 def scan_repositories_if_enabled(event):
108 110 """
109 111 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
110 112 does a repository scan if enabled in the settings.
111 113 """
112 114 settings = event.app.registry.settings
113 115 vcs_server_enabled = settings['vcs.server.enable']
114 116 import_on_startup = settings['startup.import_repos']
115 117 if vcs_server_enabled and import_on_startup:
116 118 from rhodecode.model.scm import ScmModel
117 119 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
118 120 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
119 121 repo2db_mapper(repositories, remove_obsolete=False)
120 122
121 123
122 124 def write_metadata_if_needed(event):
123 125 """
124 126 Writes upgrade metadata
125 127 """
126 128 import rhodecode
127 129 from rhodecode.lib import system_info
128 130 from rhodecode.lib import ext_json
129 131
130 132 fname = '.rcmetadata.json'
131 133 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
132 134 metadata_destination = os.path.join(ini_loc, fname)
133 135
134 136 def get_update_age():
135 137 now = datetime.datetime.utcnow()
136 138
137 139 with open(metadata_destination, 'rb') as f:
138 140 data = ext_json.json.loads(f.read())
139 141 if 'created_on' in data:
140 142 update_date = parse(data['created_on'])
141 143 diff = now - update_date
142 144 return diff.total_seconds() / 60.0
143 145
144 146 return 0
145 147
146 148 def write():
147 149 configuration = system_info.SysInfo(
148 150 system_info.rhodecode_config)()['value']
149 151 license_token = configuration['config']['license_token']
150 152
151 153 setup = dict(
152 154 workers=configuration['config']['server:main'].get(
153 155 'workers', '?'),
154 156 worker_type=configuration['config']['server:main'].get(
155 157 'worker_class', 'sync'),
156 158 )
157 159 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
158 160 del dbinfo['url']
159 161
160 162 metadata = dict(
161 163 desc='upgrade metadata info',
162 164 license_token=license_token,
163 165 created_on=datetime.datetime.utcnow().isoformat(),
164 166 usage=system_info.SysInfo(system_info.usage_info)()['value'],
165 167 platform=system_info.SysInfo(system_info.platform_type)()['value'],
166 168 database=dbinfo,
167 169 cpu=system_info.SysInfo(system_info.cpu)()['value'],
168 170 memory=system_info.SysInfo(system_info.memory)()['value'],
169 171 setup=setup
170 172 )
171 173
172 174 with open(metadata_destination, 'wb') as f:
173 175 f.write(ext_json.json.dumps(metadata))
174 176
175 177 settings = event.app.registry.settings
176 178 if settings.get('metadata.skip'):
177 179 return
178 180
179 181 # only write this every 24h, workers restart caused unwanted delays
180 182 try:
181 183 age_in_min = get_update_age()
182 184 except Exception:
183 185 age_in_min = 0
184 186
185 187 if age_in_min > 60 * 60 * 24:
186 188 return
187 189
188 190 try:
189 191 write()
190 192 except Exception:
191 193 pass
192 194
193 195
194 196 def write_usage_data(event):
195 197 import rhodecode
196 198 from rhodecode.lib import system_info
197 199 from rhodecode.lib import ext_json
198 200
199 201 settings = event.app.registry.settings
200 202 instance_tag = settings.get('metadata.write_usage_tag')
201 203 if not settings.get('metadata.write_usage'):
202 204 return
203 205
204 206 def get_update_age(dest_file):
205 207 now = datetime.datetime.utcnow()
206 208
207 209 with open(dest_file, 'rb') as f:
208 210 data = ext_json.json.loads(f.read())
209 211 if 'created_on' in data:
210 212 update_date = parse(data['created_on'])
211 213 diff = now - update_date
212 214 return math.ceil(diff.total_seconds() / 60.0)
213 215
214 216 return 0
215 217
216 218 utc_date = datetime.datetime.utcnow()
217 219 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
218 220 fname = '.rc_usage_{date.year}{date.month:02d}{date.day:02d}_{hour}.json'.format(
219 221 date=utc_date, hour=hour_quarter)
220 222 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
221 223
222 224 usage_dir = os.path.join(ini_loc, '.rcusage')
223 225 if not os.path.isdir(usage_dir):
224 226 os.makedirs(usage_dir)
225 227 usage_metadata_destination = os.path.join(usage_dir, fname)
226 228
227 229 try:
228 230 age_in_min = get_update_age(usage_metadata_destination)
229 231 except Exception:
230 232 age_in_min = 0
231 233
232 234 # write every 6th hour
233 235 if age_in_min and age_in_min < 60 * 6:
234 236 log.debug('Usage file created %s minutes ago, skipping (threashold: %s)...',
235 237 age_in_min, 60 * 6)
236 238 return
237 239
238 240 def write(dest_file):
239 241 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
240 242 license_token = configuration['config']['license_token']
241 243
242 244 metadata = dict(
243 245 desc='Usage data',
244 246 instance_tag=instance_tag,
245 247 license_token=license_token,
246 248 created_on=datetime.datetime.utcnow().isoformat(),
247 249 usage=system_info.SysInfo(system_info.usage_info)()['value'],
248 250 )
249 251
250 252 with open(dest_file, 'wb') as f:
251 253 f.write(ext_json.json.dumps(metadata, indent=2, sort_keys=True))
252 254
253 255 try:
254 256 log.debug('Writing usage file at: %s', usage_metadata_destination)
255 257 write(usage_metadata_destination)
256 258 except Exception:
257 259 pass
258 260
259 261
260 262 def write_js_routes_if_enabled(event):
261 263 registry = event.app.registry
262 264
263 265 mapper = registry.queryUtility(IRoutesMapper)
264 266 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
265 267
266 268 def _extract_route_information(route):
267 269 """
268 270 Convert a route into tuple(name, path, args), eg:
269 271 ('show_user', '/profile/%(username)s', ['username'])
270 272 """
271 273
272 274 routepath = route.pattern
273 275 pattern = route.pattern
274 276
275 277 def replace(matchobj):
276 278 if matchobj.group(1):
277 279 return "%%(%s)s" % matchobj.group(1).split(':')[0]
278 280 else:
279 281 return "%%(%s)s" % matchobj.group(2)
280 282
281 283 routepath = _argument_prog.sub(replace, routepath)
282 284
283 285 if not routepath.startswith('/'):
284 286 routepath = '/'+routepath
285 287
286 288 return (
287 289 route.name,
288 290 routepath,
289 291 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
290 292 for arg in _argument_prog.findall(pattern)]
291 293 )
292 294
293 295 def get_routes():
294 296 # pyramid routes
295 297 for route in mapper.get_routes():
296 298 if not route.name.startswith('__'):
297 299 yield _extract_route_information(route)
298 300
299 301 if asbool(registry.settings.get('generate_js_files', 'false')):
300 302 static_path = AssetResolver().resolve('rhodecode:public').abspath()
301 303 jsroutes = get_routes()
302 304 jsroutes_file_content = generate_jsroutes_content(jsroutes)
303 305 jsroutes_file_path = os.path.join(
304 306 static_path, 'js', 'rhodecode', 'routes.js')
305 307
306 308 try:
307 309 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
308 310 f.write(jsroutes_file_content)
309 311 except Exception:
310 312 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
311 313
312 314
313 315 class Subscriber(object):
314 316 """
315 317 Base class for subscribers to the pyramid event system.
316 318 """
317 319 def __call__(self, event):
318 320 self.run(event)
319 321
320 322 def run(self, event):
321 323 raise NotImplementedError('Subclass has to implement this.')
322 324
323 325
324 326 class AsyncSubscriber(Subscriber):
325 327 """
326 328 Subscriber that handles the execution of events in a separate task to not
327 329 block the execution of the code which triggers the event. It puts the
328 330 received events into a queue from which the worker process takes them in
329 331 order.
330 332 """
331 333 def __init__(self):
332 334 self._stop = False
333 335 self._eventq = Queue.Queue()
334 336 self._worker = self.create_worker()
335 337 self._worker.start()
336 338
337 339 def __call__(self, event):
338 340 self._eventq.put(event)
339 341
340 342 def create_worker(self):
341 343 worker = Thread(target=self.do_work)
342 344 worker.daemon = True
343 345 return worker
344 346
345 347 def stop_worker(self):
346 348 self._stop = False
347 349 self._eventq.put(None)
348 350 self._worker.join()
349 351
350 352 def do_work(self):
351 353 while not self._stop:
352 354 event = self._eventq.get()
353 355 if event is not None:
354 356 self.run(event)
355 357
356 358
357 359 class AsyncSubprocessSubscriber(AsyncSubscriber):
358 360 """
359 361 Subscriber that uses the subprocess32 module to execute a command if an
360 event is received. Events are handled asynchronously.
362 event is received. Events are handled asynchronously::
363
364 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
365 subscriber(dummyEvent) # running __call__(event)
366
361 367 """
362 368
363 369 def __init__(self, cmd, timeout=None):
370 if not isinstance(cmd, (list, tuple)):
371 cmd = shlex.split(cmd)
364 372 super(AsyncSubprocessSubscriber, self).__init__()
365 373 self._cmd = cmd
366 374 self._timeout = timeout
367 375
368 376 def run(self, event):
369 377 cmd = self._cmd
370 378 timeout = self._timeout
371 379 log.debug('Executing command %s.', cmd)
372 380
373 381 try:
374 382 output = subprocess32.check_output(
375 383 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
376 384 log.debug('Command finished %s', cmd)
377 385 if output:
378 386 log.debug('Command output: %s', output)
379 387 except subprocess32.TimeoutExpired as e:
380 388 log.exception('Timeout while executing command.')
381 389 if e.output:
382 390 log.error('Command output: %s', e.output)
383 391 except subprocess32.CalledProcessError as e:
384 392 log.exception('Error while executing command.')
385 393 if e.output:
386 394 log.error('Command output: %s', e.output)
387 except:
395 except Exception:
388 396 log.exception(
389 397 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now