##// END OF EJS Templates
scheduler: added DB models and db parsers for the RhodeCode scheduler....
marcink -
r2406:db81d430 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,60 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import logging
21 import importlib
22
23 from celery.beat import (
24 PersistentScheduler, ScheduleEntry as CeleryScheduleEntry)
25
26 log = logging.getLogger(__name__)
27
28
29 class FileScheduleEntry(CeleryScheduleEntry):
30 def __init__(self, name=None, task=None, last_run_at=None,
31 total_run_count=None, schedule=None, args=(), kwargs=None,
32 options=None, relative=False, app=None, **_kwargs):
33 kwargs = kwargs or {}
34 options = options or {}
35
36 # because our custom loader passes in some variables that the original
37 # function doesn't expect, we have this thin wrapper
38
39 super(FileScheduleEntry, self).__init__(
40 name=name, task=task, last_run_at=last_run_at,
41 total_run_count=total_run_count, schedule=schedule, args=args,
42 kwargs=kwargs, options=options, relative=relative, app=app)
43
44
45 class FileScheduler(PersistentScheduler):
46 """CE base scheduler"""
47 Entry = FileScheduleEntry
48
49 def setup_schedule(self):
50 log.info("setup_schedule called")
51 super(FileScheduler, self).setup_schedule()
52
53
54 try:
55 # try if we have EE scheduler available
56 module = importlib.import_module('rc_ee.lib.celerylib.scheduler')
57 RcScheduler = module.RcScheduler
58 except ImportError:
59 # fallback to CE scheduler
60 RcScheduler = FileScheduler
1 NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
@@ -0,0 +1,32 b''
1 import logging
2
3 from sqlalchemy import *
4
5 from rhodecode.model import meta
6 from rhodecode.lib.dbmigrate.versions import _reset_base, notify
7
8 log = logging.getLogger(__name__)
9
10
11 def upgrade(migrate_engine):
12 """
13 Upgrade operations go here.
14 Don't create your own engine; bind migrate_engine to your metadata
15 """
16 _reset_base(migrate_engine)
17 from rhodecode.lib.dbmigrate.schema import db_4_11_0_0
18 db_4_11_0_0.ScheduleEntry().__table__.create()
19
20 # issue fixups
21 fixups(db_4_11_0_0, meta.Session)
22
23
24 def downgrade(migrate_engine):
25 meta = MetaData()
26 meta.bind = migrate_engine
27
28
29 def fixups(models, _SESSION):
30 pass
31
32
@@ -1,63 +1,63 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22
23 23 RhodeCode, a web based repository management software
24 24 versioning implementation: http://www.python.org/dev/peps/pep-0386/
25 25 """
26 26
27 27 import os
28 28 import sys
29 29 import platform
30 30
31 31 VERSION = tuple(open(os.path.join(
32 32 os.path.dirname(__file__), 'VERSION')).read().split('.'))
33 33
34 34 BACKENDS = {
35 35 'hg': 'Mercurial repository',
36 36 'git': 'Git repository',
37 37 'svn': 'Subversion repository',
38 38 }
39 39
40 40 CELERY_ENABLED = False
41 41 CELERY_EAGER = False
42 42
43 43 # link to config for pyramid
44 44 CONFIG = {}
45 45
46 46 # Populated with the settings dictionary from application init in
47 47 # rhodecode.conf.environment.load_pyramid_environment
48 48 PYRAMID_SETTINGS = {}
49 49
50 50 # Linked module for extensions
51 51 EXTENSIONS = {}
52 52
53 53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
54 __dbversion__ = 81 # defines current db version for migrations
54 __dbversion__ = 82 # defines current db version for migrations
55 55 __platform__ = platform.system()
56 56 __license__ = 'AGPLv3, and Commercial License'
57 57 __author__ = 'RhodeCode GmbH'
58 58 __url__ = 'https://code.rhodecode.com'
59 59
60 60 is_windows = __platform__ in ['Windows']
61 61 is_unix = not is_windows
62 62 is_test = False
63 63 disable_error_handler = False
@@ -1,257 +1,265 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20 """
21 21 Celery loader, run with::
22 22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
23 celery worker \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
24 28 """
25 29 import os
26 30 import logging
27 31
28 32 from celery import Celery
29 33 from celery import signals
30 34 from celery import Task
31 35 from celery import exceptions # noqa
32 36 from kombu.serialization import register
33 37 from pyramid.threadlocal import get_current_request
34 38
35 39 import rhodecode
36 40
37 41 from rhodecode.lib.auth import AuthUser
38 42 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
39 43 from rhodecode.lib.ext_json import json
40 44 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
41 45 from rhodecode.lib.utils2 import str2bool
42 46 from rhodecode.model import meta
43 47
44 48
45 49 register('json_ext', json.dumps, json.loads,
46 50 content_type='application/x-json-ext',
47 51 content_encoding='utf-8')
48 52
49 53 log = logging.getLogger('celery.rhodecode.loader')
50 54
51 55
52 56 def add_preload_arguments(parser):
53 57 parser.add_argument(
54 58 '--ini', default=None,
55 59 help='Path to ini configuration file.'
56 60 )
57 61 parser.add_argument(
58 62 '--ini-var', default=None,
59 63 help='Comma separated list of key=value to pass to ini.'
60 64 )
61 65
62 66
63 67 def get_logger(obj):
64 68 custom_log = logging.getLogger(
65 69 'rhodecode.task.{}'.format(obj.__class__.__name__))
66 70
67 71 if rhodecode.CELERY_ENABLED:
68 72 try:
69 73 custom_log = obj.get_logger()
70 74 except Exception:
71 75 pass
72 76
73 77 return custom_log
74 78
75 79
76 80 base_celery_config = {
77 81 'result_backend': 'rpc://',
78 82 'result_expires': 60 * 60 * 24,
79 83 'result_persistent': True,
80 84 'imports': [],
81 85 'worker_max_tasks_per_child': 100,
82 86 'accept_content': ['json_ext'],
83 87 'task_serializer': 'json_ext',
84 88 'result_serializer': 'json_ext',
85 89 'worker_hijack_root_logger': False,
90 'database_table_names': {
91 'task': 'beat_taskmeta',
92 'group': 'beat_groupmeta',
93 }
86 94 }
87 95 # init main celery app
88 96 celery_app = Celery()
89 97 celery_app.user_options['preload'].add(add_preload_arguments)
90 98 ini_file_glob = None
91 99
92 100
93 101 @signals.setup_logging.connect
94 102 def setup_logging_callback(**kwargs):
95 103 setup_logging(ini_file_glob)
96 104
97 105
98 106 @signals.user_preload_options.connect
99 107 def on_preload_parsed(options, **kwargs):
100 108 ini_location = options['ini']
101 109 ini_vars = options['ini_var']
102 110 celery_app.conf['INI_PYRAMID'] = options['ini']
103 111
104 112 if ini_location is None:
105 113 print('You must provide the paste --ini argument')
106 114 exit(-1)
107 115
108 116 options = None
109 117 if ini_vars is not None:
110 118 options = parse_ini_vars(ini_vars)
111 119
112 120 global ini_file_glob
113 121 ini_file_glob = ini_location
114 122
115 123 log.debug('Bootstrapping RhodeCode application...')
116 124 env = bootstrap(ini_location, options=options)
117 125
118 126 setup_celery_app(
119 127 app=env['app'], root=env['root'], request=env['request'],
120 128 registry=env['registry'], closer=env['closer'],
121 129 ini_location=ini_location)
122 130
123 131 # fix the global flag even if it's disabled via .ini file because this
124 132 # is a worker code that doesn't need this to be disabled.
125 133 rhodecode.CELERY_ENABLED = True
126 134
127 135
128 136 @signals.task_success.connect
129 137 def task_success_signal(result, **kwargs):
130 138 meta.Session.commit()
131 139 celery_app.conf['PYRAMID_CLOSER']()
132 140
133 141
134 142 @signals.task_retry.connect
135 143 def task_retry_signal(
136 144 request, reason, einfo, **kwargs):
137 145 meta.Session.remove()
138 146 celery_app.conf['PYRAMID_CLOSER']()
139 147
140 148
141 149 @signals.task_failure.connect
142 150 def task_failure_signal(
143 151 task_id, exception, args, kwargs, traceback, einfo, **kargs):
144 152 meta.Session.remove()
145 153 celery_app.conf['PYRAMID_CLOSER']()
146 154
147 155
148 156 @signals.task_revoked.connect
149 157 def task_revoked_signal(
150 158 request, terminated, signum, expired, **kwargs):
151 159 celery_app.conf['PYRAMID_CLOSER']()
152 160
153 161
154 162 def setup_celery_app(app, root, request, registry, closer, ini_location):
155 163 ini_dir = os.path.dirname(os.path.abspath(ini_location))
156 164 celery_config = base_celery_config
157 165 celery_config.update({
158 166 # store celerybeat scheduler db where the .ini file is
159 167 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
160 168 })
161 169 ini_settings = get_ini_config(ini_location)
162 170 log.debug('Got custom celery conf: %s', ini_settings)
163 171
164 172 celery_config.update(ini_settings)
165 173 celery_app.config_from_object(celery_config)
166 174
167 175 celery_app.conf.update({'PYRAMID_APP': app})
168 176 celery_app.conf.update({'PYRAMID_ROOT': root})
169 177 celery_app.conf.update({'PYRAMID_REQUEST': request})
170 178 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
171 179 celery_app.conf.update({'PYRAMID_CLOSER': closer})
172 180
173 181
174 182 def configure_celery(config, ini_location):
175 183 """
176 184 Helper that is called from our application creation logic. It gives
177 185 connection info into running webapp and allows execution of tasks from
178 186 RhodeCode itself
179 187 """
180 188 # store some globals into rhodecode
181 189 rhodecode.CELERY_ENABLED = str2bool(
182 190 config.registry.settings.get('use_celery'))
183 191 if rhodecode.CELERY_ENABLED:
184 192 log.info('Configuring celery based on `%s` file', ini_location)
185 193 setup_celery_app(
186 194 app=None, root=None, request=None, registry=config.registry,
187 195 closer=None, ini_location=ini_location)
188 196
189 197
190 198 class RequestContextTask(Task):
191 199 """
192 200 This is a celery task which will create a rhodecode app instance context
193 201 for the task, patch pyramid with the original request
194 202 that created the task and also add the user to the context.
195 203 """
196 204
197 205 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
198 206 link=None, link_error=None, shadow=None, **options):
199 207 """ queue the job to run (we are in web request context here) """
200 208
201 209 req = get_current_request()
202 210
203 211 # web case
204 212 if hasattr(req, 'user'):
205 213 ip_addr = req.user.ip_addr
206 214 user_id = req.user.user_id
207 215
208 216 # api case
209 217 elif hasattr(req, 'rpc_user'):
210 218 ip_addr = req.rpc_user.ip_addr
211 219 user_id = req.rpc_user.user_id
212 220 else:
213 221 raise Exception(
214 222 'Unable to fetch required data from request: {}. \n'
215 223 'This task is required to be executed from context of '
216 224 'request in a webapp'.format(repr(req)))
217 225
218 226 if req:
219 227 # we hook into kwargs since it is the only way to pass our data to
220 228 # the celery worker
221 229 options['headers'] = options.get('headers', {})
222 230 options['headers'].update({
223 231 'rhodecode_proxy_data': {
224 232 'environ': {
225 233 'PATH_INFO': req.environ['PATH_INFO'],
226 234 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
227 235 'HTTP_HOST': req.environ.get('HTTP_HOST',
228 236 req.environ['SERVER_NAME']),
229 237 'SERVER_NAME': req.environ['SERVER_NAME'],
230 238 'SERVER_PORT': req.environ['SERVER_PORT'],
231 239 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
232 240 },
233 241 'auth_user': {
234 242 'ip_addr': ip_addr,
235 243 'user_id': user_id
236 244 },
237 245 }
238 246 })
239 247
240 248 return super(RequestContextTask, self).apply_async(
241 249 args, kwargs, task_id, producer, link, link_error, shadow, **options)
242 250
243 251 def __call__(self, *args, **kwargs):
244 252 """ rebuild the context and then run task on celery worker """
245 253
246 254 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
247 255 if not proxy_data:
248 256 return super(RequestContextTask, self).__call__(*args, **kwargs)
249 257
250 258 log.debug('using celery proxy data to run task: %r', proxy_data)
251 259 # re-inject and register threadlocals for proper routing support
252 260 request = prepare_request(proxy_data['environ'])
253 261 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
254 262 ip_addr=proxy_data['auth_user']['ip_addr'])
255 263
256 264 return super(RequestContextTask, self).__call__(*args, **kwargs)
257 265
@@ -1,275 +1,285 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode task modules, containing all task that suppose to be run
23 23 by celery daemon
24 24 """
25 25
26 26 import os
27 import time
27 28
28 29 import rhodecode
29 30 from rhodecode.lib import audit_logger
30 31 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
31 32 from rhodecode.lib.hooks_base import log_create_repository
32 33 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
33 34 from rhodecode.lib.utils2 import safe_int, str2bool
34 35 from rhodecode.model.db import Session, Repository, User
35 36
36 37
37 38 @async_task(ignore_result=True, base=RequestContextTask)
38 39 def send_email(recipients, subject, body='', html_body='', email_config=None):
39 40 """
40 41 Sends an email with defined parameters from the .ini files.
41 42
42 43 :param recipients: list of recipients, it this is empty the defined email
43 44 address from field 'email_to' is used instead
44 45 :param subject: subject of the mail
45 46 :param body: body of the mail
46 47 :param html_body: html version of body
47 48 """
48 49 log = get_logger(send_email)
49 50
50 51 email_config = email_config or rhodecode.CONFIG
51 52 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
52 53 if not recipients:
53 54 # if recipients are not defined we send to email_config + all admins
54 55 admins = [
55 56 u.email for u in User.query().filter(User.admin == True).all()]
56 57 recipients = [email_config.get('email_to')] + admins
57 58
58 59 mail_server = email_config.get('smtp_server') or None
59 60 if mail_server is None:
60 61 log.error("SMTP server information missing. Sending email failed. "
61 62 "Make sure that `smtp_server` variable is configured "
62 63 "inside the .ini file")
63 64 return False
64 65
65 66 mail_from = email_config.get('app_email_from', 'RhodeCode')
66 67 user = email_config.get('smtp_username')
67 68 passwd = email_config.get('smtp_password')
68 69 mail_port = email_config.get('smtp_port')
69 70 tls = str2bool(email_config.get('smtp_use_tls'))
70 71 ssl = str2bool(email_config.get('smtp_use_ssl'))
71 72 debug = str2bool(email_config.get('debug'))
72 73 smtp_auth = email_config.get('smtp_auth')
73 74
74 75 try:
75 76 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
76 77 mail_port, ssl, tls, debug=debug)
77 78 m.send(recipients, subject, body, html_body)
78 79 except Exception:
79 80 log.exception('Mail sending failed')
80 81 return False
81 82 return True
82 83
83 84
84 85 @async_task(ignore_result=True, base=RequestContextTask)
85 86 def create_repo(form_data, cur_user):
86 87 from rhodecode.model.repo import RepoModel
87 88 from rhodecode.model.user import UserModel
88 89 from rhodecode.model.settings import SettingsModel
89 90
90 91 log = get_logger(create_repo)
91 92
92 93 cur_user = UserModel()._get_user(cur_user)
93 94 owner = cur_user
94 95
95 96 repo_name = form_data['repo_name']
96 97 repo_name_full = form_data['repo_name_full']
97 98 repo_type = form_data['repo_type']
98 99 description = form_data['repo_description']
99 100 private = form_data['repo_private']
100 101 clone_uri = form_data.get('clone_uri')
101 102 repo_group = safe_int(form_data['repo_group'])
102 103 landing_rev = form_data['repo_landing_rev']
103 104 copy_fork_permissions = form_data.get('copy_permissions')
104 105 copy_group_permissions = form_data.get('repo_copy_permissions')
105 106 fork_of = form_data.get('fork_parent_id')
106 107 state = form_data.get('repo_state', Repository.STATE_PENDING)
107 108
108 109 # repo creation defaults, private and repo_type are filled in form
109 110 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
110 111 enable_statistics = form_data.get(
111 112 'enable_statistics', defs.get('repo_enable_statistics'))
112 113 enable_locking = form_data.get(
113 114 'enable_locking', defs.get('repo_enable_locking'))
114 115 enable_downloads = form_data.get(
115 116 'enable_downloads', defs.get('repo_enable_downloads'))
116 117
117 118 try:
118 119 repo = RepoModel()._create_repo(
119 120 repo_name=repo_name_full,
120 121 repo_type=repo_type,
121 122 description=description,
122 123 owner=owner,
123 124 private=private,
124 125 clone_uri=clone_uri,
125 126 repo_group=repo_group,
126 127 landing_rev=landing_rev,
127 128 fork_of=fork_of,
128 129 copy_fork_permissions=copy_fork_permissions,
129 130 copy_group_permissions=copy_group_permissions,
130 131 enable_statistics=enable_statistics,
131 132 enable_locking=enable_locking,
132 133 enable_downloads=enable_downloads,
133 134 state=state
134 135 )
135 136 Session().commit()
136 137
137 138 # now create this repo on Filesystem
138 139 RepoModel()._create_filesystem_repo(
139 140 repo_name=repo_name,
140 141 repo_type=repo_type,
141 142 repo_group=RepoModel()._get_repo_group(repo_group),
142 143 clone_uri=clone_uri,
143 144 )
144 145 repo = Repository.get_by_repo_name(repo_name_full)
145 146 log_create_repository(created_by=owner.username, **repo.get_dict())
146 147
147 148 # update repo commit caches initially
148 149 repo.update_commit_cache()
149 150
150 151 # set new created state
151 152 repo.set_state(Repository.STATE_CREATED)
152 153 repo_id = repo.repo_id
153 154 repo_data = repo.get_api_data()
154 155
155 156 audit_logger.store(
156 157 'repo.create', action_data={'data': repo_data},
157 158 user=cur_user,
158 159 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
159 160
160 161 Session().commit()
161 162 except Exception:
162 163 log.warning('Exception occurred when creating repository, '
163 164 'doing cleanup...', exc_info=True)
164 165 # rollback things manually !
165 166 repo = Repository.get_by_repo_name(repo_name_full)
166 167 if repo:
167 168 Repository.delete(repo.repo_id)
168 169 Session().commit()
169 170 RepoModel()._delete_filesystem_repo(repo)
170 171 raise
171 172
172 173 # it's an odd fix to make celery fail task when exception occurs
173 174 def on_failure(self, *args, **kwargs):
174 175 pass
175 176
176 177 return True
177 178
178 179
179 180 @async_task(ignore_result=True, base=RequestContextTask)
180 181 def create_repo_fork(form_data, cur_user):
181 182 """
182 183 Creates a fork of repository using internal VCS methods
183 184 """
184 185 from rhodecode.model.repo import RepoModel
185 186 from rhodecode.model.user import UserModel
186 187
187 188 log = get_logger(create_repo_fork)
188 189
189 190 cur_user = UserModel()._get_user(cur_user)
190 191 owner = cur_user
191 192
192 193 repo_name = form_data['repo_name'] # fork in this case
193 194 repo_name_full = form_data['repo_name_full']
194 195 repo_type = form_data['repo_type']
195 196 description = form_data['description']
196 197 private = form_data['private']
197 198 clone_uri = form_data.get('clone_uri')
198 199 repo_group = safe_int(form_data['repo_group'])
199 200 landing_rev = form_data['landing_rev']
200 201 copy_fork_permissions = form_data.get('copy_permissions')
201 202 fork_id = safe_int(form_data.get('fork_parent_id'))
202 203
203 204 try:
204 205 fork_of = RepoModel()._get_repo(fork_id)
205 206 RepoModel()._create_repo(
206 207 repo_name=repo_name_full,
207 208 repo_type=repo_type,
208 209 description=description,
209 210 owner=owner,
210 211 private=private,
211 212 clone_uri=clone_uri,
212 213 repo_group=repo_group,
213 214 landing_rev=landing_rev,
214 215 fork_of=fork_of,
215 216 copy_fork_permissions=copy_fork_permissions
216 217 )
217 218
218 219 Session().commit()
219 220
220 221 base_path = Repository.base_path()
221 222 source_repo_path = os.path.join(base_path, fork_of.repo_name)
222 223
223 224 # now create this repo on Filesystem
224 225 RepoModel()._create_filesystem_repo(
225 226 repo_name=repo_name,
226 227 repo_type=repo_type,
227 228 repo_group=RepoModel()._get_repo_group(repo_group),
228 229 clone_uri=source_repo_path,
229 230 )
230 231 repo = Repository.get_by_repo_name(repo_name_full)
231 232 log_create_repository(created_by=owner.username, **repo.get_dict())
232 233
233 234 # update repo commit caches initially
234 235 config = repo._config
235 236 config.set('extensions', 'largefiles', '')
236 237 repo.update_commit_cache(config=config)
237 238
238 239 # set new created state
239 240 repo.set_state(Repository.STATE_CREATED)
240 241
241 242 repo_id = repo.repo_id
242 243 repo_data = repo.get_api_data()
243 244 audit_logger.store(
244 245 'repo.fork', action_data={'data': repo_data},
245 246 user=cur_user,
246 247 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
247 248
248 249 Session().commit()
249 250 except Exception as e:
250 251 log.warning('Exception %s occurred when forking repository, '
251 252 'doing cleanup...', e)
252 253 # rollback things manually !
253 254 repo = Repository.get_by_repo_name(repo_name_full)
254 255 if repo:
255 256 Repository.delete(repo.repo_id)
256 257 Session().commit()
257 258 RepoModel()._delete_filesystem_repo(repo)
258 259 raise
259 260
260 261 # it's an odd fix to make celery fail task when exception occurs
261 262 def on_failure(self, *args, **kwargs):
262 263 pass
263 264
264 265 return True
265 266
266 267
267 268 @async_task(ignore_result=True)
268 269 def sync_repo(*args, **kwargs):
269 270 from rhodecode.model.scm import ScmModel
270 271 log = get_logger(sync_repo)
271
272 log.info('Pulling from %s', kwargs['repo_name'])
272 repo_name = kwargs['repo_name']
273 log.info('Pulling from %s', repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
275 if dbrepo and dbrepo.clone_uri:
273 276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 else:
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
274 279
275 280
281 @async_task(ignore_result=False)
282 def beat_check(*args, **kwargs):
283 log = get_logger(beat_check)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 return time.time()
@@ -1,156 +1,169 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import json
23 23 import logging
24 24 import datetime
25 25
26 26 from functools import partial
27 27
28 28 from pyramid.compat import configparser
29 29 from celery.result import AsyncResult
30 30 import celery.loaders.base
31 31 import celery.schedules
32 32
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 def get_task_id(task):
38 38 task_id = None
39 39 if isinstance(task, AsyncResult):
40 40 task_id = task.task_id
41 41
42 42 return task_id
43 43
44 44
45 45 def crontab(value):
46 46 return celery.schedules.crontab(**value)
47 47
48 48
49 49 def timedelta(value):
50 50 return datetime.timedelta(**value)
51 51
52 52
53 53 def safe_json(get, section, key):
54 54 value = ''
55 55 try:
56 56 value = get(key)
57 57 json_value = json.loads(value)
58 58 except ValueError:
59 59 msg = 'The %s=%s is not valid json in section %s' % (
60 60 key, value, section
61 61 )
62 62 raise ValueError(msg)
63 63
64 64 return json_value
65 65
66 66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
67 def raw_2_schedule(schedule_value, schedule_type):
68 schedule_type_map = {
69 69 'crontab': crontab,
70 70 'timedelta': timedelta,
71 71 'integer': int
72 72 }
73 scheduler_cls = schedule_type_map.get(schedule_type)
74
75 if scheduler_cls is None:
76 raise ValueError(
77 'schedule type %s in section is invalid' % (
78 schedule_type,
79 )
80 )
81 try:
82 schedule = scheduler_cls(schedule_value)
83 except TypeError:
84 log.exception('Failed to compose a schedule from value: %r', schedule_value)
85 schedule = None
86 return schedule
87
88
89 def get_beat_config(parser, section):
90
73 91 get = partial(parser.get, section)
74 92 has_option = partial(parser.has_option, section)
75 93
76 94 schedule_type = get('type')
77 95 schedule_value = safe_json(get, section, 'schedule')
78 96
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 97 config = {
98 'schedule_type': schedule_type,
99 'schedule_value': schedule_value,
92 100 'task': get('task'),
93 'schedule': schedule,
94 101 }
102 schedule = raw_2_schedule(schedule_value, schedule_type)
103 if schedule:
104 config['schedule'] = schedule
95 105
96 106 if has_option('args'):
97 107 config['args'] = safe_json(get, section, 'args')
98 108
99 109 if has_option('kwargs'):
100 110 config['kwargs'] = safe_json(get, section, 'kwargs')
101 111
112 if has_option('force_update'):
113 config['force_update'] = get('force_update')
114
102 115 return config
103 116
104 117
105 118 def get_ini_config(ini_location):
106 119 """
107 120 Converts basic ini configuration into celery 4.X options
108 121 """
109 122 def key_converter(key_name):
110 123 pref = 'celery.'
111 124 if key_name.startswith(pref):
112 125 return key_name[len(pref):].replace('.', '_').lower()
113 126
114 127 def type_converter(parsed_key, value):
115 128 # cast to int
116 129 if value.isdigit():
117 130 return int(value)
118 131
119 132 # cast to bool
120 133 if value.lower() in ['true', 'false', 'True', 'False']:
121 134 return value.lower() == 'true'
122 135 return value
123 136
124 137 parser = configparser.SafeConfigParser(
125 138 defaults={'here': os.path.abspath(ini_location)})
126 139 parser.read(ini_location)
127 140
128 141 ini_config = {}
129 142 for k, v in parser.items('app:main'):
130 143 pref = 'celery.'
131 144 if k.startswith(pref):
132 145 ini_config[key_converter(k)] = type_converter(key_converter(k), v)
133 146
134 147 beat_config = {}
135 148 for section in parser.sections():
136 149 if section.startswith('celerybeat:'):
137 150 name = section.split(':', 1)[1]
138 151 beat_config[name] = get_beat_config(parser, section)
139 152
140 153 # final compose of settings
141 154 celery_settings = {}
142 155
143 156 if ini_config:
144 157 celery_settings.update(ini_config)
145 158 if beat_config:
146 159 celery_settings.update({'beat_schedule': beat_config})
147 160
148 161 return celery_settings
149 162
150 163
151 164 def parse_ini_vars(ini_vars):
152 165 options = {}
153 166 for pairs in ini_vars.split(','):
154 167 key, value = pairs.split('=')
155 168 options[key] = value
156 169 return options
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,23 +1,26 b''
1 1 <span tal:define="name name|field.name;
2 2 css_class css_class|field.widget.css_class;
3 3 oid oid|field.oid;
4 4 mask mask|field.widget.mask;
5 5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
6 6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
7 7 style style|field.widget.style;
8 help_block help_block|field.widget.help_block|'';
8 9 "
9 10 tal:omit-tag="">
10 11 <input type="text" name="${name}" value="${cstruct}"
11 12 tal:attributes="class string: form-control ${css_class or ''};
12 13 style style"
13 14 placeholder="${placeholder}"
14 15 id="${oid}"/>
16
17 <p tal:condition="help_block" class="help-block">${help_block}</p>
15 18 <script tal:condition="mask" type="text/javascript">
16 19 deform.addCallback(
17 20 '${oid}',
18 21 function (oid) {
19 22 $("#" + oid).mask("${mask}",
20 23 {placeholder:"${mask_placeholder}"});
21 24 });
22 25 </script>
23 26 </span> No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now