##// END OF EJS Templates
metadata: store workers and prevent to excesive metadata writes.
marcink -
r2488:e8ec1e05 default
parent child Browse files
Show More
@@ -1,290 +1,322 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import io
20 import io
21 import re
21 import re
22 import datetime
22 import datetime
23 import logging
23 import logging
24 import Queue
24 import Queue
25 import subprocess32
25 import subprocess32
26 import os
26 import os
27
27
28
29 from dateutil.parser import parse
28 from pyramid.i18n import get_localizer
30 from pyramid.i18n import get_localizer
29 from pyramid.threadlocal import get_current_request
31 from pyramid.threadlocal import get_current_request
30 from pyramid.interfaces import IRoutesMapper
32 from pyramid.interfaces import IRoutesMapper
31 from pyramid.settings import asbool
33 from pyramid.settings import asbool
32 from pyramid.path import AssetResolver
34 from pyramid.path import AssetResolver
33 from threading import Thread
35 from threading import Thread
34
36
35 from rhodecode.translation import _ as tsf
37 from rhodecode.translation import _ as tsf
36 from rhodecode.config.jsroutes import generate_jsroutes_content
38 from rhodecode.config.jsroutes import generate_jsroutes_content
37 from rhodecode.lib import auth
39 from rhodecode.lib import auth
38 from rhodecode.lib.base import get_auth_user
40 from rhodecode.lib.base import get_auth_user
39
41
40
42
41 import rhodecode
43 import rhodecode
42
44
43
45
44 log = logging.getLogger(__name__)
46 log = logging.getLogger(__name__)
45
47
46
48
47 def add_renderer_globals(event):
49 def add_renderer_globals(event):
48 from rhodecode.lib import helpers
50 from rhodecode.lib import helpers
49
51
50 # TODO: When executed in pyramid view context the request is not available
52 # TODO: When executed in pyramid view context the request is not available
51 # in the event. Find a better solution to get the request.
53 # in the event. Find a better solution to get the request.
52 request = event['request'] or get_current_request()
54 request = event['request'] or get_current_request()
53
55
54 # Add Pyramid translation as '_' to context
56 # Add Pyramid translation as '_' to context
55 event['_'] = request.translate
57 event['_'] = request.translate
56 event['_ungettext'] = request.plularize
58 event['_ungettext'] = request.plularize
57 event['h'] = helpers
59 event['h'] = helpers
58
60
59
61
60 def add_localizer(event):
62 def add_localizer(event):
61 request = event.request
63 request = event.request
62 localizer = request.localizer
64 localizer = request.localizer
63
65
64 def auto_translate(*args, **kwargs):
66 def auto_translate(*args, **kwargs):
65 return localizer.translate(tsf(*args, **kwargs))
67 return localizer.translate(tsf(*args, **kwargs))
66
68
67 request.translate = auto_translate
69 request.translate = auto_translate
68 request.plularize = localizer.pluralize
70 request.plularize = localizer.pluralize
69
71
70
72
71 def set_user_lang(event):
73 def set_user_lang(event):
72 request = event.request
74 request = event.request
73 cur_user = getattr(request, 'user', None)
75 cur_user = getattr(request, 'user', None)
74
76
75 if cur_user:
77 if cur_user:
76 user_lang = cur_user.get_instance().user_data.get('language')
78 user_lang = cur_user.get_instance().user_data.get('language')
77 if user_lang:
79 if user_lang:
78 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
80 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
79 event.request._LOCALE_ = user_lang
81 event.request._LOCALE_ = user_lang
80
82
81
83
82 def add_request_user_context(event):
84 def add_request_user_context(event):
83 """
85 """
84 Adds auth user into request context
86 Adds auth user into request context
85 """
87 """
86 request = event.request
88 request = event.request
87
89
88 if hasattr(request, 'vcs_call'):
90 if hasattr(request, 'vcs_call'):
89 # skip vcs calls
91 # skip vcs calls
90 return
92 return
91
93
92 if hasattr(request, 'rpc_method'):
94 if hasattr(request, 'rpc_method'):
93 # skip api calls
95 # skip api calls
94 return
96 return
95
97
96 auth_user = get_auth_user(request)
98 auth_user = get_auth_user(request)
97 request.user = auth_user
99 request.user = auth_user
98 request.environ['rc_auth_user'] = auth_user
100 request.environ['rc_auth_user'] = auth_user
99
101
100
102
101 def inject_app_settings(event):
103 def inject_app_settings(event):
102 settings = event.app.registry.settings
104 settings = event.app.registry.settings
103 # inject info about available permissions
105 # inject info about available permissions
104 auth.set_available_permissions(settings)
106 auth.set_available_permissions(settings)
105
107
106
108
107 def scan_repositories_if_enabled(event):
109 def scan_repositories_if_enabled(event):
108 """
110 """
109 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
111 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
110 does a repository scan if enabled in the settings.
112 does a repository scan if enabled in the settings.
111 """
113 """
112 settings = event.app.registry.settings
114 settings = event.app.registry.settings
113 vcs_server_enabled = settings['vcs.server.enable']
115 vcs_server_enabled = settings['vcs.server.enable']
114 import_on_startup = settings['startup.import_repos']
116 import_on_startup = settings['startup.import_repos']
115 if vcs_server_enabled and import_on_startup:
117 if vcs_server_enabled and import_on_startup:
116 from rhodecode.model.scm import ScmModel
118 from rhodecode.model.scm import ScmModel
117 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
119 from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path
118 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
120 repositories = ScmModel().repo_scan(get_rhodecode_base_path())
119 repo2db_mapper(repositories, remove_obsolete=False)
121 repo2db_mapper(repositories, remove_obsolete=False)
120
122
121
123
122 def write_metadata_if_needed(event):
124 def write_metadata_if_needed(event):
123 """
125 """
124 Writes upgrade metadata
126 Writes upgrade metadata
125 """
127 """
126 import rhodecode
128 import rhodecode
127 from rhodecode.lib import system_info
129 from rhodecode.lib import system_info
128 from rhodecode.lib import ext_json
130 from rhodecode.lib import ext_json
129
131
132 fname = '.rcmetadata.json'
133 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
134 metadata_destination = os.path.join(ini_loc, fname)
135
136 def get_update_age():
137 now = datetime.datetime.utcnow()
138
139 with open(metadata_destination, 'rb') as f:
140 data = ext_json.json.loads(f.read())
141 if 'created_on' in data:
142 update_date = parse(data['created_on'])
143 diff = now - update_date
144 return diff.total_seconds() / 60.0
145
146 return 0
147
130 def write():
148 def write():
131 fname = '.rcmetadata.json'
132 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
133 metadata_destination = os.path.join(ini_loc, fname)
134
135 configuration = system_info.SysInfo(
149 configuration = system_info.SysInfo(
136 system_info.rhodecode_config)()['value']
150 system_info.rhodecode_config)()['value']
137 license_token = configuration['config']['license_token']
151 license_token = configuration['config']['license_token']
152
153 setup = dict(
154 workers=configuration['config']['server:main'].get(
155 'workers', '?'),
156 worker_type=configuration['config']['server:main'].get(
157 'worker_class', 'sync'),
158 )
138 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
159 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
139 del dbinfo['url']
160 del dbinfo['url']
161
140 metadata = dict(
162 metadata = dict(
141 desc='upgrade metadata info',
163 desc='upgrade metadata info',
142 license_token=license_token,
164 license_token=license_token,
143 created_on=datetime.datetime.utcnow().isoformat(),
165 created_on=datetime.datetime.utcnow().isoformat(),
144 usage=system_info.SysInfo(system_info.usage_info)()['value'],
166 usage=system_info.SysInfo(system_info.usage_info)()['value'],
145 platform=system_info.SysInfo(system_info.platform_type)()['value'],
167 platform=system_info.SysInfo(system_info.platform_type)()['value'],
146 database=dbinfo,
168 database=dbinfo,
147 cpu=system_info.SysInfo(system_info.cpu)()['value'],
169 cpu=system_info.SysInfo(system_info.cpu)()['value'],
148 memory=system_info.SysInfo(system_info.memory)()['value'],
170 memory=system_info.SysInfo(system_info.memory)()['value'],
171 setup=setup
149 )
172 )
150
173
151 with open(metadata_destination, 'wb') as f:
174 with open(metadata_destination, 'wb') as f:
152 f.write(ext_json.json.dumps(metadata))
175 f.write(ext_json.json.dumps(metadata))
153
176
154 settings = event.app.registry.settings
177 settings = event.app.registry.settings
155 if settings.get('metadata.skip'):
178 if settings.get('metadata.skip'):
156 return
179 return
157
180
181 # only write this every 24h, workers restart caused unwanted delays
182 try:
183 age_in_min = get_update_age()
184 except Exception:
185 age_in_min = 0
186
187 if age_in_min < 60 * 60 * 24:
188 return
189
158 try:
190 try:
159 write()
191 write()
160 except Exception:
192 except Exception:
161 pass
193 pass
162
194
163
195
164 def write_js_routes_if_enabled(event):
196 def write_js_routes_if_enabled(event):
165 registry = event.app.registry
197 registry = event.app.registry
166
198
167 mapper = registry.queryUtility(IRoutesMapper)
199 mapper = registry.queryUtility(IRoutesMapper)
168 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
200 _argument_prog = re.compile('\{(.*?)\}|:\((.*)\)')
169
201
170 def _extract_route_information(route):
202 def _extract_route_information(route):
171 """
203 """
172 Convert a route into tuple(name, path, args), eg:
204 Convert a route into tuple(name, path, args), eg:
173 ('show_user', '/profile/%(username)s', ['username'])
205 ('show_user', '/profile/%(username)s', ['username'])
174 """
206 """
175
207
176 routepath = route.pattern
208 routepath = route.pattern
177 pattern = route.pattern
209 pattern = route.pattern
178
210
179 def replace(matchobj):
211 def replace(matchobj):
180 if matchobj.group(1):
212 if matchobj.group(1):
181 return "%%(%s)s" % matchobj.group(1).split(':')[0]
213 return "%%(%s)s" % matchobj.group(1).split(':')[0]
182 else:
214 else:
183 return "%%(%s)s" % matchobj.group(2)
215 return "%%(%s)s" % matchobj.group(2)
184
216
185 routepath = _argument_prog.sub(replace, routepath)
217 routepath = _argument_prog.sub(replace, routepath)
186
218
187 if not routepath.startswith('/'):
219 if not routepath.startswith('/'):
188 routepath = '/'+routepath
220 routepath = '/'+routepath
189
221
190 return (
222 return (
191 route.name,
223 route.name,
192 routepath,
224 routepath,
193 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
225 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
194 for arg in _argument_prog.findall(pattern)]
226 for arg in _argument_prog.findall(pattern)]
195 )
227 )
196
228
197 def get_routes():
229 def get_routes():
198 # pyramid routes
230 # pyramid routes
199 for route in mapper.get_routes():
231 for route in mapper.get_routes():
200 if not route.name.startswith('__'):
232 if not route.name.startswith('__'):
201 yield _extract_route_information(route)
233 yield _extract_route_information(route)
202
234
203 if asbool(registry.settings.get('generate_js_files', 'false')):
235 if asbool(registry.settings.get('generate_js_files', 'false')):
204 static_path = AssetResolver().resolve('rhodecode:public').abspath()
236 static_path = AssetResolver().resolve('rhodecode:public').abspath()
205 jsroutes = get_routes()
237 jsroutes = get_routes()
206 jsroutes_file_content = generate_jsroutes_content(jsroutes)
238 jsroutes_file_content = generate_jsroutes_content(jsroutes)
207 jsroutes_file_path = os.path.join(
239 jsroutes_file_path = os.path.join(
208 static_path, 'js', 'rhodecode', 'routes.js')
240 static_path, 'js', 'rhodecode', 'routes.js')
209
241
210 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
242 with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
211 f.write(jsroutes_file_content)
243 f.write(jsroutes_file_content)
212
244
213
245
214 class Subscriber(object):
246 class Subscriber(object):
215 """
247 """
216 Base class for subscribers to the pyramid event system.
248 Base class for subscribers to the pyramid event system.
217 """
249 """
218 def __call__(self, event):
250 def __call__(self, event):
219 self.run(event)
251 self.run(event)
220
252
221 def run(self, event):
253 def run(self, event):
222 raise NotImplementedError('Subclass has to implement this.')
254 raise NotImplementedError('Subclass has to implement this.')
223
255
224
256
225 class AsyncSubscriber(Subscriber):
257 class AsyncSubscriber(Subscriber):
226 """
258 """
227 Subscriber that handles the execution of events in a separate task to not
259 Subscriber that handles the execution of events in a separate task to not
228 block the execution of the code which triggers the event. It puts the
260 block the execution of the code which triggers the event. It puts the
229 received events into a queue from which the worker process takes them in
261 received events into a queue from which the worker process takes them in
230 order.
262 order.
231 """
263 """
232 def __init__(self):
264 def __init__(self):
233 self._stop = False
265 self._stop = False
234 self._eventq = Queue.Queue()
266 self._eventq = Queue.Queue()
235 self._worker = self.create_worker()
267 self._worker = self.create_worker()
236 self._worker.start()
268 self._worker.start()
237
269
238 def __call__(self, event):
270 def __call__(self, event):
239 self._eventq.put(event)
271 self._eventq.put(event)
240
272
241 def create_worker(self):
273 def create_worker(self):
242 worker = Thread(target=self.do_work)
274 worker = Thread(target=self.do_work)
243 worker.daemon = True
275 worker.daemon = True
244 return worker
276 return worker
245
277
246 def stop_worker(self):
278 def stop_worker(self):
247 self._stop = False
279 self._stop = False
248 self._eventq.put(None)
280 self._eventq.put(None)
249 self._worker.join()
281 self._worker.join()
250
282
251 def do_work(self):
283 def do_work(self):
252 while not self._stop:
284 while not self._stop:
253 event = self._eventq.get()
285 event = self._eventq.get()
254 if event is not None:
286 if event is not None:
255 self.run(event)
287 self.run(event)
256
288
257
289
258 class AsyncSubprocessSubscriber(AsyncSubscriber):
290 class AsyncSubprocessSubscriber(AsyncSubscriber):
259 """
291 """
260 Subscriber that uses the subprocess32 module to execute a command if an
292 Subscriber that uses the subprocess32 module to execute a command if an
261 event is received. Events are handled asynchronously.
293 event is received. Events are handled asynchronously.
262 """
294 """
263
295
264 def __init__(self, cmd, timeout=None):
296 def __init__(self, cmd, timeout=None):
265 super(AsyncSubprocessSubscriber, self).__init__()
297 super(AsyncSubprocessSubscriber, self).__init__()
266 self._cmd = cmd
298 self._cmd = cmd
267 self._timeout = timeout
299 self._timeout = timeout
268
300
269 def run(self, event):
301 def run(self, event):
270 cmd = self._cmd
302 cmd = self._cmd
271 timeout = self._timeout
303 timeout = self._timeout
272 log.debug('Executing command %s.', cmd)
304 log.debug('Executing command %s.', cmd)
273
305
274 try:
306 try:
275 output = subprocess32.check_output(
307 output = subprocess32.check_output(
276 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
308 cmd, timeout=timeout, stderr=subprocess32.STDOUT)
277 log.debug('Command finished %s', cmd)
309 log.debug('Command finished %s', cmd)
278 if output:
310 if output:
279 log.debug('Command output: %s', output)
311 log.debug('Command output: %s', output)
280 except subprocess32.TimeoutExpired as e:
312 except subprocess32.TimeoutExpired as e:
281 log.exception('Timeout while executing command.')
313 log.exception('Timeout while executing command.')
282 if e.output:
314 if e.output:
283 log.error('Command output: %s', e.output)
315 log.error('Command output: %s', e.output)
284 except subprocess32.CalledProcessError as e:
316 except subprocess32.CalledProcessError as e:
285 log.exception('Error while executing command.')
317 log.exception('Error while executing command.')
286 if e.output:
318 if e.output:
287 log.error('Command output: %s', e.output)
319 log.error('Command output: %s', e.output)
288 except:
320 except:
289 log.exception(
321 log.exception(
290 'Exception while executing command %s.', cmd)
322 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now