##// END OF EJS Templates
scheduler: added DB models and db parsers for the RhodeCode scheduler....
marcink -
r2406:db81d430 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,60 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import logging
21 import importlib
22
23 from celery.beat import (
24 PersistentScheduler, ScheduleEntry as CeleryScheduleEntry)
25
26 log = logging.getLogger(__name__)
27
28
29 class FileScheduleEntry(CeleryScheduleEntry):
30 def __init__(self, name=None, task=None, last_run_at=None,
31 total_run_count=None, schedule=None, args=(), kwargs=None,
32 options=None, relative=False, app=None, **_kwargs):
33 kwargs = kwargs or {}
34 options = options or {}
35
36 # because our custom loader passes in some variables that the original
37 # function doesn't expect, we have this thin wrapper
38
39 super(FileScheduleEntry, self).__init__(
40 name=name, task=task, last_run_at=last_run_at,
41 total_run_count=total_run_count, schedule=schedule, args=args,
42 kwargs=kwargs, options=options, relative=relative, app=app)
43
44
45 class FileScheduler(PersistentScheduler):
46 """CE base scheduler"""
47 Entry = FileScheduleEntry
48
49 def setup_schedule(self):
50 log.info("setup_schedule called")
51 super(FileScheduler, self).setup_schedule()
52
53
54 try:
55 # try if we have EE scheduler available
56 module = importlib.import_module('rc_ee.lib.celerylib.scheduler')
57 RcScheduler = module.RcScheduler
58 except ImportError:
59 # fallback to CE scheduler
60 RcScheduler = FileScheduler
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
@@ -0,0 +1,32 b''
1 import logging
2
3 from sqlalchemy import *
4
5 from rhodecode.model import meta
6 from rhodecode.lib.dbmigrate.versions import _reset_base, notify
7
8 log = logging.getLogger(__name__)
9
10
11 def upgrade(migrate_engine):
12 """
13 Upgrade operations go here.
14 Don't create your own engine; bind migrate_engine to your metadata
15 """
16 _reset_base(migrate_engine)
17 from rhodecode.lib.dbmigrate.schema import db_4_11_0_0
18 db_4_11_0_0.ScheduleEntry().__table__.create()
19
20 # issue fixups
21 fixups(db_4_11_0_0, meta.Session)
22
23
24 def downgrade(migrate_engine):
25 meta = MetaData()
26 meta.bind = migrate_engine
27
28
29 def fixups(models, _SESSION):
30 pass
31
32
@@ -1,63 +1,63 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22
22
23 RhodeCode, a web based repository management software
23 RhodeCode, a web based repository management software
24 versioning implementation: http://www.python.org/dev/peps/pep-0386/
24 versioning implementation: http://www.python.org/dev/peps/pep-0386/
25 """
25 """
26
26
27 import os
27 import os
28 import sys
28 import sys
29 import platform
29 import platform
30
30
31 VERSION = tuple(open(os.path.join(
31 VERSION = tuple(open(os.path.join(
32 os.path.dirname(__file__), 'VERSION')).read().split('.'))
32 os.path.dirname(__file__), 'VERSION')).read().split('.'))
33
33
34 BACKENDS = {
34 BACKENDS = {
35 'hg': 'Mercurial repository',
35 'hg': 'Mercurial repository',
36 'git': 'Git repository',
36 'git': 'Git repository',
37 'svn': 'Subversion repository',
37 'svn': 'Subversion repository',
38 }
38 }
39
39
40 CELERY_ENABLED = False
40 CELERY_ENABLED = False
41 CELERY_EAGER = False
41 CELERY_EAGER = False
42
42
43 # link to config for pyramid
43 # link to config for pyramid
44 CONFIG = {}
44 CONFIG = {}
45
45
46 # Populated with the settings dictionary from application init in
46 # Populated with the settings dictionary from application init in
47 # rhodecode.conf.environment.load_pyramid_environment
47 # rhodecode.conf.environment.load_pyramid_environment
48 PYRAMID_SETTINGS = {}
48 PYRAMID_SETTINGS = {}
49
49
50 # Linked module for extensions
50 # Linked module for extensions
51 EXTENSIONS = {}
51 EXTENSIONS = {}
52
52
53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
54 __dbversion__ = 81 # defines current db version for migrations
54 __dbversion__ = 82 # defines current db version for migrations
55 __platform__ = platform.system()
55 __platform__ = platform.system()
56 __license__ = 'AGPLv3, and Commercial License'
56 __license__ = 'AGPLv3, and Commercial License'
57 __author__ = 'RhodeCode GmbH'
57 __author__ = 'RhodeCode GmbH'
58 __url__ = 'https://code.rhodecode.com'
58 __url__ = 'https://code.rhodecode.com'
59
59
60 is_windows = __platform__ in ['Windows']
60 is_windows = __platform__ in ['Windows']
61 is_unix = not is_windows
61 is_unix = not is_windows
62 is_test = False
62 is_test = False
63 disable_error_handler = False
63 disable_error_handler = False
@@ -1,257 +1,265 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
23 celery worker \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
24 """
28 """
25 import os
29 import os
26 import logging
30 import logging
27
31
28 from celery import Celery
32 from celery import Celery
29 from celery import signals
33 from celery import signals
30 from celery import Task
34 from celery import Task
31 from celery import exceptions # noqa
35 from celery import exceptions # noqa
32 from kombu.serialization import register
36 from kombu.serialization import register
33 from pyramid.threadlocal import get_current_request
37 from pyramid.threadlocal import get_current_request
34
38
35 import rhodecode
39 import rhodecode
36
40
37 from rhodecode.lib.auth import AuthUser
41 from rhodecode.lib.auth import AuthUser
38 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
39 from rhodecode.lib.ext_json import json
43 from rhodecode.lib.ext_json import json
40 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
41 from rhodecode.lib.utils2 import str2bool
45 from rhodecode.lib.utils2 import str2bool
42 from rhodecode.model import meta
46 from rhodecode.model import meta
43
47
44
48
45 register('json_ext', json.dumps, json.loads,
49 register('json_ext', json.dumps, json.loads,
46 content_type='application/x-json-ext',
50 content_type='application/x-json-ext',
47 content_encoding='utf-8')
51 content_encoding='utf-8')
48
52
49 log = logging.getLogger('celery.rhodecode.loader')
53 log = logging.getLogger('celery.rhodecode.loader')
50
54
51
55
52 def add_preload_arguments(parser):
56 def add_preload_arguments(parser):
53 parser.add_argument(
57 parser.add_argument(
54 '--ini', default=None,
58 '--ini', default=None,
55 help='Path to ini configuration file.'
59 help='Path to ini configuration file.'
56 )
60 )
57 parser.add_argument(
61 parser.add_argument(
58 '--ini-var', default=None,
62 '--ini-var', default=None,
59 help='Comma separated list of key=value to pass to ini.'
63 help='Comma separated list of key=value to pass to ini.'
60 )
64 )
61
65
62
66
63 def get_logger(obj):
67 def get_logger(obj):
64 custom_log = logging.getLogger(
68 custom_log = logging.getLogger(
65 'rhodecode.task.{}'.format(obj.__class__.__name__))
69 'rhodecode.task.{}'.format(obj.__class__.__name__))
66
70
67 if rhodecode.CELERY_ENABLED:
71 if rhodecode.CELERY_ENABLED:
68 try:
72 try:
69 custom_log = obj.get_logger()
73 custom_log = obj.get_logger()
70 except Exception:
74 except Exception:
71 pass
75 pass
72
76
73 return custom_log
77 return custom_log
74
78
75
79
76 base_celery_config = {
80 base_celery_config = {
77 'result_backend': 'rpc://',
81 'result_backend': 'rpc://',
78 'result_expires': 60 * 60 * 24,
82 'result_expires': 60 * 60 * 24,
79 'result_persistent': True,
83 'result_persistent': True,
80 'imports': [],
84 'imports': [],
81 'worker_max_tasks_per_child': 100,
85 'worker_max_tasks_per_child': 100,
82 'accept_content': ['json_ext'],
86 'accept_content': ['json_ext'],
83 'task_serializer': 'json_ext',
87 'task_serializer': 'json_ext',
84 'result_serializer': 'json_ext',
88 'result_serializer': 'json_ext',
85 'worker_hijack_root_logger': False,
89 'worker_hijack_root_logger': False,
90 'database_table_names': {
91 'task': 'beat_taskmeta',
92 'group': 'beat_groupmeta',
93 }
86 }
94 }
87 # init main celery app
95 # init main celery app
88 celery_app = Celery()
96 celery_app = Celery()
89 celery_app.user_options['preload'].add(add_preload_arguments)
97 celery_app.user_options['preload'].add(add_preload_arguments)
90 ini_file_glob = None
98 ini_file_glob = None
91
99
92
100
93 @signals.setup_logging.connect
101 @signals.setup_logging.connect
94 def setup_logging_callback(**kwargs):
102 def setup_logging_callback(**kwargs):
95 setup_logging(ini_file_glob)
103 setup_logging(ini_file_glob)
96
104
97
105
98 @signals.user_preload_options.connect
106 @signals.user_preload_options.connect
99 def on_preload_parsed(options, **kwargs):
107 def on_preload_parsed(options, **kwargs):
100 ini_location = options['ini']
108 ini_location = options['ini']
101 ini_vars = options['ini_var']
109 ini_vars = options['ini_var']
102 celery_app.conf['INI_PYRAMID'] = options['ini']
110 celery_app.conf['INI_PYRAMID'] = options['ini']
103
111
104 if ini_location is None:
112 if ini_location is None:
105 print('You must provide the paste --ini argument')
113 print('You must provide the paste --ini argument')
106 exit(-1)
114 exit(-1)
107
115
108 options = None
116 options = None
109 if ini_vars is not None:
117 if ini_vars is not None:
110 options = parse_ini_vars(ini_vars)
118 options = parse_ini_vars(ini_vars)
111
119
112 global ini_file_glob
120 global ini_file_glob
113 ini_file_glob = ini_location
121 ini_file_glob = ini_location
114
122
115 log.debug('Bootstrapping RhodeCode application...')
123 log.debug('Bootstrapping RhodeCode application...')
116 env = bootstrap(ini_location, options=options)
124 env = bootstrap(ini_location, options=options)
117
125
118 setup_celery_app(
126 setup_celery_app(
119 app=env['app'], root=env['root'], request=env['request'],
127 app=env['app'], root=env['root'], request=env['request'],
120 registry=env['registry'], closer=env['closer'],
128 registry=env['registry'], closer=env['closer'],
121 ini_location=ini_location)
129 ini_location=ini_location)
122
130
123 # fix the global flag even if it's disabled via .ini file because this
131 # fix the global flag even if it's disabled via .ini file because this
124 # is a worker code that doesn't need this to be disabled.
132 # is a worker code that doesn't need this to be disabled.
125 rhodecode.CELERY_ENABLED = True
133 rhodecode.CELERY_ENABLED = True
126
134
127
135
128 @signals.task_success.connect
136 @signals.task_success.connect
129 def task_success_signal(result, **kwargs):
137 def task_success_signal(result, **kwargs):
130 meta.Session.commit()
138 meta.Session.commit()
131 celery_app.conf['PYRAMID_CLOSER']()
139 celery_app.conf['PYRAMID_CLOSER']()
132
140
133
141
134 @signals.task_retry.connect
142 @signals.task_retry.connect
135 def task_retry_signal(
143 def task_retry_signal(
136 request, reason, einfo, **kwargs):
144 request, reason, einfo, **kwargs):
137 meta.Session.remove()
145 meta.Session.remove()
138 celery_app.conf['PYRAMID_CLOSER']()
146 celery_app.conf['PYRAMID_CLOSER']()
139
147
140
148
141 @signals.task_failure.connect
149 @signals.task_failure.connect
142 def task_failure_signal(
150 def task_failure_signal(
143 task_id, exception, args, kwargs, traceback, einfo, **kargs):
151 task_id, exception, args, kwargs, traceback, einfo, **kargs):
144 meta.Session.remove()
152 meta.Session.remove()
145 celery_app.conf['PYRAMID_CLOSER']()
153 celery_app.conf['PYRAMID_CLOSER']()
146
154
147
155
148 @signals.task_revoked.connect
156 @signals.task_revoked.connect
149 def task_revoked_signal(
157 def task_revoked_signal(
150 request, terminated, signum, expired, **kwargs):
158 request, terminated, signum, expired, **kwargs):
151 celery_app.conf['PYRAMID_CLOSER']()
159 celery_app.conf['PYRAMID_CLOSER']()
152
160
153
161
154 def setup_celery_app(app, root, request, registry, closer, ini_location):
162 def setup_celery_app(app, root, request, registry, closer, ini_location):
155 ini_dir = os.path.dirname(os.path.abspath(ini_location))
163 ini_dir = os.path.dirname(os.path.abspath(ini_location))
156 celery_config = base_celery_config
164 celery_config = base_celery_config
157 celery_config.update({
165 celery_config.update({
158 # store celerybeat scheduler db where the .ini file is
166 # store celerybeat scheduler db where the .ini file is
159 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
167 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
160 })
168 })
161 ini_settings = get_ini_config(ini_location)
169 ini_settings = get_ini_config(ini_location)
162 log.debug('Got custom celery conf: %s', ini_settings)
170 log.debug('Got custom celery conf: %s', ini_settings)
163
171
164 celery_config.update(ini_settings)
172 celery_config.update(ini_settings)
165 celery_app.config_from_object(celery_config)
173 celery_app.config_from_object(celery_config)
166
174
167 celery_app.conf.update({'PYRAMID_APP': app})
175 celery_app.conf.update({'PYRAMID_APP': app})
168 celery_app.conf.update({'PYRAMID_ROOT': root})
176 celery_app.conf.update({'PYRAMID_ROOT': root})
169 celery_app.conf.update({'PYRAMID_REQUEST': request})
177 celery_app.conf.update({'PYRAMID_REQUEST': request})
170 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
178 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
171 celery_app.conf.update({'PYRAMID_CLOSER': closer})
179 celery_app.conf.update({'PYRAMID_CLOSER': closer})
172
180
173
181
174 def configure_celery(config, ini_location):
182 def configure_celery(config, ini_location):
175 """
183 """
176 Helper that is called from our application creation logic. It gives
184 Helper that is called from our application creation logic. It gives
177 connection info into running webapp and allows execution of tasks from
185 connection info into running webapp and allows execution of tasks from
178 RhodeCode itself
186 RhodeCode itself
179 """
187 """
180 # store some globals into rhodecode
188 # store some globals into rhodecode
181 rhodecode.CELERY_ENABLED = str2bool(
189 rhodecode.CELERY_ENABLED = str2bool(
182 config.registry.settings.get('use_celery'))
190 config.registry.settings.get('use_celery'))
183 if rhodecode.CELERY_ENABLED:
191 if rhodecode.CELERY_ENABLED:
184 log.info('Configuring celery based on `%s` file', ini_location)
192 log.info('Configuring celery based on `%s` file', ini_location)
185 setup_celery_app(
193 setup_celery_app(
186 app=None, root=None, request=None, registry=config.registry,
194 app=None, root=None, request=None, registry=config.registry,
187 closer=None, ini_location=ini_location)
195 closer=None, ini_location=ini_location)
188
196
189
197
190 class RequestContextTask(Task):
198 class RequestContextTask(Task):
191 """
199 """
192 This is a celery task which will create a rhodecode app instance context
200 This is a celery task which will create a rhodecode app instance context
193 for the task, patch pyramid with the original request
201 for the task, patch pyramid with the original request
194 that created the task and also add the user to the context.
202 that created the task and also add the user to the context.
195 """
203 """
196
204
197 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
205 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
198 link=None, link_error=None, shadow=None, **options):
206 link=None, link_error=None, shadow=None, **options):
199 """ queue the job to run (we are in web request context here) """
207 """ queue the job to run (we are in web request context here) """
200
208
201 req = get_current_request()
209 req = get_current_request()
202
210
203 # web case
211 # web case
204 if hasattr(req, 'user'):
212 if hasattr(req, 'user'):
205 ip_addr = req.user.ip_addr
213 ip_addr = req.user.ip_addr
206 user_id = req.user.user_id
214 user_id = req.user.user_id
207
215
208 # api case
216 # api case
209 elif hasattr(req, 'rpc_user'):
217 elif hasattr(req, 'rpc_user'):
210 ip_addr = req.rpc_user.ip_addr
218 ip_addr = req.rpc_user.ip_addr
211 user_id = req.rpc_user.user_id
219 user_id = req.rpc_user.user_id
212 else:
220 else:
213 raise Exception(
221 raise Exception(
214 'Unable to fetch required data from request: {}. \n'
222 'Unable to fetch required data from request: {}. \n'
215 'This task is required to be executed from context of '
223 'This task is required to be executed from context of '
216 'request in a webapp'.format(repr(req)))
224 'request in a webapp'.format(repr(req)))
217
225
218 if req:
226 if req:
219 # we hook into kwargs since it is the only way to pass our data to
227 # we hook into kwargs since it is the only way to pass our data to
220 # the celery worker
228 # the celery worker
221 options['headers'] = options.get('headers', {})
229 options['headers'] = options.get('headers', {})
222 options['headers'].update({
230 options['headers'].update({
223 'rhodecode_proxy_data': {
231 'rhodecode_proxy_data': {
224 'environ': {
232 'environ': {
225 'PATH_INFO': req.environ['PATH_INFO'],
233 'PATH_INFO': req.environ['PATH_INFO'],
226 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
234 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
227 'HTTP_HOST': req.environ.get('HTTP_HOST',
235 'HTTP_HOST': req.environ.get('HTTP_HOST',
228 req.environ['SERVER_NAME']),
236 req.environ['SERVER_NAME']),
229 'SERVER_NAME': req.environ['SERVER_NAME'],
237 'SERVER_NAME': req.environ['SERVER_NAME'],
230 'SERVER_PORT': req.environ['SERVER_PORT'],
238 'SERVER_PORT': req.environ['SERVER_PORT'],
231 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
239 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
232 },
240 },
233 'auth_user': {
241 'auth_user': {
234 'ip_addr': ip_addr,
242 'ip_addr': ip_addr,
235 'user_id': user_id
243 'user_id': user_id
236 },
244 },
237 }
245 }
238 })
246 })
239
247
240 return super(RequestContextTask, self).apply_async(
248 return super(RequestContextTask, self).apply_async(
241 args, kwargs, task_id, producer, link, link_error, shadow, **options)
249 args, kwargs, task_id, producer, link, link_error, shadow, **options)
242
250
243 def __call__(self, *args, **kwargs):
251 def __call__(self, *args, **kwargs):
244 """ rebuild the context and then run task on celery worker """
252 """ rebuild the context and then run task on celery worker """
245
253
246 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
254 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
247 if not proxy_data:
255 if not proxy_data:
248 return super(RequestContextTask, self).__call__(*args, **kwargs)
256 return super(RequestContextTask, self).__call__(*args, **kwargs)
249
257
250 log.debug('using celery proxy data to run task: %r', proxy_data)
258 log.debug('using celery proxy data to run task: %r', proxy_data)
251 # re-inject and register threadlocals for proper routing support
259 # re-inject and register threadlocals for proper routing support
252 request = prepare_request(proxy_data['environ'])
260 request = prepare_request(proxy_data['environ'])
253 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
261 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
254 ip_addr=proxy_data['auth_user']['ip_addr'])
262 ip_addr=proxy_data['auth_user']['ip_addr'])
255
263
256 return super(RequestContextTask, self).__call__(*args, **kwargs)
264 return super(RequestContextTask, self).__call__(*args, **kwargs)
257
265
@@ -1,275 +1,285 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2012-2017 RhodeCode GmbH
3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 RhodeCode task modules, containing all task that suppose to be run
22 RhodeCode task modules, containing all task that suppose to be run
23 by celery daemon
23 by celery daemon
24 """
24 """
25
25
26 import os
26 import os
27 import time
27
28
28 import rhodecode
29 import rhodecode
29 from rhodecode.lib import audit_logger
30 from rhodecode.lib import audit_logger
30 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
31 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
31 from rhodecode.lib.hooks_base import log_create_repository
32 from rhodecode.lib.hooks_base import log_create_repository
32 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
33 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
33 from rhodecode.lib.utils2 import safe_int, str2bool
34 from rhodecode.lib.utils2 import safe_int, str2bool
34 from rhodecode.model.db import Session, Repository, User
35 from rhodecode.model.db import Session, Repository, User
35
36
36
37
37 @async_task(ignore_result=True, base=RequestContextTask)
38 @async_task(ignore_result=True, base=RequestContextTask)
38 def send_email(recipients, subject, body='', html_body='', email_config=None):
39 def send_email(recipients, subject, body='', html_body='', email_config=None):
39 """
40 """
40 Sends an email with defined parameters from the .ini files.
41 Sends an email with defined parameters from the .ini files.
41
42
42 :param recipients: list of recipients, it this is empty the defined email
43 :param recipients: list of recipients, it this is empty the defined email
43 address from field 'email_to' is used instead
44 address from field 'email_to' is used instead
44 :param subject: subject of the mail
45 :param subject: subject of the mail
45 :param body: body of the mail
46 :param body: body of the mail
46 :param html_body: html version of body
47 :param html_body: html version of body
47 """
48 """
48 log = get_logger(send_email)
49 log = get_logger(send_email)
49
50
50 email_config = email_config or rhodecode.CONFIG
51 email_config = email_config or rhodecode.CONFIG
51 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
52 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
52 if not recipients:
53 if not recipients:
53 # if recipients are not defined we send to email_config + all admins
54 # if recipients are not defined we send to email_config + all admins
54 admins = [
55 admins = [
55 u.email for u in User.query().filter(User.admin == True).all()]
56 u.email for u in User.query().filter(User.admin == True).all()]
56 recipients = [email_config.get('email_to')] + admins
57 recipients = [email_config.get('email_to')] + admins
57
58
58 mail_server = email_config.get('smtp_server') or None
59 mail_server = email_config.get('smtp_server') or None
59 if mail_server is None:
60 if mail_server is None:
60 log.error("SMTP server information missing. Sending email failed. "
61 log.error("SMTP server information missing. Sending email failed. "
61 "Make sure that `smtp_server` variable is configured "
62 "Make sure that `smtp_server` variable is configured "
62 "inside the .ini file")
63 "inside the .ini file")
63 return False
64 return False
64
65
65 mail_from = email_config.get('app_email_from', 'RhodeCode')
66 mail_from = email_config.get('app_email_from', 'RhodeCode')
66 user = email_config.get('smtp_username')
67 user = email_config.get('smtp_username')
67 passwd = email_config.get('smtp_password')
68 passwd = email_config.get('smtp_password')
68 mail_port = email_config.get('smtp_port')
69 mail_port = email_config.get('smtp_port')
69 tls = str2bool(email_config.get('smtp_use_tls'))
70 tls = str2bool(email_config.get('smtp_use_tls'))
70 ssl = str2bool(email_config.get('smtp_use_ssl'))
71 ssl = str2bool(email_config.get('smtp_use_ssl'))
71 debug = str2bool(email_config.get('debug'))
72 debug = str2bool(email_config.get('debug'))
72 smtp_auth = email_config.get('smtp_auth')
73 smtp_auth = email_config.get('smtp_auth')
73
74
74 try:
75 try:
75 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
76 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
76 mail_port, ssl, tls, debug=debug)
77 mail_port, ssl, tls, debug=debug)
77 m.send(recipients, subject, body, html_body)
78 m.send(recipients, subject, body, html_body)
78 except Exception:
79 except Exception:
79 log.exception('Mail sending failed')
80 log.exception('Mail sending failed')
80 return False
81 return False
81 return True
82 return True
82
83
83
84
84 @async_task(ignore_result=True, base=RequestContextTask)
85 @async_task(ignore_result=True, base=RequestContextTask)
85 def create_repo(form_data, cur_user):
86 def create_repo(form_data, cur_user):
86 from rhodecode.model.repo import RepoModel
87 from rhodecode.model.repo import RepoModel
87 from rhodecode.model.user import UserModel
88 from rhodecode.model.user import UserModel
88 from rhodecode.model.settings import SettingsModel
89 from rhodecode.model.settings import SettingsModel
89
90
90 log = get_logger(create_repo)
91 log = get_logger(create_repo)
91
92
92 cur_user = UserModel()._get_user(cur_user)
93 cur_user = UserModel()._get_user(cur_user)
93 owner = cur_user
94 owner = cur_user
94
95
95 repo_name = form_data['repo_name']
96 repo_name = form_data['repo_name']
96 repo_name_full = form_data['repo_name_full']
97 repo_name_full = form_data['repo_name_full']
97 repo_type = form_data['repo_type']
98 repo_type = form_data['repo_type']
98 description = form_data['repo_description']
99 description = form_data['repo_description']
99 private = form_data['repo_private']
100 private = form_data['repo_private']
100 clone_uri = form_data.get('clone_uri')
101 clone_uri = form_data.get('clone_uri')
101 repo_group = safe_int(form_data['repo_group'])
102 repo_group = safe_int(form_data['repo_group'])
102 landing_rev = form_data['repo_landing_rev']
103 landing_rev = form_data['repo_landing_rev']
103 copy_fork_permissions = form_data.get('copy_permissions')
104 copy_fork_permissions = form_data.get('copy_permissions')
104 copy_group_permissions = form_data.get('repo_copy_permissions')
105 copy_group_permissions = form_data.get('repo_copy_permissions')
105 fork_of = form_data.get('fork_parent_id')
106 fork_of = form_data.get('fork_parent_id')
106 state = form_data.get('repo_state', Repository.STATE_PENDING)
107 state = form_data.get('repo_state', Repository.STATE_PENDING)
107
108
108 # repo creation defaults, private and repo_type are filled in form
109 # repo creation defaults, private and repo_type are filled in form
109 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
110 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
110 enable_statistics = form_data.get(
111 enable_statistics = form_data.get(
111 'enable_statistics', defs.get('repo_enable_statistics'))
112 'enable_statistics', defs.get('repo_enable_statistics'))
112 enable_locking = form_data.get(
113 enable_locking = form_data.get(
113 'enable_locking', defs.get('repo_enable_locking'))
114 'enable_locking', defs.get('repo_enable_locking'))
114 enable_downloads = form_data.get(
115 enable_downloads = form_data.get(
115 'enable_downloads', defs.get('repo_enable_downloads'))
116 'enable_downloads', defs.get('repo_enable_downloads'))
116
117
117 try:
118 try:
118 repo = RepoModel()._create_repo(
119 repo = RepoModel()._create_repo(
119 repo_name=repo_name_full,
120 repo_name=repo_name_full,
120 repo_type=repo_type,
121 repo_type=repo_type,
121 description=description,
122 description=description,
122 owner=owner,
123 owner=owner,
123 private=private,
124 private=private,
124 clone_uri=clone_uri,
125 clone_uri=clone_uri,
125 repo_group=repo_group,
126 repo_group=repo_group,
126 landing_rev=landing_rev,
127 landing_rev=landing_rev,
127 fork_of=fork_of,
128 fork_of=fork_of,
128 copy_fork_permissions=copy_fork_permissions,
129 copy_fork_permissions=copy_fork_permissions,
129 copy_group_permissions=copy_group_permissions,
130 copy_group_permissions=copy_group_permissions,
130 enable_statistics=enable_statistics,
131 enable_statistics=enable_statistics,
131 enable_locking=enable_locking,
132 enable_locking=enable_locking,
132 enable_downloads=enable_downloads,
133 enable_downloads=enable_downloads,
133 state=state
134 state=state
134 )
135 )
135 Session().commit()
136 Session().commit()
136
137
137 # now create this repo on Filesystem
138 # now create this repo on Filesystem
138 RepoModel()._create_filesystem_repo(
139 RepoModel()._create_filesystem_repo(
139 repo_name=repo_name,
140 repo_name=repo_name,
140 repo_type=repo_type,
141 repo_type=repo_type,
141 repo_group=RepoModel()._get_repo_group(repo_group),
142 repo_group=RepoModel()._get_repo_group(repo_group),
142 clone_uri=clone_uri,
143 clone_uri=clone_uri,
143 )
144 )
144 repo = Repository.get_by_repo_name(repo_name_full)
145 repo = Repository.get_by_repo_name(repo_name_full)
145 log_create_repository(created_by=owner.username, **repo.get_dict())
146 log_create_repository(created_by=owner.username, **repo.get_dict())
146
147
147 # update repo commit caches initially
148 # update repo commit caches initially
148 repo.update_commit_cache()
149 repo.update_commit_cache()
149
150
150 # set new created state
151 # set new created state
151 repo.set_state(Repository.STATE_CREATED)
152 repo.set_state(Repository.STATE_CREATED)
152 repo_id = repo.repo_id
153 repo_id = repo.repo_id
153 repo_data = repo.get_api_data()
154 repo_data = repo.get_api_data()
154
155
155 audit_logger.store(
156 audit_logger.store(
156 'repo.create', action_data={'data': repo_data},
157 'repo.create', action_data={'data': repo_data},
157 user=cur_user,
158 user=cur_user,
158 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
159 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
159
160
160 Session().commit()
161 Session().commit()
161 except Exception:
162 except Exception:
162 log.warning('Exception occurred when creating repository, '
163 log.warning('Exception occurred when creating repository, '
163 'doing cleanup...', exc_info=True)
164 'doing cleanup...', exc_info=True)
164 # rollback things manually !
165 # rollback things manually !
165 repo = Repository.get_by_repo_name(repo_name_full)
166 repo = Repository.get_by_repo_name(repo_name_full)
166 if repo:
167 if repo:
167 Repository.delete(repo.repo_id)
168 Repository.delete(repo.repo_id)
168 Session().commit()
169 Session().commit()
169 RepoModel()._delete_filesystem_repo(repo)
170 RepoModel()._delete_filesystem_repo(repo)
170 raise
171 raise
171
172
172 # it's an odd fix to make celery fail task when exception occurs
173 # it's an odd fix to make celery fail task when exception occurs
173 def on_failure(self, *args, **kwargs):
174 def on_failure(self, *args, **kwargs):
174 pass
175 pass
175
176
176 return True
177 return True
177
178
178
179
179 @async_task(ignore_result=True, base=RequestContextTask)
180 @async_task(ignore_result=True, base=RequestContextTask)
180 def create_repo_fork(form_data, cur_user):
181 def create_repo_fork(form_data, cur_user):
181 """
182 """
182 Creates a fork of repository using internal VCS methods
183 Creates a fork of repository using internal VCS methods
183 """
184 """
184 from rhodecode.model.repo import RepoModel
185 from rhodecode.model.repo import RepoModel
185 from rhodecode.model.user import UserModel
186 from rhodecode.model.user import UserModel
186
187
187 log = get_logger(create_repo_fork)
188 log = get_logger(create_repo_fork)
188
189
189 cur_user = UserModel()._get_user(cur_user)
190 cur_user = UserModel()._get_user(cur_user)
190 owner = cur_user
191 owner = cur_user
191
192
192 repo_name = form_data['repo_name'] # fork in this case
193 repo_name = form_data['repo_name'] # fork in this case
193 repo_name_full = form_data['repo_name_full']
194 repo_name_full = form_data['repo_name_full']
194 repo_type = form_data['repo_type']
195 repo_type = form_data['repo_type']
195 description = form_data['description']
196 description = form_data['description']
196 private = form_data['private']
197 private = form_data['private']
197 clone_uri = form_data.get('clone_uri')
198 clone_uri = form_data.get('clone_uri')
198 repo_group = safe_int(form_data['repo_group'])
199 repo_group = safe_int(form_data['repo_group'])
199 landing_rev = form_data['landing_rev']
200 landing_rev = form_data['landing_rev']
200 copy_fork_permissions = form_data.get('copy_permissions')
201 copy_fork_permissions = form_data.get('copy_permissions')
201 fork_id = safe_int(form_data.get('fork_parent_id'))
202 fork_id = safe_int(form_data.get('fork_parent_id'))
202
203
203 try:
204 try:
204 fork_of = RepoModel()._get_repo(fork_id)
205 fork_of = RepoModel()._get_repo(fork_id)
205 RepoModel()._create_repo(
206 RepoModel()._create_repo(
206 repo_name=repo_name_full,
207 repo_name=repo_name_full,
207 repo_type=repo_type,
208 repo_type=repo_type,
208 description=description,
209 description=description,
209 owner=owner,
210 owner=owner,
210 private=private,
211 private=private,
211 clone_uri=clone_uri,
212 clone_uri=clone_uri,
212 repo_group=repo_group,
213 repo_group=repo_group,
213 landing_rev=landing_rev,
214 landing_rev=landing_rev,
214 fork_of=fork_of,
215 fork_of=fork_of,
215 copy_fork_permissions=copy_fork_permissions
216 copy_fork_permissions=copy_fork_permissions
216 )
217 )
217
218
218 Session().commit()
219 Session().commit()
219
220
220 base_path = Repository.base_path()
221 base_path = Repository.base_path()
221 source_repo_path = os.path.join(base_path, fork_of.repo_name)
222 source_repo_path = os.path.join(base_path, fork_of.repo_name)
222
223
223 # now create this repo on Filesystem
224 # now create this repo on Filesystem
224 RepoModel()._create_filesystem_repo(
225 RepoModel()._create_filesystem_repo(
225 repo_name=repo_name,
226 repo_name=repo_name,
226 repo_type=repo_type,
227 repo_type=repo_type,
227 repo_group=RepoModel()._get_repo_group(repo_group),
228 repo_group=RepoModel()._get_repo_group(repo_group),
228 clone_uri=source_repo_path,
229 clone_uri=source_repo_path,
229 )
230 )
230 repo = Repository.get_by_repo_name(repo_name_full)
231 repo = Repository.get_by_repo_name(repo_name_full)
231 log_create_repository(created_by=owner.username, **repo.get_dict())
232 log_create_repository(created_by=owner.username, **repo.get_dict())
232
233
233 # update repo commit caches initially
234 # update repo commit caches initially
234 config = repo._config
235 config = repo._config
235 config.set('extensions', 'largefiles', '')
236 config.set('extensions', 'largefiles', '')
236 repo.update_commit_cache(config=config)
237 repo.update_commit_cache(config=config)
237
238
238 # set new created state
239 # set new created state
239 repo.set_state(Repository.STATE_CREATED)
240 repo.set_state(Repository.STATE_CREATED)
240
241
241 repo_id = repo.repo_id
242 repo_id = repo.repo_id
242 repo_data = repo.get_api_data()
243 repo_data = repo.get_api_data()
243 audit_logger.store(
244 audit_logger.store(
244 'repo.fork', action_data={'data': repo_data},
245 'repo.fork', action_data={'data': repo_data},
245 user=cur_user,
246 user=cur_user,
246 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
247 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
247
248
248 Session().commit()
249 Session().commit()
249 except Exception as e:
250 except Exception as e:
250 log.warning('Exception %s occurred when forking repository, '
251 log.warning('Exception %s occurred when forking repository, '
251 'doing cleanup...', e)
252 'doing cleanup...', e)
252 # rollback things manually !
253 # rollback things manually !
253 repo = Repository.get_by_repo_name(repo_name_full)
254 repo = Repository.get_by_repo_name(repo_name_full)
254 if repo:
255 if repo:
255 Repository.delete(repo.repo_id)
256 Repository.delete(repo.repo_id)
256 Session().commit()
257 Session().commit()
257 RepoModel()._delete_filesystem_repo(repo)
258 RepoModel()._delete_filesystem_repo(repo)
258 raise
259 raise
259
260
260 # it's an odd fix to make celery fail task when exception occurs
261 # it's an odd fix to make celery fail task when exception occurs
261 def on_failure(self, *args, **kwargs):
262 def on_failure(self, *args, **kwargs):
262 pass
263 pass
263
264
264 return True
265 return True
265
266
266
267
267 @async_task(ignore_result=True)
268 @async_task(ignore_result=True)
268 def sync_repo(*args, **kwargs):
269 def sync_repo(*args, **kwargs):
269 from rhodecode.model.scm import ScmModel
270 from rhodecode.model.scm import ScmModel
270 log = get_logger(sync_repo)
271 log = get_logger(sync_repo)
271
272 repo_name = kwargs['repo_name']
272 log.info('Pulling from %s', kwargs['repo_name'])
273 log.info('Pulling from %s', repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
275 if dbrepo and dbrepo.clone_uri:
273 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 else:
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
274
279
275
280
281 @async_task(ignore_result=False)
282 def beat_check(*args, **kwargs):
283 log = get_logger(beat_check)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 return time.time()
@@ -1,156 +1,169 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import json
22 import json
23 import logging
23 import logging
24 import datetime
24 import datetime
25
25
26 from functools import partial
26 from functools import partial
27
27
28 from pyramid.compat import configparser
28 from pyramid.compat import configparser
29 from celery.result import AsyncResult
29 from celery.result import AsyncResult
30 import celery.loaders.base
30 import celery.loaders.base
31 import celery.schedules
31 import celery.schedules
32
32
33
33
34 log = logging.getLogger(__name__)
34 log = logging.getLogger(__name__)
35
35
36
36
37 def get_task_id(task):
37 def get_task_id(task):
38 task_id = None
38 task_id = None
39 if isinstance(task, AsyncResult):
39 if isinstance(task, AsyncResult):
40 task_id = task.task_id
40 task_id = task.task_id
41
41
42 return task_id
42 return task_id
43
43
44
44
45 def crontab(value):
45 def crontab(value):
46 return celery.schedules.crontab(**value)
46 return celery.schedules.crontab(**value)
47
47
48
48
49 def timedelta(value):
49 def timedelta(value):
50 return datetime.timedelta(**value)
50 return datetime.timedelta(**value)
51
51
52
52
53 def safe_json(get, section, key):
53 def safe_json(get, section, key):
54 value = ''
54 value = ''
55 try:
55 try:
56 value = get(key)
56 value = get(key)
57 json_value = json.loads(value)
57 json_value = json.loads(value)
58 except ValueError:
58 except ValueError:
59 msg = 'The %s=%s is not valid json in section %s' % (
59 msg = 'The %s=%s is not valid json in section %s' % (
60 key, value, section
60 key, value, section
61 )
61 )
62 raise ValueError(msg)
62 raise ValueError(msg)
63
63
64 return json_value
64 return json_value
65
65
66
66
67 def get_beat_config(parser, section):
67 def raw_2_schedule(schedule_value, schedule_type):
68 SCHEDULE_TYPE_MAP = {
68 schedule_type_map = {
69 'crontab': crontab,
69 'crontab': crontab,
70 'timedelta': timedelta,
70 'timedelta': timedelta,
71 'integer': int
71 'integer': int
72 }
72 }
73 scheduler_cls = schedule_type_map.get(schedule_type)
74
75 if scheduler_cls is None:
76 raise ValueError(
77 'schedule type %s in section is invalid' % (
78 schedule_type,
79 )
80 )
81 try:
82 schedule = scheduler_cls(schedule_value)
83 except TypeError:
84 log.exception('Failed to compose a schedule from value: %r', schedule_value)
85 schedule = None
86 return schedule
87
88
89 def get_beat_config(parser, section):
90
73 get = partial(parser.get, section)
91 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
92 has_option = partial(parser.has_option, section)
75
93
76 schedule_type = get('type')
94 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
95 schedule_value = safe_json(get, section, 'schedule')
78
96
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
97 config = {
98 'schedule_type': schedule_type,
99 'schedule_value': schedule_value,
92 'task': get('task'),
100 'task': get('task'),
93 'schedule': schedule,
94 }
101 }
102 schedule = raw_2_schedule(schedule_value, schedule_type)
103 if schedule:
104 config['schedule'] = schedule
95
105
96 if has_option('args'):
106 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
107 config['args'] = safe_json(get, section, 'args')
98
108
99 if has_option('kwargs'):
109 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
110 config['kwargs'] = safe_json(get, section, 'kwargs')
101
111
112 if has_option('force_update'):
113 config['force_update'] = get('force_update')
114
102 return config
115 return config
103
116
104
117
105 def get_ini_config(ini_location):
118 def get_ini_config(ini_location):
106 """
119 """
107 Converts basic ini configuration into celery 4.X options
120 Converts basic ini configuration into celery 4.X options
108 """
121 """
109 def key_converter(key_name):
122 def key_converter(key_name):
110 pref = 'celery.'
123 pref = 'celery.'
111 if key_name.startswith(pref):
124 if key_name.startswith(pref):
112 return key_name[len(pref):].replace('.', '_').lower()
125 return key_name[len(pref):].replace('.', '_').lower()
113
126
114 def type_converter(parsed_key, value):
127 def type_converter(parsed_key, value):
115 # cast to int
128 # cast to int
116 if value.isdigit():
129 if value.isdigit():
117 return int(value)
130 return int(value)
118
131
119 # cast to bool
132 # cast to bool
120 if value.lower() in ['true', 'false', 'True', 'False']:
133 if value.lower() in ['true', 'false', 'True', 'False']:
121 return value.lower() == 'true'
134 return value.lower() == 'true'
122 return value
135 return value
123
136
124 parser = configparser.SafeConfigParser(
137 parser = configparser.SafeConfigParser(
125 defaults={'here': os.path.abspath(ini_location)})
138 defaults={'here': os.path.abspath(ini_location)})
126 parser.read(ini_location)
139 parser.read(ini_location)
127
140
128 ini_config = {}
141 ini_config = {}
129 for k, v in parser.items('app:main'):
142 for k, v in parser.items('app:main'):
130 pref = 'celery.'
143 pref = 'celery.'
131 if k.startswith(pref):
144 if k.startswith(pref):
132 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
145 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133
146
134 beat_config = {}
147 beat_config = {}
135 for section in parser.sections():
148 for section in parser.sections():
136 if section.startswith('celerybeat:'):
149 if section.startswith('celerybeat:'):
137 name = section.split(':', 1)[1]
150 name = section.split(':', 1)[1]
138 beat_config[name] = get_beat_config(parser, section)
151 beat_config[name] = get_beat_config(parser, section)
139
152
140 # final compose of settings
153 # final compose of settings
141 celery_settings = {}
154 celery_settings = {}
142
155
143 if ini_config:
156 if ini_config:
144 celery_settings.update(ini_config)
157 celery_settings.update(ini_config)
145 if beat_config:
158 if beat_config:
146 celery_settings.update({'beat_schedule': beat_config})
159 celery_settings.update({'beat_schedule': beat_config})
147
160
148 return celery_settings
161 return celery_settings
149
162
150
163
151 def parse_ini_vars(ini_vars):
164 def parse_ini_vars(ini_vars):
152 options = {}
165 options = {}
153 for pairs in ini_vars.split(','):
166 for pairs in ini_vars.split(','):
154 key, value = pairs.split('=')
167 key, value = pairs.split('=')
155 options[key] = value
168 options[key] = value
156 return options
169 return options
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,23 +1,26 b''
1 <span tal:define="name name|field.name;
1 <span tal:define="name name|field.name;
2 css_class css_class|field.widget.css_class;
2 css_class css_class|field.widget.css_class;
3 oid oid|field.oid;
3 oid oid|field.oid;
4 mask mask|field.widget.mask;
4 mask mask|field.widget.mask;
5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
7 style style|field.widget.style;
7 style style|field.widget.style;
8 help_block help_block|field.widget.help_block|'';
8 "
9 "
9 tal:omit-tag="">
10 tal:omit-tag="">
10 <input type="text" name="${name}" value="${cstruct}"
11 <input type="text" name="${name}" value="${cstruct}"
11 tal:attributes="class string: form-control ${css_class or ''};
12 tal:attributes="class string: form-control ${css_class or ''};
12 style style"
13 style style"
13 placeholder="${placeholder}"
14 placeholder="${placeholder}"
14 id="${oid}"/>
15 id="${oid}"/>
16
17 <p tal:condition="help_block" class="help-block">${help_block}</p>
15 <script tal:condition="mask" type="text/javascript">
18 <script tal:condition="mask" type="text/javascript">
16 deform.addCallback(
19 deform.addCallback(
17 '${oid}',
20 '${oid}',
18 function (oid) {
21 function (oid) {
19 $("#" + oid).mask("${mask}",
22 $("#" + oid).mask("${mask}",
20 {placeholder:"${mask_placeholder}"});
23 {placeholder:"${mask_placeholder}"});
21 });
24 });
22 </script>
25 </script>
23 </span> No newline at end of file
26 </span>
General Comments 0
You need to be logged in to leave comments. Login now