##// END OF EJS Templates
added delay before doing an auto merge
ilin.s -
r5660:cba707aa default
parent child Browse files
Show More
@@ -1,450 +1,450
1 # Copyright (C) 2010-2024 RhodeCode GmbH
1 # Copyright (C) 2010-2024 RhodeCode GmbH
2 #
2 #
3 # This program is free software: you can redistribute it and/or modify
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License, version 3
4 # it under the terms of the GNU Affero General Public License, version 3
5 # (only), as published by the Free Software Foundation.
5 # (only), as published by the Free Software Foundation.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU Affero General Public License
12 # You should have received a copy of the GNU Affero General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 #
14 #
15 # This program is dual-licensed. If you wish to learn more about the
15 # This program is dual-licensed. If you wish to learn more about the
16 # RhodeCode Enterprise Edition, including its added features, Support services,
16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 import io
18 import io
19 import shlex
19 import shlex
20
20
21 import math
21 import math
22 import re
22 import re
23 import os
23 import os
24 import datetime
24 import datetime
25 import logging
25 import logging
26 import queue
26 import queue
27 import subprocess
27 import subprocess
28
28
29
29
30 from dateutil.parser import parse
30 from dateutil.parser import parse
31 from pyramid.interfaces import IRoutesMapper
31 from pyramid.interfaces import IRoutesMapper
32 from pyramid.settings import asbool
32 from pyramid.settings import asbool
33 from pyramid.path import AssetResolver
33 from pyramid.path import AssetResolver
34 from threading import Thread
34 from threading import Thread
35
35
36 from rhodecode.config.jsroutes import generate_jsroutes_content
36 from rhodecode.config.jsroutes import generate_jsroutes_content
37 from rhodecode.lib.base import get_auth_user
37 from rhodecode.lib.base import get_auth_user
38 from rhodecode.lib.celerylib.loader import set_celery_conf
38 from rhodecode.lib.celerylib.loader import set_celery_conf
39
39
40 import rhodecode
40 import rhodecode
41
41
42
42
43 log = logging.getLogger(__name__)
43 log = logging.getLogger(__name__)
44
44
45
45
46 def add_renderer_globals(event):
46 def add_renderer_globals(event):
47 from rhodecode.lib import helpers
47 from rhodecode.lib import helpers
48
48
49 # TODO: When executed in pyramid view context the request is not available
49 # TODO: When executed in pyramid view context the request is not available
50 # in the event. Find a better solution to get the request.
50 # in the event. Find a better solution to get the request.
51 from pyramid.threadlocal import get_current_request
51 from pyramid.threadlocal import get_current_request
52 request = event['request'] or get_current_request()
52 request = event['request'] or get_current_request()
53
53
54 # Add Pyramid translation as '_' to context
54 # Add Pyramid translation as '_' to context
55 event['_'] = request.translate
55 event['_'] = request.translate
56 event['_ungettext'] = request.plularize
56 event['_ungettext'] = request.plularize
57 event['h'] = helpers
57 event['h'] = helpers
58
58
59
59
60 def auto_merge_pr_if_needed(event):
60 def auto_merge_pr_if_needed(event):
61 from rhodecode.model.db import PullRequest
61 from rhodecode.model.db import PullRequest
62 from rhodecode.model.pull_request import (
62 from rhodecode.model.pull_request import (
63 PullRequestModel, ChangesetStatus, MergeCheck
63 PullRequestModel, ChangesetStatus, MergeCheck
64 )
64 )
65
65
66 pr_event_data = event.as_dict()['pullrequest']
66 pr_event_data = event.as_dict()['pullrequest']
67 pull_request = PullRequest.get(pr_event_data['pull_request_id'])
67 pull_request = PullRequest.get(pr_event_data['pull_request_id'])
68 calculated_status = pr_event_data['status']
68 calculated_status = pr_event_data['status']
69 if (calculated_status == ChangesetStatus.STATUS_APPROVED
69 if (calculated_status == ChangesetStatus.STATUS_APPROVED
70 and PullRequestModel().is_automatic_merge_enabled(pull_request)):
70 and PullRequestModel().is_automatic_merge_enabled(pull_request)):
71 user = pull_request.author.AuthUser()
71 user = pull_request.author.AuthUser()
72
72
73 merge_check = MergeCheck.validate(
73 merge_check = MergeCheck.validate(
74 pull_request, user, translator=lambda x: x, fail_early=True
74 pull_request, user, translator=lambda x: x, fail_early=True
75 )
75 )
76 if merge_check.merge_possible:
76 if merge_check.merge_possible:
77 from rhodecode.lib.base import vcs_operation_context
77 from rhodecode.lib.base import vcs_operation_context
78 extras = vcs_operation_context(
78 extras = vcs_operation_context(
79 event.request.environ, repo_name=pull_request.target_repo.repo_name,
79 event.request.environ, repo_name=pull_request.target_repo.repo_name,
80 username=user.username, action='push',
80 username=user.username, action='push',
81 scm=pull_request.target_repo.repo_type)
81 scm=pull_request.target_repo.repo_type)
82 from rc_ee.lib.celerylib.tasks import auto_merge_repo
82 from rc_ee.lib.celerylib.tasks import auto_merge_repo
83 auto_merge_repo.apply_async(
83 auto_merge_repo.apply_async(
84 args=(pull_request.pull_request_id, extras)
84 args=(pull_request.pull_request_id, extras), countdown=3
85 )
85 )
86
86
87
87
88 def set_user_lang(event):
88 def set_user_lang(event):
89 request = event.request
89 request = event.request
90 cur_user = getattr(request, 'user', None)
90 cur_user = getattr(request, 'user', None)
91
91
92 if cur_user:
92 if cur_user:
93 user_lang = cur_user.get_instance().user_data.get('language')
93 user_lang = cur_user.get_instance().user_data.get('language')
94 if user_lang:
94 if user_lang:
95 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
95 log.debug('lang: setting current user:%s language to: %s', cur_user, user_lang)
96 event.request._LOCALE_ = user_lang
96 event.request._LOCALE_ = user_lang
97
97
98
98
99 def update_celery_conf(event):
99 def update_celery_conf(event):
100 log.debug('Setting celery config from new request')
100 log.debug('Setting celery config from new request')
101 set_celery_conf(request=event.request, registry=event.request.registry)
101 set_celery_conf(request=event.request, registry=event.request.registry)
102
102
103
103
104 def add_request_user_context(event):
104 def add_request_user_context(event):
105 """
105 """
106 Adds auth user into request context
106 Adds auth user into request context
107 """
107 """
108
108
109 request = event.request
109 request = event.request
110 # access req_id as soon as possible
110 # access req_id as soon as possible
111 req_id = request.req_id
111 req_id = request.req_id
112
112
113 if hasattr(request, 'vcs_call'):
113 if hasattr(request, 'vcs_call'):
114 # skip vcs calls
114 # skip vcs calls
115 return
115 return
116
116
117 if hasattr(request, 'rpc_method'):
117 if hasattr(request, 'rpc_method'):
118 # skip api calls
118 # skip api calls
119 return
119 return
120
120
121 auth_user, auth_token = get_auth_user(request)
121 auth_user, auth_token = get_auth_user(request)
122 request.user = auth_user
122 request.user = auth_user
123 request.user_auth_token = auth_token
123 request.user_auth_token = auth_token
124 request.environ['rc_auth_user'] = auth_user
124 request.environ['rc_auth_user'] = auth_user
125 request.environ['rc_auth_user_id'] = str(auth_user.user_id)
125 request.environ['rc_auth_user_id'] = str(auth_user.user_id)
126 request.environ['rc_req_id'] = req_id
126 request.environ['rc_req_id'] = req_id
127
127
128
128
129 def reset_log_bucket(event):
129 def reset_log_bucket(event):
130 """
130 """
131 reset the log bucket on new request
131 reset the log bucket on new request
132 """
132 """
133 request = event.request
133 request = event.request
134 request.req_id_records_init()
134 request.req_id_records_init()
135
135
136
136
137 def scan_repositories_if_enabled(event):
137 def scan_repositories_if_enabled(event):
138 """
138 """
139 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
139 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
140 does a repository scan if enabled in the settings.
140 does a repository scan if enabled in the settings.
141 """
141 """
142
142
143 settings = event.app.registry.settings
143 settings = event.app.registry.settings
144 vcs_server_enabled = settings['vcs.server.enable']
144 vcs_server_enabled = settings['vcs.server.enable']
145 import_on_startup = settings['startup.import_repos']
145 import_on_startup = settings['startup.import_repos']
146
146
147 if vcs_server_enabled and import_on_startup:
147 if vcs_server_enabled and import_on_startup:
148 from rhodecode.model.scm import ScmModel
148 from rhodecode.model.scm import ScmModel
149 from rhodecode.lib.utils import repo2db_mapper
149 from rhodecode.lib.utils import repo2db_mapper
150 scm = ScmModel()
150 scm = ScmModel()
151 repositories = scm.repo_scan(scm.repos_path)
151 repositories = scm.repo_scan(scm.repos_path)
152 repo2db_mapper(repositories)
152 repo2db_mapper(repositories)
153
153
154
154
155 def write_metadata_if_needed(event):
155 def write_metadata_if_needed(event):
156 """
156 """
157 Writes upgrade metadata
157 Writes upgrade metadata
158 """
158 """
159 import rhodecode
159 import rhodecode
160 from rhodecode.lib import system_info
160 from rhodecode.lib import system_info
161 from rhodecode.lib import ext_json
161 from rhodecode.lib import ext_json
162
162
163 fname = '.rcmetadata.json'
163 fname = '.rcmetadata.json'
164 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
164 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
165 metadata_destination = os.path.join(ini_loc, fname)
165 metadata_destination = os.path.join(ini_loc, fname)
166
166
167 def get_update_age():
167 def get_update_age():
168 now = datetime.datetime.utcnow()
168 now = datetime.datetime.utcnow()
169
169
170 with open(metadata_destination, 'rb') as f:
170 with open(metadata_destination, 'rb') as f:
171 data = ext_json.json.loads(f.read())
171 data = ext_json.json.loads(f.read())
172 if 'created_on' in data:
172 if 'created_on' in data:
173 update_date = parse(data['created_on'])
173 update_date = parse(data['created_on'])
174 diff = now - update_date
174 diff = now - update_date
175 return diff.total_seconds() / 60.0
175 return diff.total_seconds() / 60.0
176
176
177 return 0
177 return 0
178
178
179 def write():
179 def write():
180 configuration = system_info.SysInfo(
180 configuration = system_info.SysInfo(
181 system_info.rhodecode_config)()['value']
181 system_info.rhodecode_config)()['value']
182 license_token = configuration['config']['license_token']
182 license_token = configuration['config']['license_token']
183
183
184 setup = dict(
184 setup = dict(
185 workers=configuration['config']['server:main'].get(
185 workers=configuration['config']['server:main'].get(
186 'workers', '?'),
186 'workers', '?'),
187 worker_type=configuration['config']['server:main'].get(
187 worker_type=configuration['config']['server:main'].get(
188 'worker_class', 'sync'),
188 'worker_class', 'sync'),
189 )
189 )
190 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
190 dbinfo = system_info.SysInfo(system_info.database_info)()['value']
191 del dbinfo['url']
191 del dbinfo['url']
192
192
193 metadata = dict(
193 metadata = dict(
194 desc='upgrade metadata info',
194 desc='upgrade metadata info',
195 license_token=license_token,
195 license_token=license_token,
196 created_on=datetime.datetime.utcnow().isoformat(),
196 created_on=datetime.datetime.utcnow().isoformat(),
197 usage=system_info.SysInfo(system_info.usage_info)()['value'],
197 usage=system_info.SysInfo(system_info.usage_info)()['value'],
198 platform=system_info.SysInfo(system_info.platform_type)()['value'],
198 platform=system_info.SysInfo(system_info.platform_type)()['value'],
199 database=dbinfo,
199 database=dbinfo,
200 cpu=system_info.SysInfo(system_info.cpu)()['value'],
200 cpu=system_info.SysInfo(system_info.cpu)()['value'],
201 memory=system_info.SysInfo(system_info.memory)()['value'],
201 memory=system_info.SysInfo(system_info.memory)()['value'],
202 setup=setup
202 setup=setup
203 )
203 )
204
204
205 with open(metadata_destination, 'wb') as f:
205 with open(metadata_destination, 'wb') as f:
206 f.write(ext_json.json.dumps(metadata))
206 f.write(ext_json.json.dumps(metadata))
207
207
208 settings = event.app.registry.settings
208 settings = event.app.registry.settings
209 if settings.get('metadata.skip'):
209 if settings.get('metadata.skip'):
210 return
210 return
211
211
212 # only write this every 24h, workers restart caused unwanted delays
212 # only write this every 24h, workers restart caused unwanted delays
213 try:
213 try:
214 age_in_min = get_update_age()
214 age_in_min = get_update_age()
215 except Exception:
215 except Exception:
216 age_in_min = 0
216 age_in_min = 0
217
217
218 if age_in_min > 60 * 60 * 24:
218 if age_in_min > 60 * 60 * 24:
219 return
219 return
220
220
221 try:
221 try:
222 write()
222 write()
223 except Exception:
223 except Exception:
224 pass
224 pass
225
225
226
226
227 def write_usage_data(event):
227 def write_usage_data(event):
228 import rhodecode
228 import rhodecode
229 from rhodecode.lib import system_info
229 from rhodecode.lib import system_info
230 from rhodecode.lib import ext_json
230 from rhodecode.lib import ext_json
231
231
232 settings = event.app.registry.settings
232 settings = event.app.registry.settings
233 instance_tag = settings.get('metadata.write_usage_tag')
233 instance_tag = settings.get('metadata.write_usage_tag')
234 if not settings.get('metadata.write_usage'):
234 if not settings.get('metadata.write_usage'):
235 return
235 return
236
236
237 def get_update_age(dest_file):
237 def get_update_age(dest_file):
238 now = datetime.datetime.now(datetime.UTC)
238 now = datetime.datetime.now(datetime.UTC)
239
239
240 with open(dest_file, 'rb') as f:
240 with open(dest_file, 'rb') as f:
241 data = ext_json.json.loads(f.read())
241 data = ext_json.json.loads(f.read())
242 if 'created_on' in data:
242 if 'created_on' in data:
243 update_date = parse(data['created_on'])
243 update_date = parse(data['created_on'])
244 diff = now - update_date
244 diff = now - update_date
245 return math.ceil(diff.total_seconds() / 60.0)
245 return math.ceil(diff.total_seconds() / 60.0)
246
246
247 return 0
247 return 0
248
248
249 utc_date = datetime.datetime.now(datetime.UTC)
249 utc_date = datetime.datetime.now(datetime.UTC)
250 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
250 hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
251 fname = f'.rc_usage_{utc_date.year}{utc_date.month:02d}{utc_date.day:02d}_{hour_quarter}.json'
251 fname = f'.rc_usage_{utc_date.year}{utc_date.month:02d}{utc_date.day:02d}_{hour_quarter}.json'
252 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
252 ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
253
253
254 usage_dir = os.path.join(ini_loc, '.rcusage')
254 usage_dir = os.path.join(ini_loc, '.rcusage')
255 if not os.path.isdir(usage_dir):
255 if not os.path.isdir(usage_dir):
256 os.makedirs(usage_dir)
256 os.makedirs(usage_dir)
257 usage_metadata_destination = os.path.join(usage_dir, fname)
257 usage_metadata_destination = os.path.join(usage_dir, fname)
258
258
259 try:
259 try:
260 age_in_min = get_update_age(usage_metadata_destination)
260 age_in_min = get_update_age(usage_metadata_destination)
261 except Exception:
261 except Exception:
262 age_in_min = 0
262 age_in_min = 0
263
263
264 # write every 6th hour
264 # write every 6th hour
265 if age_in_min and age_in_min < 60 * 6:
265 if age_in_min and age_in_min < 60 * 6:
266 log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
266 log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
267 age_in_min, 60 * 6)
267 age_in_min, 60 * 6)
268 return
268 return
269
269
270 def write(dest_file):
270 def write(dest_file):
271 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
271 configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
272 license_token = configuration['config']['license_token']
272 license_token = configuration['config']['license_token']
273
273
274 metadata = dict(
274 metadata = dict(
275 desc='Usage data',
275 desc='Usage data',
276 instance_tag=instance_tag,
276 instance_tag=instance_tag,
277 license_token=license_token,
277 license_token=license_token,
278 created_on=datetime.datetime.utcnow().isoformat(),
278 created_on=datetime.datetime.utcnow().isoformat(),
279 usage=system_info.SysInfo(system_info.usage_info)()['value'],
279 usage=system_info.SysInfo(system_info.usage_info)()['value'],
280 )
280 )
281
281
282 with open(dest_file, 'wb') as f:
282 with open(dest_file, 'wb') as f:
283 f.write(ext_json.formatted_json(metadata))
283 f.write(ext_json.formatted_json(metadata))
284
284
285 try:
285 try:
286 log.debug('Writing usage file at: %s', usage_metadata_destination)
286 log.debug('Writing usage file at: %s', usage_metadata_destination)
287 write(usage_metadata_destination)
287 write(usage_metadata_destination)
288 except Exception:
288 except Exception:
289 pass
289 pass
290
290
291
291
292 def write_js_routes_if_enabled(event):
292 def write_js_routes_if_enabled(event):
293 registry = event.app.registry
293 registry = event.app.registry
294
294
295 mapper = registry.queryUtility(IRoutesMapper)
295 mapper = registry.queryUtility(IRoutesMapper)
296 _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
296 _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')
297
297
298 def _extract_route_information(route):
298 def _extract_route_information(route):
299 """
299 """
300 Convert a route into tuple(name, path, args), eg:
300 Convert a route into tuple(name, path, args), eg:
301 ('show_user', '/profile/%(username)s', ['username'])
301 ('show_user', '/profile/%(username)s', ['username'])
302 """
302 """
303
303
304 route_path = route.pattern
304 route_path = route.pattern
305 pattern = route.pattern
305 pattern = route.pattern
306
306
307 def replace(matchobj):
307 def replace(matchobj):
308 if matchobj.group(1):
308 if matchobj.group(1):
309 return "%%(%s)s" % matchobj.group(1).split(':')[0]
309 return "%%(%s)s" % matchobj.group(1).split(':')[0]
310 else:
310 else:
311 return "%%(%s)s" % matchobj.group(2)
311 return "%%(%s)s" % matchobj.group(2)
312
312
313 route_path = _argument_prog.sub(replace, route_path)
313 route_path = _argument_prog.sub(replace, route_path)
314
314
315 if not route_path.startswith('/'):
315 if not route_path.startswith('/'):
316 route_path = f'/{route_path}'
316 route_path = f'/{route_path}'
317
317
318 return (
318 return (
319 route.name,
319 route.name,
320 route_path,
320 route_path,
321 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
321 [(arg[0].split(':')[0] if arg[0] != '' else arg[1])
322 for arg in _argument_prog.findall(pattern)]
322 for arg in _argument_prog.findall(pattern)]
323 )
323 )
324
324
325 def get_routes():
325 def get_routes():
326 # pyramid routes
326 # pyramid routes
327 for route in mapper.get_routes():
327 for route in mapper.get_routes():
328 if not route.name.startswith('__'):
328 if not route.name.startswith('__'):
329 yield _extract_route_information(route)
329 yield _extract_route_information(route)
330
330
331 if asbool(registry.settings.get('generate_js_files', 'false')):
331 if asbool(registry.settings.get('generate_js_files', 'false')):
332 static_path = AssetResolver().resolve('rhodecode:public').abspath()
332 static_path = AssetResolver().resolve('rhodecode:public').abspath()
333 jsroutes = get_routes()
333 jsroutes = get_routes()
334 jsroutes_file_content = generate_jsroutes_content(jsroutes)
334 jsroutes_file_content = generate_jsroutes_content(jsroutes)
335 jsroutes_file_path = os.path.join(
335 jsroutes_file_path = os.path.join(
336 static_path, 'js', 'rhodecode', 'routes.js')
336 static_path, 'js', 'rhodecode', 'routes.js')
337
337
338 try:
338 try:
339 with open(jsroutes_file_path, 'w', encoding='utf-8') as f:
339 with open(jsroutes_file_path, 'w', encoding='utf-8') as f:
340 f.write(jsroutes_file_content)
340 f.write(jsroutes_file_content)
341 log.debug('generated JS files in %s', jsroutes_file_path)
341 log.debug('generated JS files in %s', jsroutes_file_path)
342 except Exception:
342 except Exception:
343 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
343 log.exception('Failed to write routes.js into %s', jsroutes_file_path)
344
344
345
345
346 def import_license_if_present(event):
346 def import_license_if_present(event):
347 """
347 """
348 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
348 This is subscribed to the `pyramid.events.ApplicationCreated` event. It
349 does a import license key based on a presence of the file.
349 does a import license key based on a presence of the file.
350 """
350 """
351 settings = event.app.registry.settings
351 settings = event.app.registry.settings
352
352
353 rhodecode_edition_id = settings.get('rhodecode.edition_id')
353 rhodecode_edition_id = settings.get('rhodecode.edition_id')
354 license_file_path = settings.get('license.import_path')
354 license_file_path = settings.get('license.import_path')
355 force = settings.get('license.import_path_mode') == 'force'
355 force = settings.get('license.import_path_mode') == 'force'
356
356
357 if license_file_path and rhodecode_edition_id == 'EE':
357 if license_file_path and rhodecode_edition_id == 'EE':
358 log.debug('license.import_path= is set importing license from %s', license_file_path)
358 log.debug('license.import_path= is set importing license from %s', license_file_path)
359 from rhodecode.model.meta import Session
359 from rhodecode.model.meta import Session
360 from rhodecode.model.license import apply_license_from_file
360 from rhodecode.model.license import apply_license_from_file
361 try:
361 try:
362 apply_license_from_file(license_file_path, force=force)
362 apply_license_from_file(license_file_path, force=force)
363 Session().commit()
363 Session().commit()
364 except OSError:
364 except OSError:
365 log.exception('Failed to import license from %s, make sure this file exists', license_file_path)
365 log.exception('Failed to import license from %s, make sure this file exists', license_file_path)
366
366
367
367
368 class Subscriber(object):
368 class Subscriber(object):
369 """
369 """
370 Base class for subscribers to the pyramid event system.
370 Base class for subscribers to the pyramid event system.
371 """
371 """
372 def __call__(self, event):
372 def __call__(self, event):
373 self.run(event)
373 self.run(event)
374
374
375 def run(self, event):
375 def run(self, event):
376 raise NotImplementedError('Subclass has to implement this.')
376 raise NotImplementedError('Subclass has to implement this.')
377
377
378
378
379 class AsyncSubscriber(Subscriber):
379 class AsyncSubscriber(Subscriber):
380 """
380 """
381 Subscriber that handles the execution of events in a separate task to not
381 Subscriber that handles the execution of events in a separate task to not
382 block the execution of the code which triggers the event. It puts the
382 block the execution of the code which triggers the event. It puts the
383 received events into a queue from which the worker process takes them in
383 received events into a queue from which the worker process takes them in
384 order.
384 order.
385 """
385 """
386 def __init__(self):
386 def __init__(self):
387 self._stop = False
387 self._stop = False
388 self._eventq = queue.Queue()
388 self._eventq = queue.Queue()
389 self._worker = self.create_worker()
389 self._worker = self.create_worker()
390 self._worker.start()
390 self._worker.start()
391
391
392 def __call__(self, event):
392 def __call__(self, event):
393 self._eventq.put(event)
393 self._eventq.put(event)
394
394
395 def create_worker(self):
395 def create_worker(self):
396 worker = Thread(target=self.do_work)
396 worker = Thread(target=self.do_work)
397 worker.daemon = True
397 worker.daemon = True
398 return worker
398 return worker
399
399
400 def stop_worker(self):
400 def stop_worker(self):
401 self._stop = False
401 self._stop = False
402 self._eventq.put(None)
402 self._eventq.put(None)
403 self._worker.join()
403 self._worker.join()
404
404
405 def do_work(self):
405 def do_work(self):
406 while not self._stop:
406 while not self._stop:
407 event = self._eventq.get()
407 event = self._eventq.get()
408 if event is not None:
408 if event is not None:
409 self.run(event)
409 self.run(event)
410
410
411
411
412 class AsyncSubprocessSubscriber(AsyncSubscriber):
412 class AsyncSubprocessSubscriber(AsyncSubscriber):
413 """
413 """
414 Subscriber that uses the subprocess module to execute a command if an
414 Subscriber that uses the subprocess module to execute a command if an
415 event is received. Events are handled asynchronously::
415 event is received. Events are handled asynchronously::
416
416
417 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
417 subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
418 subscriber(dummyEvent) # running __call__(event)
418 subscriber(dummyEvent) # running __call__(event)
419
419
420 """
420 """
421
421
422 def __init__(self, cmd, timeout=None):
422 def __init__(self, cmd, timeout=None):
423 if not isinstance(cmd, (list, tuple)):
423 if not isinstance(cmd, (list, tuple)):
424 cmd = shlex.split(cmd)
424 cmd = shlex.split(cmd)
425 super().__init__()
425 super().__init__()
426 self._cmd = cmd
426 self._cmd = cmd
427 self._timeout = timeout
427 self._timeout = timeout
428
428
429 def run(self, event):
429 def run(self, event):
430 cmd = self._cmd
430 cmd = self._cmd
431 timeout = self._timeout
431 timeout = self._timeout
432 log.debug('Executing command %s.', cmd)
432 log.debug('Executing command %s.', cmd)
433
433
434 try:
434 try:
435 output = subprocess.check_output(
435 output = subprocess.check_output(
436 cmd, timeout=timeout, stderr=subprocess.STDOUT)
436 cmd, timeout=timeout, stderr=subprocess.STDOUT)
437 log.debug('Command finished %s', cmd)
437 log.debug('Command finished %s', cmd)
438 if output:
438 if output:
439 log.debug('Command output: %s', output)
439 log.debug('Command output: %s', output)
440 except subprocess.TimeoutExpired as e:
440 except subprocess.TimeoutExpired as e:
441 log.exception('Timeout while executing command.')
441 log.exception('Timeout while executing command.')
442 if e.output:
442 if e.output:
443 log.error('Command output: %s', e.output)
443 log.error('Command output: %s', e.output)
444 except subprocess.CalledProcessError as e:
444 except subprocess.CalledProcessError as e:
445 log.exception('Error while executing command.')
445 log.exception('Error while executing command.')
446 if e.output:
446 if e.output:
447 log.error('Command output: %s', e.output)
447 log.error('Command output: %s', e.output)
448 except Exception:
448 except Exception:
449 log.exception(
449 log.exception(
450 'Exception while executing command %s.', cmd)
450 'Exception while executing command %s.', cmd)
General Comments 0
You need to be logged in to leave comments. Login now