##// END OF EJS Templates
scheduler: added DB models and db parsers for the RhodeCode scheduler....
marcink -
r2406:db81d430 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,60 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import logging
21 import importlib
22
23 from celery.beat import (
24 PersistentScheduler, ScheduleEntry as CeleryScheduleEntry)
25
26 log = logging.getLogger(__name__)
27
28
29 class FileScheduleEntry(CeleryScheduleEntry):
30 def __init__(self, name=None, task=None, last_run_at=None,
31 total_run_count=None, schedule=None, args=(), kwargs=None,
32 options=None, relative=False, app=None, **_kwargs):
33 kwargs = kwargs or {}
34 options = options or {}
35
36 # because our custom loader passes in some variables that the original
37 # function doesn't expect, we have this thin wrapper
38
39 super(FileScheduleEntry, self).__init__(
40 name=name, task=task, last_run_at=last_run_at,
41 total_run_count=total_run_count, schedule=schedule, args=args,
42 kwargs=kwargs, options=options, relative=relative, app=app)
43
44
45 class FileScheduler(PersistentScheduler):
46 """CE base scheduler"""
47 Entry = FileScheduleEntry
48
49 def setup_schedule(self):
50 log.info("setup_schedule called")
51 super(FileScheduler, self).setup_schedule()
52
53
54 try:
55 # try if we have EE scheduler available
56 module = importlib.import_module('rc_ee.lib.celerylib.scheduler')
57 RcScheduler = module.RcScheduler
58 except ImportError:
59 # fallback to CE scheduler
60 RcScheduler = FileScheduler
1 NO CONTENT: new file 100644
NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
@@ -0,0 +1,32 b''
1 import logging
2
3 from sqlalchemy import *
4
5 from rhodecode.model import meta
6 from rhodecode.lib.dbmigrate.versions import _reset_base, notify
7
8 log = logging.getLogger(__name__)
9
10
11 def upgrade(migrate_engine):
12 """
13 Upgrade operations go here.
14 Don't create your own engine; bind migrate_engine to your metadata
15 """
16 _reset_base(migrate_engine)
17 from rhodecode.lib.dbmigrate.schema import db_4_11_0_0
18 db_4_11_0_0.ScheduleEntry().__table__.create()
19
20 # issue fixups
21 fixups(db_4_11_0_0, meta.Session)
22
23
24 def downgrade(migrate_engine):
25 meta = MetaData()
26 meta.bind = migrate_engine
27
28
29 def fixups(models, _SESSION):
30 pass
31
32
@@ -51,7 +51,7 b' PYRAMID_SETTINGS = {}'
51 EXTENSIONS = {}
51 EXTENSIONS = {}
52
52
53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
54 __dbversion__ = 81 # defines current db version for migrations
54 __dbversion__ = 82 # defines current db version for migrations
55 __platform__ = platform.system()
55 __platform__ = platform.system()
56 __license__ = 'AGPLv3, and Commercial License'
56 __license__ = 'AGPLv3, and Commercial License'
57 __author__ = 'RhodeCode GmbH'
57 __author__ = 'RhodeCode GmbH'
@@ -20,7 +20,11 b''
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
23 celery worker \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
24 """
28 """
25 import os
29 import os
26 import logging
30 import logging
@@ -83,6 +87,10 b' base_celery_config = {'
83 'task_serializer': 'json_ext',
87 'task_serializer': 'json_ext',
84 'result_serializer': 'json_ext',
88 'result_serializer': 'json_ext',
85 'worker_hijack_root_logger': False,
89 'worker_hijack_root_logger': False,
90 'database_table_names': {
91 'task': 'beat_taskmeta',
92 'group': 'beat_groupmeta',
93 }
86 }
94 }
87 # init main celery app
95 # init main celery app
88 celery_app = Celery()
96 celery_app = Celery()
@@ -24,6 +24,7 b' by celery daemon'
24 """
24 """
25
25
26 import os
26 import os
27 import time
27
28
28 import rhodecode
29 import rhodecode
29 from rhodecode.lib import audit_logger
30 from rhodecode.lib import audit_logger
@@ -268,8 +269,17 b' def create_repo_fork(form_data, cur_user'
268 def sync_repo(*args, **kwargs):
269 def sync_repo(*args, **kwargs):
269 from rhodecode.model.scm import ScmModel
270 from rhodecode.model.scm import ScmModel
270 log = get_logger(sync_repo)
271 log = get_logger(sync_repo)
271
272 repo_name = kwargs['repo_name']
272 log.info('Pulling from %s', kwargs['repo_name'])
273 log.info('Pulling from %s', repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
275 if dbrepo and dbrepo.clone_uri:
273 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 else:
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
274
279
275
280
281 @async_task(ignore_result=False)
282 def beat_check(*args, **kwargs):
283 log = get_logger(beat_check)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 return time.time()
@@ -64,34 +64,44 b' def safe_json(get, section, key):'
64 return json_value
64 return json_value
65
65
66
66
67 def get_beat_config(parser, section):
67 def raw_2_schedule(schedule_value, schedule_type):
68 SCHEDULE_TYPE_MAP = {
68 schedule_type_map = {
69 'crontab': crontab,
69 'crontab': crontab,
70 'timedelta': timedelta,
70 'timedelta': timedelta,
71 'integer': int
71 'integer': int
72 }
72 }
73 scheduler_cls = schedule_type_map.get(schedule_type)
74
75 if scheduler_cls is None:
76 raise ValueError(
77 'schedule type %s in section is invalid' % (
78 schedule_type,
79 )
80 )
81 try:
82 schedule = scheduler_cls(schedule_value)
83 except TypeError:
84 log.exception('Failed to compose a schedule from value: %r', schedule_value)
85 schedule = None
86 return schedule
87
88
89 def get_beat_config(parser, section):
90
73 get = partial(parser.get, section)
91 get = partial(parser.get, section)
74 has_option = partial(parser.has_option, section)
92 has_option = partial(parser.has_option, section)
75
93
76 schedule_type = get('type')
94 schedule_type = get('type')
77 schedule_value = safe_json(get, section, 'schedule')
95 schedule_value = safe_json(get, section, 'schedule')
78
96
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 config = {
97 config = {
98 'schedule_type': schedule_type,
99 'schedule_value': schedule_value,
92 'task': get('task'),
100 'task': get('task'),
93 'schedule': schedule,
94 }
101 }
102 schedule = raw_2_schedule(schedule_value, schedule_type)
103 if schedule:
104 config['schedule'] = schedule
95
105
96 if has_option('args'):
106 if has_option('args'):
97 config['args'] = safe_json(get, section, 'args')
107 config['args'] = safe_json(get, section, 'args')
@@ -99,6 +109,9 b' def get_beat_config(parser, section):'
99 if has_option('kwargs'):
109 if has_option('kwargs'):
100 config['kwargs'] = safe_json(get, section, 'kwargs')
110 config['kwargs'] = safe_json(get, section, 'kwargs')
101
111
112 if has_option('force_update'):
113 config['force_update'] = get('force_update')
114
102 return config
115 return config
103
116
104
117
@@ -36,7 +36,7 b' import collections'
36
36
37 from sqlalchemy import (
37 from sqlalchemy import (
38 or_, and_, not_, func, TypeDecorator, event,
38 or_, and_, not_, func, TypeDecorator, event,
39 Index, UniqueConstraint, ForeignKey, CheckConstraint, Column,
39 Index, Sequence, UniqueConstraint, ForeignKey, CheckConstraint, Column,
40 Boolean, String, Unicode, UnicodeText, DateTime, Integer, LargeBinary,
40 Boolean, String, Unicode, UnicodeText, DateTime, Integer, LargeBinary,
41 Text, Float, PickleType)
41 Text, Float, PickleType)
42 from sqlalchemy.sql.expression import true, false
42 from sqlalchemy.sql.expression import true, false
@@ -225,10 +225,10 b' class BaseModel(object):'
225 """return list with keys and values tuples corresponding
225 """return list with keys and values tuples corresponding
226 to this model data """
226 to this model data """
227
227
228 l = []
228 lst = []
229 for k in self._get_keys():
229 for k in self._get_keys():
230 l.append((k, getattr(self, k),))
230 lst.append((k, getattr(self, k),))
231 return l
231 return lst
232
232
233 def populate_obj(self, populate_dict):
233 def populate_obj(self, populate_dict):
234 """populate model with data from given populate_dict"""
234 """populate model with data from given populate_dict"""
@@ -4208,6 +4208,136 b' class RepoReviewRule(Base, BaseModel):'
4208 self.repo_review_rule_id, self.repo)
4208 self.repo_review_rule_id, self.repo)
4209
4209
4210
4210
4211 class ScheduleEntry(Base, BaseModel):
4212 __tablename__ = 'schedule_entries'
4213 __table_args__ = (
4214 UniqueConstraint('schedule_name', name='s_schedule_name_idx'),
4215 UniqueConstraint('task_uid', name='s_task_uid_idx'),
4216 {'extend_existing': True, 'mysql_engine': 'InnoDB',
4217 'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
4218 )
4219 schedule_types = ['crontab', 'timedelta', 'integer']
4220 schedule_entry_id = Column('schedule_entry_id', Integer(), primary_key=True)
4221
4222 schedule_name = Column("schedule_name", String(255), nullable=False, unique=None, default=None)
4223 schedule_description = Column("schedule_description", String(10000), nullable=True, unique=None, default=None)
4224 schedule_enabled = Column("schedule_enabled", Boolean(), nullable=False, unique=None, default=True)
4225
4226 _schedule_type = Column("schedule_type", String(255), nullable=False, unique=None, default=None)
4227 schedule_definition = Column('schedule_definition_json', MutationObj.as_mutable(JsonType(default=lambda: "", dialect_map=dict(mysql=LONGTEXT()))))
4228
4229 schedule_last_run = Column('schedule_last_run', DateTime(timezone=False), nullable=True, unique=None, default=None)
4230 schedule_total_run_count = Column('schedule_total_run_count', Integer(), nullable=True, unique=None, default=0)
4231
4232 # task
4233 task_uid = Column("task_uid", String(255), nullable=False, unique=None, default=None)
4234 task_dot_notation = Column("task_dot_notation", String(4096), nullable=False, unique=None, default=None)
4235 task_args = Column('task_args_json', MutationObj.as_mutable(JsonType(default=list, dialect_map=dict(mysql=LONGTEXT()))))
4236 task_kwargs = Column('task_kwargs_json', MutationObj.as_mutable(JsonType(default=dict, dialect_map=dict(mysql=LONGTEXT()))))
4237
4238 created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
4239 updated_on = Column('updated_on', DateTime(timezone=False), nullable=True, unique=None, default=None)
4240
4241 @hybrid_property
4242 def schedule_type(self):
4243 return self._schedule_type
4244
4245 @schedule_type.setter
4246 def schedule_type(self, val):
4247 if val not in self.schedule_types:
4248 raise ValueError('Value must be on of `{}` and got `{}`'.format(
4249 val, self.schedule_type))
4250
4251 self._schedule_type = val
4252
4253 @classmethod
4254 def get_uid(cls, obj):
4255 args = obj.task_args
4256 kwargs = obj.task_kwargs
4257 if isinstance(args, JsonRaw):
4258 try:
4259 args = json.loads(args)
4260 except ValueError:
4261 args = tuple()
4262
4263 if isinstance(kwargs, JsonRaw):
4264 try:
4265 kwargs = json.loads(kwargs)
4266 except ValueError:
4267 kwargs = dict()
4268
4269 dot_notation = obj.task_dot_notation
4270 val = '.'.join(map(safe_str, [
4271 sorted(dot_notation), args, sorted(kwargs.items())]))
4272 return hashlib.sha1(val).hexdigest()
4273
4274 @classmethod
4275 def get_by_schedule_name(cls, schedule_name):
4276 return cls.query().filter(cls.schedule_name == schedule_name).scalar()
4277
4278 @classmethod
4279 def get_by_schedule_id(cls, schedule_id):
4280 return cls.query().filter(cls.schedule_entry_id == schedule_id).scalar()
4281
4282 @property
4283 def task(self):
4284 return self.task_dot_notation
4285
4286 @property
4287 def schedule(self):
4288 from rhodecode.lib.celerylib.utils import raw_2_schedule
4289 schedule = raw_2_schedule(self.schedule_definition, self.schedule_type)
4290 return schedule
4291
4292 @property
4293 def args(self):
4294 try:
4295 return list(self.task_args or [])
4296 except ValueError:
4297 return list()
4298
4299 @property
4300 def kwargs(self):
4301 try:
4302 return dict(self.task_kwargs or {})
4303 except ValueError:
4304 return dict()
4305
4306 def _as_raw(self, val):
4307 if hasattr(val, 'de_coerce'):
4308 val = val.de_coerce()
4309 if val:
4310 val = json.dumps(val)
4311
4312 return val
4313
4314 @property
4315 def schedule_definition_raw(self):
4316 return self._as_raw(self.schedule_definition)
4317
4318 @property
4319 def args_raw(self):
4320 return self._as_raw(self.task_args)
4321
4322 @property
4323 def kwargs_raw(self):
4324 return self._as_raw(self.task_kwargs)
4325
4326 def __repr__(self):
4327 return '<DB:ScheduleEntry({}:{})>'.format(
4328 self.schedule_entry_id, self.schedule_name)
4329
4330
4331 @event.listens_for(ScheduleEntry, 'before_update')
4332 def update_task_uid(mapper, connection, target):
4333 target.task_uid = ScheduleEntry.get_uid(target)
4334
4335
4336 @event.listens_for(ScheduleEntry, 'before_insert')
4337 def set_task_uid(mapper, connection, target):
4338 target.task_uid = ScheduleEntry.get_uid(target)
4339
4340
4211 class DbMigrateVersion(Base, BaseModel):
4341 class DbMigrateVersion(Base, BaseModel):
4212 __tablename__ = 'db_migrate_version'
4342 __tablename__ = 'db_migrate_version'
4213 __table_args__ = (
4343 __table_args__ = (
@@ -5,6 +5,7 b''
5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
7 style style|field.widget.style;
7 style style|field.widget.style;
8 help_block help_block|field.widget.help_block|'';
8 "
9 "
9 tal:omit-tag="">
10 tal:omit-tag="">
10 <input type="text" name="${name}" value="${cstruct}"
11 <input type="text" name="${name}" value="${cstruct}"
@@ -12,6 +13,8 b''
12 style style"
13 style style"
13 placeholder="${placeholder}"
14 placeholder="${placeholder}"
14 id="${oid}"/>
15 id="${oid}"/>
16
17 <p tal:condition="help_block" class="help-block">${help_block}</p>
15 <script tal:condition="mask" type="text/javascript">
18 <script tal:condition="mask" type="text/javascript">
16 deform.addCallback(
19 deform.addCallback(
17 '${oid}',
20 '${oid}',
General Comments 0
You need to be logged in to leave comments. Login now