chore: disabled auto-merge feature until next bugfix release
andverb - r5663:5aecabed default
@@ -1,450 +1,452 @@
# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import io
import shlex

import math
import re
import os
import datetime
import logging
import queue
import subprocess


from dateutil.parser import parse
from pyramid.interfaces import IRoutesMapper
from pyramid.settings import asbool
from pyramid.path import AssetResolver
from threading import Thread

from rhodecode.config.jsroutes import generate_jsroutes_content
from rhodecode.lib.base import get_auth_user
from rhodecode.lib.celerylib.loader import set_celery_conf

import rhodecode


log = logging.getLogger(__name__)


def add_renderer_globals(event):
    from rhodecode.lib import helpers

    # TODO: When executed in pyramid view context the request is not available
    # in the event. Find a better solution to get the request.
    from pyramid.threadlocal import get_current_request
    request = event['request'] or get_current_request()

    # Add Pyramid translation as '_' to context
    event['_'] = request.translate
    event['_ungettext'] = request.plularize
    event['h'] = helpers


def auto_merge_pr_if_needed(event):
-    from rhodecode.model.db import PullRequest
-    from rhodecode.model.pull_request import (
-        PullRequestModel, ChangesetStatus, MergeCheck
-    )
-
-    pr_event_data = event.as_dict()['pullrequest']
-    pull_request = PullRequest.get(pr_event_data['pull_request_id'])
-    calculated_status = pr_event_data['status']
-    if (calculated_status == ChangesetStatus.STATUS_APPROVED
-            and PullRequestModel().is_automatic_merge_enabled(pull_request)):
-        user = pull_request.author.AuthUser()
-
-        merge_check = MergeCheck.validate(
-            pull_request, user, translator=lambda x: x, fail_early=True
-        )
-        if merge_check.merge_possible:
-            from rhodecode.lib.base import vcs_operation_context
-            extras = vcs_operation_context(
-                event.request.environ, repo_name=pull_request.target_repo.repo_name,
-                username=user.username, action='push',
-                scm=pull_request.target_repo.repo_type)
-            from rc_ee.lib.celerylib.tasks import auto_merge_repo
-            auto_merge_repo.apply_async(
-                args=(pull_request.pull_request_id, extras), countdown=3
-            )
+    #TODO To be re-enabled later
+    pass
+    # from rhodecode.model.db import PullRequest
+    # from rhodecode.model.pull_request import (
+    #     PullRequestModel, ChangesetStatus, MergeCheck
+    # )
+    #
+    # pr_event_data = event.as_dict()['pullrequest']
+    # pull_request = PullRequest.get(pr_event_data['pull_request_id'])
+    # calculated_status = pr_event_data['status']
+    # if (calculated_status == ChangesetStatus.STATUS_APPROVED
+    #         and PullRequestModel().is_automatic_merge_enabled(pull_request)):
+    #     user = pull_request.author.AuthUser()
+    #
+    #     merge_check = MergeCheck.validate(
+    #         pull_request, user, translator=lambda x: x, fail_early=True
+    #     )
+    #     if merge_check.merge_possible:
+    #         from rhodecode.lib.base import vcs_operation_context
+    #         extras = vcs_operation_context(
+    #             event.request.environ, repo_name=pull_request.target_repo.repo_name,
+    #             username=user.username, action='push',
+    #             scm=pull_request.target_repo.repo_type)
+    #         from rc_ee.lib.celerylib.tasks import auto_merge_repo
+    #         auto_merge_repo.apply_async(
+    #             args=(pull_request.pull_request_id, extras), countdown=3
+    #         )


def set_user_lang(event):
    request = event.request
    cur_user = getattr(request, 'user', None)

    if cur_user:
        user_lang = cur_user.get_instance().user_data.get('language')
        if user_lang:
            log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
            event.request._LOCALE_ = user_lang


def update_celery_conf(event):
    log.debug('Setting celery config from new request')
    set_celery_conf(request=event.request, registry=event.request.registry)


def add_request_user_context(event):
    """
    Adds auth user into request context
    """

    request = event.request
    # access req_id as soon as possible
    req_id = request.req_id

    if hasattr(request, 'vcs_call'):
        # skip vcs calls
        return

    if hasattr(request, 'rpc_method'):
        # skip api calls
        return

    auth_user, auth_token = get_auth_user(request)
    request.user = auth_user
    request.user_auth_token = auth_token
    request.environ['rc_auth_user'] = auth_user
    request.environ['rc_auth_user_id'] = str(auth_user.user_id)
    request.environ['rc_req_id'] = req_id


def reset_log_bucket(event):
    """
    reset the log bucket on new request
    """
    request = event.request
    request.req_id_records_init()


def scan_repositories_if_enabled(event):
    """
    This is subscribed to the `pyramid.events.ApplicationCreated` event. It
    does a repository scan if enabled in the settings.
    """

    settings = event.app.registry.settings
    vcs_server_enabled = settings['vcs.server.enable']
    import_on_startup = settings['startup.import_repos']

    if vcs_server_enabled and import_on_startup:
        from rhodecode.model.scm import ScmModel
        from rhodecode.lib.utils import repo2db_mapper
        scm = ScmModel()
        repositories = scm.repo_scan(scm.repos_path)
        repo2db_mapper(repositories)


def write_metadata_if_needed(event):
    """
    Writes upgrade metadata
    """
    import rhodecode
    from rhodecode.lib import system_info
    from rhodecode.lib import ext_json

    fname = '.rcmetadata.json'
    ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
    metadata_destination = os.path.join(ini_loc, fname)

    def get_update_age():
        now = datetime.datetime.utcnow()

        with open(metadata_destination, 'rb') as f:
            data = ext_json.json.loads(f.read())
            if 'created_on' in data:
                update_date = parse(data['created_on'])
                diff = now - update_date
                return diff.total_seconds() / 60.0

        return 0

    def write():
        configuration = system_info.SysInfo(
            system_info.rhodecode_config)()['value']
        license_token = configuration['config']['license_token']

        setup = dict(
            workers=configuration['config']['server:main'].get(
                'workers', '?'),
            worker_type=configuration['config']['server:main'].get(
                'worker_class', 'sync'),
        )
        dbinfo = system_info.SysInfo(system_info.database_info)()['value']
        del dbinfo['url']

        metadata = dict(
            desc='upgrade metadata info',
            license_token=license_token,
            created_on=datetime.datetime.utcnow().isoformat(),
            usage=system_info.SysInfo(system_info.usage_info)()['value'],
            platform=system_info.SysInfo(system_info.platform_type)()['value'],
            database=dbinfo,
            cpu=system_info.SysInfo(system_info.cpu)()['value'],
            memory=system_info.SysInfo(system_info.memory)()['value'],
            setup=setup
        )

        with open(metadata_destination, 'wb') as f:
            f.write(ext_json.json.dumps(metadata))

    settings = event.app.registry.settings
    if settings.get('metadata.skip'):
        return

    # only write this every 24h, workers restart caused unwanted delays
    try:
        age_in_min = get_update_age()
    except Exception:
        age_in_min = 0

    if age_in_min > 60 * 60 * 24:
        return

    try:
        write()
    except Exception:
        pass


def write_usage_data(event):
    import rhodecode
    from rhodecode.lib import system_info
    from rhodecode.lib import ext_json

    settings = event.app.registry.settings
    instance_tag = settings.get('metadata.write_usage_tag')
    if not settings.get('metadata.write_usage'):
        return

    def get_update_age(dest_file):
        now = datetime.datetime.now(datetime.UTC)

        with open(dest_file, 'rb') as f:
            data = ext_json.json.loads(f.read())
            if 'created_on' in data:
                update_date = parse(data['created_on'])
                diff = now - update_date
                return math.ceil(diff.total_seconds() / 60.0)

        return 0

    utc_date = datetime.datetime.now(datetime.UTC)
    hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
    fname = f'.rc_usage_{utc_date.year}{utc_date.month:02d}{utc_date.day:02d}_{hour_quarter}.json'
    ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))

    usage_dir = os.path.join(ini_loc, '.rcusage')
    if not os.path.isdir(usage_dir):
        os.makedirs(usage_dir)
    usage_metadata_destination = os.path.join(usage_dir, fname)

    try:
        age_in_min = get_update_age(usage_metadata_destination)
    except Exception:
        age_in_min = 0

    # write every 6th hour
    if age_in_min and age_in_min < 60 * 6:
        log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
                  age_in_min, 60 * 6)
        return

    def write(dest_file):
        configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
        license_token = configuration['config']['license_token']

        metadata = dict(
            desc='Usage data',
            instance_tag=instance_tag,
            license_token=license_token,
            created_on=datetime.datetime.utcnow().isoformat(),
            usage=system_info.SysInfo(system_info.usage_info)()['value'],
        )

        with open(dest_file, 'wb') as f:
            f.write(ext_json.formatted_json(metadata))

    try:
        log.debug('Writing usage file at: %s', usage_metadata_destination)
        write(usage_metadata_destination)
    except Exception:
        pass


def write_js_routes_if_enabled(event):
    registry = event.app.registry

    mapper = registry.queryUtility(IRoutesMapper)
    _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')

    def _extract_route_information(route):
        """
        Convert a route into tuple(name, path, args), eg:
            ('show_user', '/profile/%(username)s', ['username'])
        """

        route_path = route.pattern
        pattern = route.pattern

        def replace(matchobj):
            if matchobj.group(1):
                return "%%(%s)s" % matchobj.group(1).split(':')[0]
            else:
                return "%%(%s)s" % matchobj.group(2)

        route_path = _argument_prog.sub(replace, route_path)

        if not route_path.startswith('/'):
            route_path = f'/{route_path}'

        return (
            route.name,
            route_path,
            [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
             for arg in _argument_prog.findall(pattern)]
        )

    def get_routes():
        # pyramid routes
        for route in mapper.get_routes():
            if not route.name.startswith('__'):
                yield _extract_route_information(route)

    if asbool(registry.settings.get('generate_js_files', 'false')):
        static_path = AssetResolver().resolve('rhodecode:public').abspath()
        jsroutes = get_routes()
        jsroutes_file_content = generate_jsroutes_content(jsroutes)
        jsroutes_file_path = os.path.join(
            static_path, 'js', 'rhodecode', 'routes.js')

        try:
            with open(jsroutes_file_path, 'w', encoding='utf-8') as f:
                f.write(jsroutes_file_content)
            log.debug('generated JS files in %s', jsroutes_file_path)
        except Exception:
            log.exception('Failed to write routes.js into %s', jsroutes_file_path)


def import_license_if_present(event):
    """
    This is subscribed to the `pyramid.events.ApplicationCreated` event. It
    does a import license key based on a presence of the file.
    """
    settings = event.app.registry.settings

    rhodecode_edition_id = settings.get('rhodecode.edition_id')
    license_file_path = settings.get('license.import_path')
    force = settings.get('license.import_path_mode') == 'force'

    if license_file_path and rhodecode_edition_id == 'EE':
        log.debug('license.import_path= is set importing license from %s', license_file_path)
        from rhodecode.model.meta import Session
        from rhodecode.model.license import apply_license_from_file
        try:
            apply_license_from_file(license_file_path, force=force)
            Session().commit()
        except OSError:
            log.exception('Failed to import license from %s, make sure this file exists', license_file_path)


class Subscriber(object):
    """
    Base class for subscribers to the pyramid event system.
    """
    def __call__(self, event):
        self.run(event)

    def run(self, event):
        raise NotImplementedError('Subclass has to implement this.')


class AsyncSubscriber(Subscriber):
    """
    Subscriber that handles the execution of events in a separate task to not
    block the execution of the code which triggers the event. It puts the
    received events into a queue from which the worker process takes them in
    order.
    """
    def __init__(self):
        self._stop = False
        self._eventq = queue.Queue()
        self._worker = self.create_worker()
        self._worker.start()

    def __call__(self, event):
        self._eventq.put(event)

    def create_worker(self):
        worker = Thread(target=self.do_work)
        worker.daemon = True
        return worker

    def stop_worker(self):
        self._stop = False
        self._eventq.put(None)
        self._worker.join()

    def do_work(self):
        while not self._stop:
            event = self._eventq.get()
            if event is not None:
                self.run(event)


class AsyncSubprocessSubscriber(AsyncSubscriber):
    """
    Subscriber that uses the subprocess module to execute a command if an
    event is received. Events are handled asynchronously::

        subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
        subscriber(dummyEvent) # running __call__(event)

    """

    def __init__(self, cmd, timeout=None):
        if not isinstance(cmd, (list, tuple)):
            cmd = shlex.split(cmd)
        super().__init__()
        self._cmd = cmd
        self._timeout = timeout

    def run(self, event):
        cmd = self._cmd
        timeout = self._timeout
        log.debug('Executing command %s.', cmd)

        try:
            output = subprocess.check_output(
                cmd, timeout=timeout, stderr=subprocess.STDOUT)
            log.debug('Command finished %s', cmd)
            if output:
                log.debug('Command output: %s', output)
        except subprocess.TimeoutExpired as e:
            log.exception('Timeout while executing command.')
            if e.output:
                log.error('Command output: %s', e.output)
        except subprocess.CalledProcessError as e:
            log.exception('Error while executing command.')
            if e.output:
                log.error('Command output: %s', e.output)
        except Exception:
            log.exception(
                'Exception while executing command %s.', cmd)
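
For orientation, the functions in this file are Pyramid event subscribers: they only run once they are registered against an event type while the application is being configured. Below is a minimal, self-contained sketch of that wiring, assuming only the public Pyramid API (pyramid.events and config.add_subscriber); the handler names and the settings dict are illustrative and are not taken from this changeset.

# Hypothetical wiring sketch, not part of this commit.
from pyramid.config import Configurator
from pyramid.events import ApplicationCreated, BeforeRender, NewRequest


def on_new_request(event):
    # NewRequest subscribers receive the active request, the same way
    # add_request_user_context above does.
    print('incoming request:', event.request.path)


def on_before_render(event):
    # BeforeRender behaves like a dict of renderer globals, the same object
    # add_renderer_globals above mutates.
    event['answer'] = 42


def on_app_created(event):
    # ApplicationCreated exposes the configured app and its settings,
    # as used by scan_repositories_if_enabled above.
    settings = event.app.registry.settings
    print('startup.import_repos =', settings.get('startup.import_repos'))


def main():
    with Configurator(settings={'startup.import_repos': 'false'}) as config:
        # add_subscriber(handler, event_type) is how subscribers are attached.
        config.add_subscriber(on_new_request, NewRequest)
        config.add_subscriber(on_before_render, BeforeRender)
        config.add_subscriber(on_app_created, ApplicationCreated)
        return config.make_wsgi_app()

Note that this commit disables auto-merge by stubbing out the body of auto_merge_pr_if_needed with pass, not by removing the handler's registration.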