##// END OF EJS Templates
scheduler: added DB models and db parsers for the RhodeCode scheduler....
marcink -
r2406:db81d430 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -0,0 +1,60 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 import logging
21 import importlib
22
23 from celery.beat import (
24 PersistentScheduler, ScheduleEntry as CeleryScheduleEntry)
25
26 log = logging.getLogger(__name__)
27
28
29 class FileScheduleEntry(CeleryScheduleEntry):
30 def __init__(self, name=None, task=None, last_run_at=None,
31 total_run_count=None, schedule=None, args=(), kwargs=None,
32 options=None, relative=False, app=None, **_kwargs):
33 kwargs = kwargs or {}
34 options = options or {}
35
36 # because our custom loader passes in some variables that the original
37 # function doesn't expect, we have this thin wrapper
38
39 super(FileScheduleEntry, self).__init__(
40 name=name, task=task, last_run_at=last_run_at,
41 total_run_count=total_run_count, schedule=schedule, args=args,
42 kwargs=kwargs, options=options, relative=relative, app=app)
43
44
45 class FileScheduler(PersistentScheduler):
46 """CE base scheduler"""
47 Entry = FileScheduleEntry
48
49 def setup_schedule(self):
50 log.info("setup_schedule called")
51 super(FileScheduler, self).setup_schedule()
52
53
54 try:
55 # try if we have EE scheduler available
56 module = importlib.import_module('rc_ee.lib.celerylib.scheduler')
57 RcScheduler = module.RcScheduler
58 except ImportError:
59 # fallback to CE scheduler
60 RcScheduler = FileScheduler
1 NO CONTENT: new file 100644
The requested commit or file is too big and content was truncated. Show full diff
@@ -0,0 +1,32 b''
1 import logging
2
3 from sqlalchemy import *
4
5 from rhodecode.model import meta
6 from rhodecode.lib.dbmigrate.versions import _reset_base, notify
7
8 log = logging.getLogger(__name__)
9
10
11 def upgrade(migrate_engine):
12 """
13 Upgrade operations go here.
14 Don't create your own engine; bind migrate_engine to your metadata
15 """
16 _reset_base(migrate_engine)
17 from rhodecode.lib.dbmigrate.schema import db_4_11_0_0
18 db_4_11_0_0.ScheduleEntry().__table__.create()
19
20 # issue fixups
21 fixups(db_4_11_0_0, meta.Session)
22
23
24 def downgrade(migrate_engine):
25 meta = MetaData()
26 meta.bind = migrate_engine
27
28
29 def fixups(models, _SESSION):
30 pass
31
32
@@ -51,7 +51,7 b' PYRAMID_SETTINGS = {}'
51 51 EXTENSIONS = {}
52 52
53 53 __version__ = ('.'.join((str(each) for each in VERSION[:3])))
54 __dbversion__ = 81 # defines current db version for migrations
54 __dbversion__ = 82 # defines current db version for migrations
55 55 __platform__ = platform.system()
56 56 __license__ = 'AGPLv3, and Commercial License'
57 57 __author__ = 'RhodeCode GmbH'
@@ -20,7 +20,11 b''
20 20 """
21 21 Celery loader, run with::
22 22
23 celery worker --beat --app rhodecode.lib.celerylib.loader --loglevel DEBUG --ini=._dev/dev.ini
23 celery worker \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
24 28 """
25 29 import os
26 30 import logging
@@ -83,6 +87,10 b' base_celery_config = {'
83 87 'task_serializer': 'json_ext',
84 88 'result_serializer': 'json_ext',
85 89 'worker_hijack_root_logger': False,
90 'database_table_names': {
91 'task': 'beat_taskmeta',
92 'group': 'beat_groupmeta',
93 }
86 94 }
87 95 # init main celery app
88 96 celery_app = Celery()
@@ -24,6 +24,7 b' by celery daemon'
24 24 """
25 25
26 26 import os
27 import time
27 28
28 29 import rhodecode
29 30 from rhodecode.lib import audit_logger
@@ -268,8 +269,17 b' def create_repo_fork(form_data, cur_user'
268 269 def sync_repo(*args, **kwargs):
269 270 from rhodecode.model.scm import ScmModel
270 271 log = get_logger(sync_repo)
271
272 log.info('Pulling from %s', kwargs['repo_name'])
272 repo_name = kwargs['repo_name']
273 log.info('Pulling from %s', repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
275 if dbrepo and dbrepo.clone_uri:
273 276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 else:
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
274 279
275 280
281 @async_task(ignore_result=False)
282 def beat_check(*args, **kwargs):
283 log = get_logger(beat_check)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 return time.time()
@@ -64,34 +64,44 b' def safe_json(get, section, key):'
64 64 return json_value
65 65
66 66
67 def get_beat_config(parser, section):
68 SCHEDULE_TYPE_MAP = {
67 def raw_2_schedule(schedule_value, schedule_type):
68 schedule_type_map = {
69 69 'crontab': crontab,
70 70 'timedelta': timedelta,
71 71 'integer': int
72 72 }
73 scheduler_cls = schedule_type_map.get(schedule_type)
74
75 if scheduler_cls is None:
76 raise ValueError(
77 'schedule type %s in section is invalid' % (
78 schedule_type,
79 )
80 )
81 try:
82 schedule = scheduler_cls(schedule_value)
83 except TypeError:
84 log.exception('Failed to compose a schedule from value: %r', schedule_value)
85 schedule = None
86 return schedule
87
88
89 def get_beat_config(parser, section):
90
73 91 get = partial(parser.get, section)
74 92 has_option = partial(parser.has_option, section)
75 93
76 94 schedule_type = get('type')
77 95 schedule_value = safe_json(get, section, 'schedule')
78 96
79 scheduler_cls = SCHEDULE_TYPE_MAP.get(schedule_type)
80
81 if scheduler_cls is None:
82 raise ValueError(
83 'schedule type %s in section %s is invalid' % (
84 schedule_type,
85 section
86 )
87 )
88
89 schedule = scheduler_cls(schedule_value)
90
91 97 config = {
98 'schedule_type': schedule_type,
99 'schedule_value': schedule_value,
92 100 'task': get('task'),
93 'schedule': schedule,
94 101 }
102 schedule = raw_2_schedule(schedule_value, schedule_type)
103 if schedule:
104 config['schedule'] = schedule
95 105
96 106 if has_option('args'):
97 107 config['args'] = safe_json(get, section, 'args')
@@ -99,6 +109,9 b' def get_beat_config(parser, section):'
99 109 if has_option('kwargs'):
100 110 config['kwargs'] = safe_json(get, section, 'kwargs')
101 111
112 if has_option('force_update'):
113 config['force_update'] = get('force_update')
114
102 115 return config
103 116
104 117
@@ -36,7 +36,7 b' import collections'
36 36
37 37 from sqlalchemy import (
38 38 or_, and_, not_, func, TypeDecorator, event,
39 Index, UniqueConstraint, ForeignKey, CheckConstraint, Column,
39 Index, Sequence, UniqueConstraint, ForeignKey, CheckConstraint, Column,
40 40 Boolean, String, Unicode, UnicodeText, DateTime, Integer, LargeBinary,
41 41 Text, Float, PickleType)
42 42 from sqlalchemy.sql.expression import true, false
@@ -225,10 +225,10 b' class BaseModel(object):'
225 225 """return list with keys and values tuples corresponding
226 226 to this model data """
227 227
228 l = []
228 lst = []
229 229 for k in self._get_keys():
230 l.append((k, getattr(self, k),))
231 return l
230 lst.append((k, getattr(self, k),))
231 return lst
232 232
233 233 def populate_obj(self, populate_dict):
234 234 """populate model with data from given populate_dict"""
@@ -4208,6 +4208,136 b' class RepoReviewRule(Base, BaseModel):'
4208 4208 self.repo_review_rule_id, self.repo)
4209 4209
4210 4210
4211 class ScheduleEntry(Base, BaseModel):
4212 __tablename__ = 'schedule_entries'
4213 __table_args__ = (
4214 UniqueConstraint('schedule_name', name='s_schedule_name_idx'),
4215 UniqueConstraint('task_uid', name='s_task_uid_idx'),
4216 {'extend_existing': True, 'mysql_engine': 'InnoDB',
4217 'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
4218 )
4219 schedule_types = ['crontab', 'timedelta', 'integer']
4220 schedule_entry_id = Column('schedule_entry_id', Integer(), primary_key=True)
4221
4222 schedule_name = Column("schedule_name", String(255), nullable=False, unique=None, default=None)
4223 schedule_description = Column("schedule_description", String(10000), nullable=True, unique=None, default=None)
4224 schedule_enabled = Column("schedule_enabled", Boolean(), nullable=False, unique=None, default=True)
4225
4226 _schedule_type = Column("schedule_type", String(255), nullable=False, unique=None, default=None)
4227 schedule_definition = Column('schedule_definition_json', MutationObj.as_mutable(JsonType(default=lambda: "", dialect_map=dict(mysql=LONGTEXT()))))
4228
4229 schedule_last_run = Column('schedule_last_run', DateTime(timezone=False), nullable=True, unique=None, default=None)
4230 schedule_total_run_count = Column('schedule_total_run_count', Integer(), nullable=True, unique=None, default=0)
4231
4232 # task
4233 task_uid = Column("task_uid", String(255), nullable=False, unique=None, default=None)
4234 task_dot_notation = Column("task_dot_notation", String(4096), nullable=False, unique=None, default=None)
4235 task_args = Column('task_args_json', MutationObj.as_mutable(JsonType(default=list, dialect_map=dict(mysql=LONGTEXT()))))
4236 task_kwargs = Column('task_kwargs_json', MutationObj.as_mutable(JsonType(default=dict, dialect_map=dict(mysql=LONGTEXT()))))
4237
4238 created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
4239 updated_on = Column('updated_on', DateTime(timezone=False), nullable=True, unique=None, default=None)
4240
4241 @hybrid_property
4242 def schedule_type(self):
4243 return self._schedule_type
4244
4245 @schedule_type.setter
4246 def schedule_type(self, val):
4247 if val not in self.schedule_types:
4248 raise ValueError('Value must be on of `{}` and got `{}`'.format(
4249 val, self.schedule_type))
4250
4251 self._schedule_type = val
4252
4253 @classmethod
4254 def get_uid(cls, obj):
4255 args = obj.task_args
4256 kwargs = obj.task_kwargs
4257 if isinstance(args, JsonRaw):
4258 try:
4259 args = json.loads(args)
4260 except ValueError:
4261 args = tuple()
4262
4263 if isinstance(kwargs, JsonRaw):
4264 try:
4265 kwargs = json.loads(kwargs)
4266 except ValueError:
4267 kwargs = dict()
4268
4269 dot_notation = obj.task_dot_notation
4270 val = '.'.join(map(safe_str, [
4271 sorted(dot_notation), args, sorted(kwargs.items())]))
4272 return hashlib.sha1(val).hexdigest()
4273
4274 @classmethod
4275 def get_by_schedule_name(cls, schedule_name):
4276 return cls.query().filter(cls.schedule_name == schedule_name).scalar()
4277
4278 @classmethod
4279 def get_by_schedule_id(cls, schedule_id):
4280 return cls.query().filter(cls.schedule_entry_id == schedule_id).scalar()
4281
4282 @property
4283 def task(self):
4284 return self.task_dot_notation
4285
4286 @property
4287 def schedule(self):
4288 from rhodecode.lib.celerylib.utils import raw_2_schedule
4289 schedule = raw_2_schedule(self.schedule_definition, self.schedule_type)
4290 return schedule
4291
4292 @property
4293 def args(self):
4294 try:
4295 return list(self.task_args or [])
4296 except ValueError:
4297 return list()
4298
4299 @property
4300 def kwargs(self):
4301 try:
4302 return dict(self.task_kwargs or {})
4303 except ValueError:
4304 return dict()
4305
4306 def _as_raw(self, val):
4307 if hasattr(val, 'de_coerce'):
4308 val = val.de_coerce()
4309 if val:
4310 val = json.dumps(val)
4311
4312 return val
4313
4314 @property
4315 def schedule_definition_raw(self):
4316 return self._as_raw(self.schedule_definition)
4317
4318 @property
4319 def args_raw(self):
4320 return self._as_raw(self.task_args)
4321
4322 @property
4323 def kwargs_raw(self):
4324 return self._as_raw(self.task_kwargs)
4325
4326 def __repr__(self):
4327 return '<DB:ScheduleEntry({}:{})>'.format(
4328 self.schedule_entry_id, self.schedule_name)
4329
4330
4331 @event.listens_for(ScheduleEntry, 'before_update')
4332 def update_task_uid(mapper, connection, target):
4333 target.task_uid = ScheduleEntry.get_uid(target)
4334
4335
4336 @event.listens_for(ScheduleEntry, 'before_insert')
4337 def set_task_uid(mapper, connection, target):
4338 target.task_uid = ScheduleEntry.get_uid(target)
4339
4340
4211 4341 class DbMigrateVersion(Base, BaseModel):
4212 4342 __tablename__ = 'db_migrate_version'
4213 4343 __table_args__ = (
@@ -5,6 +5,7 b''
5 5 placeholder placeholder|field.widget.placeholder|field.placeholder|'';
6 6 mask_placeholder mask_placeholder|field.widget.mask_placeholder;
7 7 style style|field.widget.style;
8 help_block help_block|field.widget.help_block|'';
8 9 "
9 10 tal:omit-tag="">
10 11 <input type="text" name="${name}" value="${cstruct}"
@@ -12,6 +13,8 b''
12 13 style style"
13 14 placeholder="${placeholder}"
14 15 id="${oid}"/>
16
17 <p tal:condition="help_block" class="help-block">${help_block}</p>
15 18 <script tal:condition="mask" type="text/javascript">
16 19 deform.addCallback(
17 20 '${oid}',
General Comments 0
You need to be logged in to leave comments. Login now