Added a delay before doing an auto merge
ilin.s
r5660:cba707aa default
@@ -1,450 +1,450 @@
1 1 # Copyright (C) 2010-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18 import io
19 19 import shlex
20 20
21 21 import math
22 22 import re
23 23 import os
24 24 import datetime
25 25 import logging
26 26 import queue
27 27 import subprocess
28 28
29 29
30 30 from dateutil.parser import parse
31 31 from pyramid.interfaces import IRoutesMapper
32 32 from pyramid.settings import asbool
33 33 from pyramid.path import AssetResolver
34 34 from threading import Thread
35 35
36 36 from rhodecode.config.jsroutes import generate_jsroutes_content
37 37 from rhodecode.lib.base import get_auth_user
38 38 from rhodecode.lib.celerylib.loader import set_celery_conf
39 39
40 40 import rhodecode
41 41
42 42
43 43 log = logging.getLogger(__name__)
44 44
45 45
46 46 def add_renderer_globals(event):
47 47 from rhodecode.lib import helpers
48 48
49 49 # TODO: When executed in pyramid view context the request is not available
50 50 # in the event. Find a better solution to get the request.
51 51 from pyramid.threadlocal import get_current_request
52 52 request = event['request'] or get_current_request()
53 53
54 54 # Add Pyramid translation as '_' to context
55 55 event['_'] = request.translate
56 56 event['_ungettext'] = request.plularize
57 57 event['h'] = helpers
58 58
59 59
60 60 def auto_merge_pr_if_needed(event):
61 61 from rhodecode.model.db import PullRequest
62 62 from rhodecode.model.pull_request import (
63 63 PullRequestModel, ChangesetStatus, MergeCheck
64 64 )
65 65
66 66 pr_event_data = event.as_dict()['pullrequest']
67 67 pull_request = PullRequest.get(pr_event_data['pull_request_id'])
68 68 calculated_status = pr_event_data['status']
69 69 if (calculated_status == ChangesetStatus.STATUS_APPROVED
70 70 and PullRequestModel().is_automatic_merge_enabled(pull_request)):
71 71 user = pull_request.author.AuthUser()
72 72
73 73 merge_check = MergeCheck.validate(
74 74 pull_request, user, translator=lambda x: x, fail_early=True
75 75 )
76 76 if merge_check.merge_possible:
77 77 from rhodecode.lib.base import vcs_operation_context
78 78 extras = vcs_operation_context(
79 79 event.request.environ, repo_name=pull_request.target_repo.repo_name,
80 80 username=user.username, action='push',
81 81 scm=pull_request.target_repo.repo_type)
82 82 from rc_ee.lib.celerylib.tasks import auto_merge_repo
83 83 auto_merge_repo.apply_async(
84 -             args=(pull_request.pull_request_id, extras)
84 +             args=(pull_request.pull_request_id, extras), countdown=3
85 85 )
86 86
87 87
88 88 def set_user_lang(event):
89 89 request = event.request
90 90 cur_user = getattr(request, 'user', None)
91 91
92 92 if cur_user:
93 93 user_lang = cur_user.get_instance().user_data.get('language')
94 94 if user_lang:
95 95 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
96 96 event.request._LOCALE_ = user_lang
97 97
98 98
99 99 def update_celery_conf(event):
100 100 log.debug('Setting celery config from new request')
101 101 set_celery_conf(request=event.request, registry=event.request.registry)
102 102
103 103
104 104 def add_request_user_context(event):
105 105 """
106 106 Adds auth user into request context
107 107 """
108 108
109 109 request = event.request
110 110 # access req_id as soon as possible
111 111 req_id = request.req_id
112 112
113 113 if hasattr(request, 'vcs_call'):
114 114 # skip vcs calls
115 115 return
116 116
117 117 if hasattr(request, 'rpc_method'):
118 118 # skip api calls
119 119 return
120 120
121 121 auth_user, auth_token = get_auth_user(request)
122 122 request.user = auth_user
123 123 request.user_auth_token = auth_token
124 124 request.environ['rc_auth_user'] = auth_user
125 125 request.environ['rc_auth_user_id'] = str(auth_user.user_id)
126 126 request.environ['rc_req_id'] = req_id
127 127
128 128
129 129 def reset_log_bucket(event):
130 130 """
131 131 reset the log bucket on new request
132 132 """
133 133 request = event.request
134 134 request.req_id_records_init()
135 135
136 136
137 137 def scan_repositories_if_enabled(event):
138 138 """
139 139 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
140 140 does a repository scan if enabled in the settings.
141 141 """
142 142
143 143 settings = event.app.registry.settings
144 144 vcs_server_enabled = settings['vcs.server.enable']
145 145 import_on_startup = settings['startup.import_repos']
146 146
147 147 if vcs_server_enabled and import_on_startup:
148 148 from rhodecode.model.scm import ScmModel
149 149 from rhodecode.lib.utils import repo2db_mapper
150 150 scm = ScmModel()
151 151 repositories = scm.repo_scan(scm.repos_path)
152 152 repo2db_mapper(repositories)
153 153
154 154
155 155 def write_metadata_if_needed(event):
156 156 """
157 157 Writes upgrade metadata
158 158 """
159 159 import rhodecode
160 160 from rhodecode.lib import system_info
161 161 from rhodecode.lib import ext_json
162 162
163 163 fname = '.rcmetadata.json'
164 164 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
165 165 metadata_destination = os.path.join(ini_loc, fname)
166 166
167 167 def get_update_age():
168 168 now = datetime.datetime.utcnow()
169 169
170 170 with open(metadata_destination, 'rb') as f:
171 171 data = ext_json.json.loads(f.read())
172 172 if 'created_on' in data:
173 173 update_date = parse(data['created_on'])
174 174 diff = now - update_date
175 175 return diff.total_seconds() / 60.0
176 176
177 177 return 0
178 178
179 179 def write():
180 180 configuration = system_info.SysInfo(
181 181 system_info.rhodecode_config)()['value']
182 182 license_token = configuration['config']['license_token']
183 183
184 184 setup = dict(
185 185 workers=configuration['config']['server:main'].get(
186 186 'workers', '?'),
187 187 worker_type=configuration['config']['server:main'].get(
188 188 'worker_class', 'sync'),
189 189 )
190 190 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
191 191 del dbinfo['url']
192 192
193 193 metadata = dict(
194 194 desc='upgrade metadata info',
195 195 license_token=license_token,
196 196 created_on=datetime.datetime.utcnow().isoformat(),
197 197 usage=system_info.SysInfo(system_info.usage_info)()['value'],
198 198 platform=system_info.SysInfo(system_info.platform_type)()['value'],
199 199 database=dbinfo,
200 200 cpu=system_info.SysInfo(system_info.cpu)()['value'],
201 201 memory=system_info.SysInfo(system_info.memory)()['value'],
202 202 setup=setup
203 203 )
204 204
205 205 with open(metadata_destination, 'wb') as f:
206 206 f.write(ext_json.json.dumps(metadata))
207 207
208 208 settings = event.app.registry.settings
209 209 if settings.get('metadata.skip'):
210 210 return
211 211
212 212 # only write this every 24h; worker restarts caused unwanted delays
213 213 try:
214 214 age_in_min = get_update_age()
215 215 except Exception:
216 216 age_in_min = 0
217 217
218 218 if age_in_min and age_in_min < 60 * 24:
219 219 return
220 220
221 221 try:
222 222 write()
223 223 except Exception:
224 224 pass
225 225
226 226
227 227 def write_usage_data(event):
228 228 import rhodecode
229 229 from rhodecode.lib import system_info
230 230 from rhodecode.lib import ext_json
231 231
232 232 settings = event.app.registry.settings
233 233 instance_tag = settings.get('metadata.write_usage_tag')
234 234 if not settings.get('metadata.write_usage'):
235 235 return
236 236
237 237 def get_update_age(dest_file):
238 238 now = datetime.datetime.now(datetime.UTC)
239 239
240 240 with open(dest_file, 'rb') as f:
241 241 data = ext_json.json.loads(f.read())
242 242 if 'created_on' in data:
243 243 update_date = parse(data['created_on'])
244 244 diff = now - update_date
245 245 return math.ceil(diff.total_seconds() / 60.0)
246 246
247 247 return 0
248 248
249 249 utc_date = datetime.datetime.now(datetime.UTC)
250 250 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
251 251 fname = f'.rc_usage_{utc_date.year}{utc_date.month:02d}{utc_date.day:02d}_{hour_quarter}.json'
252 252 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
253 253
254 254 usage_dir = os.path.join(ini_loc, '.rcusage')
255 255 if not os.path.isdir(usage_dir):
256 256 os.makedirs(usage_dir)
257 257 usage_metadata_destination = os.path.join(usage_dir, fname)
258 258
259 259 try:
260 260 age_in_min = get_update_age(usage_metadata_destination)
261 261 except Exception:
262 262 age_in_min = 0
263 263
264 264 # write every 6th hour
265 265 if age_in_min and age_in_min < 60 * 6:
266 266 log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
267 267 age_in_min, 60 * 6)
268 268 return
269 269
270 270 def write(dest_file):
271 271 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
272 272 license_token = configuration['config']['license_token']
273 273
274 274 metadata = dict(
275 275 desc='Usage data',
276 276 instance_tag=instance_tag,
277 277 license_token=license_token,
278 278 created_on=datetime.datetime.now(datetime.UTC).isoformat(),
279 279 usage=system_info.SysInfo(system_info.usage_info)()['value'],
280 280 )
281 281
282 282 with open(dest_file, 'wb') as f:
283 283 f.write(ext_json.formatted_json(metadata))
284 284
285 285 try:
286 286 log.debug('Writing usage file at: %s', usage_metadata_destination)
287 287 write(usage_metadata_destination)
288 288 except Exception:
289 289 pass
290 290
291 291
292 292 def write_js_routes_if_enabled(event):
293 293 registry = event.app.registry
294 294
295 295 mapper = registry.queryUtility(IRoutesMapper)
296 296 _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
297 297
298 298 def _extract_route_information(route):
299 299 """
300 300 Convert a route into tuple(name, path, args), eg:
301 301 ('show_user', '/profile/%(username)s', ['username'])
302 302 """
303 303
304 304 route_path = route.pattern
305 305 pattern = route.pattern
306 306
307 307 def replace(matchobj):
308 308 if matchobj.group(1):
309 309 return "%%(%s)s" % matchobj.group(1).split(':')[0]
310 310 else:
311 311 return "%%(%s)s" % matchobj.group(2)
312 312
313 313 route_path = _argument_prog.sub(replace, route_path)
314 314
315 315 if not route_path.startswith('/'):
316 316 route_path = f'/{route_path}'
317 317
318 318 return (
319 319 route.name,
320 320 route_path,
321 321 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
322 322 for arg in _argument_prog.findall(pattern)]
323 323 )
324 324
325 325 def get_routes():
326 326 # pyramid routes
327 327 for route in mapper.get_routes():
328 328 if not route.name.startswith('__'):
329 329 yield _extract_route_information(route)
330 330
331 331 if asbool(registry.settings.get('generate_js_files', 'false')):
332 332 static_path = AssetResolver().resolve('rhodecode:public').abspath()
333 333 jsroutes = get_routes()
334 334 jsroutes_file_content = generate_jsroutes_content(jsroutes)
335 335 jsroutes_file_path = os.path.join(
336 336 static_path, 'js', 'rhodecode', 'routes.js')
337 337
338 338 try:
339 339 with open(jsroutes_file_path, 'w', encoding='utf-8') as f:
340 340 f.write(jsroutes_file_content)
341 341 log.debug('generated JS files in %s', jsroutes_file_path)
342 342 except Exception:
343 343 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
344 344
345 345
346 346 def import_license_if_present(event):
347 347 """
348 348 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
349 349 imports a license key based on the presence of the file.
350 350 """
351 351 settings = event.app.registry.settings
352 352
353 353 rhodecode_edition_id = settings.get('rhodecode.edition_id')
354 354 license_file_path = settings.get('license.import_path')
355 355 force = settings.get('license.import_path_mode') == 'force'
356 356
357 357 if license_file_path and rhodecode_edition_id == 'EE':
358 358 log.debug('license.import_path is set, importing license from %s', license_file_path)
359 359 from rhodecode.model.meta import Session
360 360 from rhodecode.model.license import apply_license_from_file
361 361 try:
362 362 apply_license_from_file(license_file_path, force=force)
363 363 Session().commit()
364 364 except OSError:
365 365 log.exception('Failed to import license from %s, make sure this file exists', license_file_path)
366 366
367 367
368 368 class Subscriber(object):
369 369 """
370 370 Base class for subscribers to the pyramid event system.
371 371 """
372 372 def __call__(self, event):
373 373 self.run(event)
374 374
375 375 def run(self, event):
376 376 raise NotImplementedError('Subclass has to implement this.')
377 377
378 378
379 379 class AsyncSubscriber(Subscriber):
380 380 """
381 381 Subscriber that handles the execution of events in a separate task to not
382 382 block the execution of the code which triggers the event. It puts the
383 383 received events into a queue from which the worker process takes them in
384 384 order.
385 385 """
386 386 def __init__(self):
387 387 self._stop = False
388 388 self._eventq = queue.Queue()
389 389 self._worker = self.create_worker()
390 390 self._worker.start()
391 391
392 392 def __call__(self, event):
393 393 self._eventq.put(event)
394 394
395 395 def create_worker(self):
396 396 worker = Thread(target=self.do_work)
397 397 worker.daemon = True
398 398 return worker
399 399
400 400 def stop_worker(self):
401 401 self._stop = True
402 402 self._eventq.put(None)
403 403 self._worker.join()
404 404
405 405 def do_work(self):
406 406 while not self._stop:
407 407 event = self._eventq.get()
408 408 if event is not None:
409 409 self.run(event)
410 410
411 411
412 412 class AsyncSubprocessSubscriber(AsyncSubscriber):
413 413 """
414 414 Subscriber that uses the subprocess module to execute a command if an
415 415 event is received. Events are handled asynchronously::
416 416
417 417 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
418 418 subscriber(dummyEvent) # running __call__(event)
419 419
420 420 """
421 421
422 422 def __init__(self, cmd, timeout=None):
423 423 if not isinstance(cmd, (list, tuple)):
424 424 cmd = shlex.split(cmd)
425 425 super().__init__()
426 426 self._cmd = cmd
427 427 self._timeout = timeout
428 428
429 429 def run(self, event):
430 430 cmd = self._cmd
431 431 timeout = self._timeout
432 432 log.debug('Executing command %s.', cmd)
433 433
434 434 try:
435 435 output = subprocess.check_output(
436 436 cmd, timeout=timeout, stderr=subprocess.STDOUT)
437 437 log.debug('Command finished %s', cmd)
438 438 if output:
439 439 log.debug('Command output: %s', output)
440 440 except subprocess.TimeoutExpired as e:
441 441 log.exception('Timeout while executing command.')
442 442 if e.output:
443 443 log.error('Command output: %s', e.output)
444 444 except subprocess.CalledProcessError as e:
445 445 log.exception('Error while executing command.')
446 446 if e.output:
447 447 log.error('Command output: %s', e.output)
448 448 except Exception:
449 449 log.exception(
450 450 'Exception while executing command %s.', cmd)
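
The only functional change in this revision is the countdown=3 argument added to the auto_merge_repo.apply_async(...) call in auto_merge_pr_if_needed. In Celery, countdown asks the worker to wait at least that many seconds before executing the queued task, presumably here to let the approval that triggered the event settle before the merge starts. Below is a minimal, self-contained sketch of that pattern; the app name, in-memory broker, and task body are illustrative stand-ins, and only the apply_async(..., countdown=...) call mirrors the code in the diff.

from celery import Celery

# Hypothetical app and broker, used only to make the sketch runnable locally.
app = Celery('auto_merge_demo', broker='memory://')

@app.task
def auto_merge_repo_demo(pull_request_id, extras):
    # Stand-in for the real auto-merge task; it only reports what it received.
    return f'would merge pull request {pull_request_id} with extras {extras!r}'

# countdown=3 schedules the task to run no earlier than ~3 seconds after it is
# queued, rather than as soon as a worker picks it up.
auto_merge_repo_demo.apply_async(args=(42, {'action': 'push'}), countdown=3)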