##// END OF EJS Templates
metadata: store workers and prevent to excesive metadata writes.
marcink -
r2488:e8ec1e05 default
parent child Browse files
Show More
@@ -1,290 +1,322 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 import io
21 21 import re
22 22 import datetime
23 23 import logging
24 24 import Queue
25 25 import subprocess32
26 26 import os
27 27
28
29 from dateutil.parser import parse
28 30 from pyramid.i18n import get_localizer
29 31 from pyramid.threadlocal import get_current_request
30 32 from pyramid.interfaces import IRoutesMapper
31 33 from pyramid.settings import asbool
32 34 from pyramid.path import AssetResolver
33 35 from threading import Thread
34 36
35 37 from rhodecode.translation import _ as tsf
36 38 from rhodecode.config.jsroutes import generate_jsroutes_content
37 39 from rhodecode.lib import auth
38 40 from rhodecode.lib.base import get_auth_user
39 41
40 42
41 43 import rhodecode
42 44
43 45
44 46 log = logging.getLogger(__name__)
45 47
46 48
47 49 def add_renderer_globals(event):
48 50 from rhodecode.lib import helpers
49 51
50 52 # TODO: When executed in pyramid view context the request is not available
51 53 # in the event. Find a better solution to get the request.
52 54 request = event['request'] or get_current_request()
53 55
54 56 # Add Pyramid translation as '_' to context
55 57 event['_'] = request.translate
56 58 event['_ungettext'] = request.plularize
57 59 event['h'] = helpers
58 60
59 61
60 62 def add_localizer(event):
61 63 request = event.request
62 64 localizer = request.localizer
63 65
64 66 def auto_translate(*args, **kwargs):
65 67 return localizer.translate(tsf(*args, **kwargs))
66 68
67 69 request.translate = auto_translate
68 70 request.plularize = localizer.pluralize
69 71
70 72
71 73 def set_user_lang(event):
72 74 request = event.request
73 75 cur_user = getattr(request, 'user', None)
74 76
75 77 if cur_user:
76 78 user_lang = cur_user.get_instance().user_data.get('language')
77 79 if user_lang:
78 80 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
79 81 event.request._LOCALE_ = user_lang
80 82
81 83
82 84 def add_request_user_context(event):
83 85 """
84 86 Adds auth user into request context
85 87 """
86 88 request = event.request
87 89
88 90 if hasattr(request, 'vcs_call'):
89 91 # skip vcs calls
90 92 return
91 93
92 94 if hasattr(request, 'rpc_method'):
93 95 # skip api calls
94 96 return
95 97
96 98 auth_user = get_auth_user(request)
97 99 request.user = auth_user
98 100 request.environ['rc_auth_user'] = auth_user
99 101
100 102
101 103 def inject_app_settings(event):
102 104 settings = event.app.registry.settings
103 105 # inject info about available permissions
104 106 auth.set_available_permissions(settings)
105 107
106 108
107 109 def scan_repositories_if_enabled(event):
108 110 """
109 111 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
110 112 does a repository scan if enabled in the settings.
111 113 """
112 114 settings = event.app.registry.settings
113 115 vcs_server_enabled = settings['vcs.server.enable']
114 116 import_on_startup = settings['startup.import_repos']
115 117 if vcs_server_enabled and import_on_startup:
116 118 from rhodecode.model.scm import ScmModel
117 119 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
118 120 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
119 121 repo2db_mapper(repositories, remove_obsolete=False)
120 122
121 123
122 124 def write_metadata_if_needed(event):
123 125 """
124 126 Writes upgrade metadata
125 127 """
126 128 import rhodecode
127 129 from rhodecode.lib import system_info
128 130 from rhodecode.lib import ext_json
129 131
130 def write():
131 132 fname = '.rcmetadata.json'
132 133 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
133 134 metadata_destination = os.path.join(ini_loc, fname)
134 135
136 def get_update_age():
137 now = datetime.datetime.utcnow()
138
139 with open(metadata_destination, 'rb') as f:
140 data = ext_json.json.loads(f.read())
141 if 'created_on' in data:
142 update_date = parse(data['created_on'])
143 diff = now - update_date
144 return diff.total_seconds() / 60.0
145
146 return 0
147
148 def write():
135 149 configuration = system_info.SysInfo(
136 150 system_info.rhodecode_config)()['value']
137 151 license_token = configuration['config']['license_token']
152
153 setup = dict(
154 workers=configuration['config']['server:main'].get(
155 'workers', '?'),
156 worker_type=configuration['config']['server:main'].get(
157 'worker_class', 'sync'),
158 )
138 159 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
139 160 del dbinfo['url']
161
140 162 metadata = dict(
141 163 desc='upgrade metadata info',
142 164 license_token=license_token,
143 165 created_on=datetime.datetime.utcnow().isoformat(),
144 166 usage=system_info.SysInfo(system_info.usage_info)()['value'],
145 167 platform=system_info.SysInfo(system_info.platform_type)()['value'],
146 168 database=dbinfo,
147 169 cpu=system_info.SysInfo(system_info.cpu)()['value'],
148 170 memory=system_info.SysInfo(system_info.memory)()['value'],
171 setup=setup
149 172 )
150 173
151 174 with open(metadata_destination, 'wb') as f:
152 175 f.write(ext_json.json.dumps(metadata))
153 176
154 177 settings = event.app.registry.settings
155 178 if settings.get('metadata.skip'):
156 179 return
157 180
181 # only write this every 24h, workers restart caused unwanted delays
182 try:
183 age_in_min = get_update_age()
184 except Exception:
185 age_in_min = 0
186
187 if age_in_min < 60 * 60 * 24:
188 return
189
158 190 try:
159 191 write()
160 192 except Exception:
161 193 pass
162 194
163 195
164 196 def write_js_routes_if_enabled(event):
165 197 registry = event.app.registry
166 198
167 199 mapper = registry.queryUtility(IRoutesMapper)
168 200 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
169 201
170 202 def _extract_route_information(route):
171 203 """
172 204 Convert a route into tuple(name, path, args), eg:
173 205 ('show_user', '/profile/%(username)s', ['username'])
174 206 """
175 207
176 208 routepath = route.pattern
177 209 pattern = route.pattern
178 210
179 211 def replace(matchobj):
180 212 if matchobj.group(1):
181 213 return "%%(%s)s" % matchobj.group(1).split(':')[0]
182 214 else:
183 215 return "%%(%s)s" % matchobj.group(2)
184 216
185 217 routepath = _argument_prog.sub(replace, routepath)
186 218
187 219 if not routepath.startswith('/'):
188 220 routepath = '/'+routepath
189 221
190 222 return (
191 223 route.name,
192 224 routepath,
193 225 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
194 226 for arg in _argument_prog.findall(pattern)]
195 227 )
196 228
197 229 def get_routes():
198 230 # pyramid routes
199 231 for route in mapper.get_routes():
200 232 if not route.name.startswith('__'):
201 233 yield _extract_route_information(route)
202 234
203 235 if asbool(registry.settings.get('generate_js_files', 'false')):
204 236 static_path = AssetResolver().resolve('rhodecode:public').abspath()
205 237 jsroutes = get_routes()
206 238 jsroutes_file_content = generate_jsroutes_content(jsroutes)
207 239 jsroutes_file_path = os.path.join(
208 240 static_path, 'js', 'rhodecode', 'routes.js')
209 241
210 242 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
211 243 f.write(jsroutes_file_content)
212 244
213 245
214 246 class Subscriber(object):
215 247 """
216 248 Base class for subscribers to the pyramid event system.
217 249 """
218 250 def __call__(self, event):
219 251 self.run(event)
220 252
221 253 def run(self, event):
222 254 raise NotImplementedError('Subclass has to implement this.')
223 255
224 256
225 257 class AsyncSubscriber(Subscriber):
226 258 """
227 259 Subscriber that handles the execution of events in a separate task to not
228 260 block the execution of the code which triggers the event. It puts the
229 261 received events into a queue from which the worker process takes them in
230 262 order.
231 263 """
232 264 def __init__(self):
233 265 self._stop = False
234 266 self._eventq = Queue.Queue()
235 267 self._worker = self.create_worker()
236 268 self._worker.start()
237 269
238 270 def __call__(self, event):
239 271 self._eventq.put(event)
240 272
241 273 def create_worker(self):
242 274 worker = Thread(target=self.do_work)
243 275 worker.daemon = True
244 276 return worker
245 277
246 278 def stop_worker(self):
247 279 self._stop = False
248 280 self._eventq.put(None)
249 281 self._worker.join()
250 282
251 283 def do_work(self):
252 284 while not self._stop:
253 285 event = self._eventq.get()
254 286 if event is not None:
255 287 self.run(event)
256 288
257 289
258 290 class AsyncSubprocessSubscriber(AsyncSubscriber):
259 291 """
260 292 Subscriber that uses the subprocess32 module to execute a command if an
261 293 event is received. Events are handled asynchronously.
262 294 """
263 295
264 296 def __init__(self, cmd, timeout=None):
265 297 super(AsyncSubprocessSubscriber, self).__init__()
266 298 self._cmd = cmd
267 299 self._timeout = timeout
268 300
269 301 def run(self, event):
270 302 cmd = self._cmd
271 303 timeout = self._timeout
272 304 log.debug('Executing command %s.', cmd)
273 305
274 306 try:
275 307 output = subprocess32.check_output(
276 308 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
277 309 log.debug('Command finished %s', cmd)
278 310 if output:
279 311 log.debug('Command output: %s', output)
280 312 except subprocess32.TimeoutExpired as e:
281 313 log.exception('Timeout while executing command.')
282 314 if e.output:
283 315 log.error('Command output: %s', e.output)
284 316 except subprocess32.CalledProcessError as e:
285 317 log.exception('Error while executing command.')
286 318 if e.output:
287 319 log.error('Command output: %s', e.output)
288 320 except:
289 321 log.exception(
290 322 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now