##// END OF EJS Templates
celery: tasks, improve fork/repo create to survive a database errors in the cleanup code.
marcink -
r2419:20304eec default
parent child Browse files
Show More
@@ -1,285 +1,285 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2012-2017 RhodeCode GmbH
3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 RhodeCode task modules, containing all task that suppose to be run
22 RhodeCode task modules, containing all task that suppose to be run
23 by celery daemon
23 by celery daemon
24 """
24 """
25
25
26 import os
26 import os
27 import time
27 import time
28
28
29 import rhodecode
29 import rhodecode
30 from rhodecode.lib import audit_logger
30 from rhodecode.lib import audit_logger
31 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
31 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
32 from rhodecode.lib.hooks_base import log_create_repository
32 from rhodecode.lib.hooks_base import log_create_repository
33 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
33 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
34 from rhodecode.lib.utils2 import safe_int, str2bool
34 from rhodecode.lib.utils2 import safe_int, str2bool
35 from rhodecode.model.db import Session, Repository, User
35 from rhodecode.model.db import Session, IntegrityError, Repository, User
36
36
37
37
38 @async_task(ignore_result=True, base=RequestContextTask)
38 @async_task(ignore_result=True, base=RequestContextTask)
39 def send_email(recipients, subject, body='', html_body='', email_config=None):
39 def send_email(recipients, subject, body='', html_body='', email_config=None):
40 """
40 """
41 Sends an email with defined parameters from the .ini files.
41 Sends an email with defined parameters from the .ini files.
42
42
43 :param recipients: list of recipients, it this is empty the defined email
43 :param recipients: list of recipients, it this is empty the defined email
44 address from field 'email_to' is used instead
44 address from field 'email_to' is used instead
45 :param subject: subject of the mail
45 :param subject: subject of the mail
46 :param body: body of the mail
46 :param body: body of the mail
47 :param html_body: html version of body
47 :param html_body: html version of body
48 """
48 """
49 log = get_logger(send_email)
49 log = get_logger(send_email)
50
50
51 email_config = email_config or rhodecode.CONFIG
51 email_config = email_config or rhodecode.CONFIG
52 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
52 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
53 if not recipients:
53 if not recipients:
54 # if recipients are not defined we send to email_config + all admins
54 # if recipients are not defined we send to email_config + all admins
55 admins = [
55 admins = [
56 u.email for u in User.query().filter(User.admin == True).all()]
56 u.email for u in User.query().filter(User.admin == True).all()]
57 recipients = [email_config.get('email_to')] + admins
57 recipients = [email_config.get('email_to')] + admins
58
58
59 mail_server = email_config.get('smtp_server') or None
59 mail_server = email_config.get('smtp_server') or None
60 if mail_server is None:
60 if mail_server is None:
61 log.error("SMTP server information missing. Sending email failed. "
61 log.error("SMTP server information missing. Sending email failed. "
62 "Make sure that `smtp_server` variable is configured "
62 "Make sure that `smtp_server` variable is configured "
63 "inside the .ini file")
63 "inside the .ini file")
64 return False
64 return False
65
65
66 mail_from = email_config.get('app_email_from', 'RhodeCode')
66 mail_from = email_config.get('app_email_from', 'RhodeCode')
67 user = email_config.get('smtp_username')
67 user = email_config.get('smtp_username')
68 passwd = email_config.get('smtp_password')
68 passwd = email_config.get('smtp_password')
69 mail_port = email_config.get('smtp_port')
69 mail_port = email_config.get('smtp_port')
70 tls = str2bool(email_config.get('smtp_use_tls'))
70 tls = str2bool(email_config.get('smtp_use_tls'))
71 ssl = str2bool(email_config.get('smtp_use_ssl'))
71 ssl = str2bool(email_config.get('smtp_use_ssl'))
72 debug = str2bool(email_config.get('debug'))
72 debug = str2bool(email_config.get('debug'))
73 smtp_auth = email_config.get('smtp_auth')
73 smtp_auth = email_config.get('smtp_auth')
74
74
75 try:
75 try:
76 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
76 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
77 mail_port, ssl, tls, debug=debug)
77 mail_port, ssl, tls, debug=debug)
78 m.send(recipients, subject, body, html_body)
78 m.send(recipients, subject, body, html_body)
79 except Exception:
79 except Exception:
80 log.exception('Mail sending failed')
80 log.exception('Mail sending failed')
81 return False
81 return False
82 return True
82 return True
83
83
84
84
85 @async_task(ignore_result=True, base=RequestContextTask)
85 @async_task(ignore_result=True, base=RequestContextTask)
86 def create_repo(form_data, cur_user):
86 def create_repo(form_data, cur_user):
87 from rhodecode.model.repo import RepoModel
87 from rhodecode.model.repo import RepoModel
88 from rhodecode.model.user import UserModel
88 from rhodecode.model.user import UserModel
89 from rhodecode.model.settings import SettingsModel
89 from rhodecode.model.settings import SettingsModel
90
90
91 log = get_logger(create_repo)
91 log = get_logger(create_repo)
92
92
93 cur_user = UserModel()._get_user(cur_user)
93 cur_user = UserModel()._get_user(cur_user)
94 owner = cur_user
94 owner = cur_user
95
95
96 repo_name = form_data['repo_name']
96 repo_name = form_data['repo_name']
97 repo_name_full = form_data['repo_name_full']
97 repo_name_full = form_data['repo_name_full']
98 repo_type = form_data['repo_type']
98 repo_type = form_data['repo_type']
99 description = form_data['repo_description']
99 description = form_data['repo_description']
100 private = form_data['repo_private']
100 private = form_data['repo_private']
101 clone_uri = form_data.get('clone_uri')
101 clone_uri = form_data.get('clone_uri')
102 repo_group = safe_int(form_data['repo_group'])
102 repo_group = safe_int(form_data['repo_group'])
103 landing_rev = form_data['repo_landing_rev']
103 landing_rev = form_data['repo_landing_rev']
104 copy_fork_permissions = form_data.get('copy_permissions')
104 copy_fork_permissions = form_data.get('copy_permissions')
105 copy_group_permissions = form_data.get('repo_copy_permissions')
105 copy_group_permissions = form_data.get('repo_copy_permissions')
106 fork_of = form_data.get('fork_parent_id')
106 fork_of = form_data.get('fork_parent_id')
107 state = form_data.get('repo_state', Repository.STATE_PENDING)
107 state = form_data.get('repo_state', Repository.STATE_PENDING)
108
108
109 # repo creation defaults, private and repo_type are filled in form
109 # repo creation defaults, private and repo_type are filled in form
110 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
110 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
111 enable_statistics = form_data.get(
111 enable_statistics = form_data.get(
112 'enable_statistics', defs.get('repo_enable_statistics'))
112 'enable_statistics', defs.get('repo_enable_statistics'))
113 enable_locking = form_data.get(
113 enable_locking = form_data.get(
114 'enable_locking', defs.get('repo_enable_locking'))
114 'enable_locking', defs.get('repo_enable_locking'))
115 enable_downloads = form_data.get(
115 enable_downloads = form_data.get(
116 'enable_downloads', defs.get('repo_enable_downloads'))
116 'enable_downloads', defs.get('repo_enable_downloads'))
117
117
118 try:
118 try:
119 repo = RepoModel()._create_repo(
119 repo = RepoModel()._create_repo(
120 repo_name=repo_name_full,
120 repo_name=repo_name_full,
121 repo_type=repo_type,
121 repo_type=repo_type,
122 description=description,
122 description=description,
123 owner=owner,
123 owner=owner,
124 private=private,
124 private=private,
125 clone_uri=clone_uri,
125 clone_uri=clone_uri,
126 repo_group=repo_group,
126 repo_group=repo_group,
127 landing_rev=landing_rev,
127 landing_rev=landing_rev,
128 fork_of=fork_of,
128 fork_of=fork_of,
129 copy_fork_permissions=copy_fork_permissions,
129 copy_fork_permissions=copy_fork_permissions,
130 copy_group_permissions=copy_group_permissions,
130 copy_group_permissions=copy_group_permissions,
131 enable_statistics=enable_statistics,
131 enable_statistics=enable_statistics,
132 enable_locking=enable_locking,
132 enable_locking=enable_locking,
133 enable_downloads=enable_downloads,
133 enable_downloads=enable_downloads,
134 state=state
134 state=state
135 )
135 )
136 Session().commit()
136 Session().commit()
137
137
138 # now create this repo on Filesystem
138 # now create this repo on Filesystem
139 RepoModel()._create_filesystem_repo(
139 RepoModel()._create_filesystem_repo(
140 repo_name=repo_name,
140 repo_name=repo_name,
141 repo_type=repo_type,
141 repo_type=repo_type,
142 repo_group=RepoModel()._get_repo_group(repo_group),
142 repo_group=RepoModel()._get_repo_group(repo_group),
143 clone_uri=clone_uri,
143 clone_uri=clone_uri,
144 )
144 )
145 repo = Repository.get_by_repo_name(repo_name_full)
145 repo = Repository.get_by_repo_name(repo_name_full)
146 log_create_repository(created_by=owner.username, **repo.get_dict())
146 log_create_repository(created_by=owner.username, **repo.get_dict())
147
147
148 # update repo commit caches initially
148 # update repo commit caches initially
149 repo.update_commit_cache()
149 repo.update_commit_cache()
150
150
151 # set new created state
151 # set new created state
152 repo.set_state(Repository.STATE_CREATED)
152 repo.set_state(Repository.STATE_CREATED)
153 repo_id = repo.repo_id
153 repo_id = repo.repo_id
154 repo_data = repo.get_api_data()
154 repo_data = repo.get_api_data()
155
155
156 audit_logger.store(
156 audit_logger.store(
157 'repo.create', action_data={'data': repo_data},
157 'repo.create', action_data={'data': repo_data},
158 user=cur_user,
158 user=cur_user,
159 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
159 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
160
160
161 Session().commit()
161 Session().commit()
162 except Exception:
162 except Exception as e:
163 log.warning('Exception occurred when creating repository, '
163 log.warning('Exception occurred when creating repository, '
164 'doing cleanup...', exc_info=True)
164 'doing cleanup...', exc_info=True)
165 if isinstance(e, IntegrityError):
166 Session().rollback()
167
165 # rollback things manually !
168 # rollback things manually !
166 repo = Repository.get_by_repo_name(repo_name_full)
169 repo = Repository.get_by_repo_name(repo_name_full)
167 if repo:
170 if repo:
168 Repository.delete(repo.repo_id)
171 Repository.delete(repo.repo_id)
169 Session().commit()
172 Session().commit()
170 RepoModel()._delete_filesystem_repo(repo)
173 RepoModel()._delete_filesystem_repo(repo)
174 log.info('Cleanup of repo %s finished', repo_name_full)
171 raise
175 raise
172
176
173 # it's an odd fix to make celery fail task when exception occurs
174 def on_failure(self, *args, **kwargs):
175 pass
176
177 return True
177 return True
178
178
179
179
180 @async_task(ignore_result=True, base=RequestContextTask)
180 @async_task(ignore_result=True, base=RequestContextTask)
181 def create_repo_fork(form_data, cur_user):
181 def create_repo_fork(form_data, cur_user):
182 """
182 """
183 Creates a fork of repository using internal VCS methods
183 Creates a fork of repository using internal VCS methods
184 """
184 """
185 from rhodecode.model.repo import RepoModel
185 from rhodecode.model.repo import RepoModel
186 from rhodecode.model.user import UserModel
186 from rhodecode.model.user import UserModel
187
187
188 log = get_logger(create_repo_fork)
188 log = get_logger(create_repo_fork)
189
189
190 cur_user = UserModel()._get_user(cur_user)
190 cur_user = UserModel()._get_user(cur_user)
191 owner = cur_user
191 owner = cur_user
192
192
193 repo_name = form_data['repo_name'] # fork in this case
193 repo_name = form_data['repo_name'] # fork in this case
194 repo_name_full = form_data['repo_name_full']
194 repo_name_full = form_data['repo_name_full']
195 repo_type = form_data['repo_type']
195 repo_type = form_data['repo_type']
196 description = form_data['description']
196 description = form_data['description']
197 private = form_data['private']
197 private = form_data['private']
198 clone_uri = form_data.get('clone_uri')
198 clone_uri = form_data.get('clone_uri')
199 repo_group = safe_int(form_data['repo_group'])
199 repo_group = safe_int(form_data['repo_group'])
200 landing_rev = form_data['landing_rev']
200 landing_rev = form_data['landing_rev']
201 copy_fork_permissions = form_data.get('copy_permissions')
201 copy_fork_permissions = form_data.get('copy_permissions')
202 fork_id = safe_int(form_data.get('fork_parent_id'))
202 fork_id = safe_int(form_data.get('fork_parent_id'))
203
203
204 try:
204 try:
205 fork_of = RepoModel()._get_repo(fork_id)
205 fork_of = RepoModel()._get_repo(fork_id)
206 RepoModel()._create_repo(
206 RepoModel()._create_repo(
207 repo_name=repo_name_full,
207 repo_name=repo_name_full,
208 repo_type=repo_type,
208 repo_type=repo_type,
209 description=description,
209 description=description,
210 owner=owner,
210 owner=owner,
211 private=private,
211 private=private,
212 clone_uri=clone_uri,
212 clone_uri=clone_uri,
213 repo_group=repo_group,
213 repo_group=repo_group,
214 landing_rev=landing_rev,
214 landing_rev=landing_rev,
215 fork_of=fork_of,
215 fork_of=fork_of,
216 copy_fork_permissions=copy_fork_permissions
216 copy_fork_permissions=copy_fork_permissions
217 )
217 )
218
218
219 Session().commit()
219 Session().commit()
220
220
221 base_path = Repository.base_path()
221 base_path = Repository.base_path()
222 source_repo_path = os.path.join(base_path, fork_of.repo_name)
222 source_repo_path = os.path.join(base_path, fork_of.repo_name)
223
223
224 # now create this repo on Filesystem
224 # now create this repo on Filesystem
225 RepoModel()._create_filesystem_repo(
225 RepoModel()._create_filesystem_repo(
226 repo_name=repo_name,
226 repo_name=repo_name,
227 repo_type=repo_type,
227 repo_type=repo_type,
228 repo_group=RepoModel()._get_repo_group(repo_group),
228 repo_group=RepoModel()._get_repo_group(repo_group),
229 clone_uri=source_repo_path,
229 clone_uri=source_repo_path,
230 )
230 )
231 repo = Repository.get_by_repo_name(repo_name_full)
231 repo = Repository.get_by_repo_name(repo_name_full)
232 log_create_repository(created_by=owner.username, **repo.get_dict())
232 log_create_repository(created_by=owner.username, **repo.get_dict())
233
233
234 # update repo commit caches initially
234 # update repo commit caches initially
235 config = repo._config
235 config = repo._config
236 config.set('extensions', 'largefiles', '')
236 config.set('extensions', 'largefiles', '')
237 repo.update_commit_cache(config=config)
237 repo.update_commit_cache(config=config)
238
238
239 # set new created state
239 # set new created state
240 repo.set_state(Repository.STATE_CREATED)
240 repo.set_state(Repository.STATE_CREATED)
241
241
242 repo_id = repo.repo_id
242 repo_id = repo.repo_id
243 repo_data = repo.get_api_data()
243 repo_data = repo.get_api_data()
244 audit_logger.store(
244 audit_logger.store(
245 'repo.fork', action_data={'data': repo_data},
245 'repo.fork', action_data={'data': repo_data},
246 user=cur_user,
246 user=cur_user,
247 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
247 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
248
248
249 Session().commit()
249 Session().commit()
250 except Exception as e:
250 except Exception as e:
251 log.warning('Exception %s occurred when forking repository, '
251 log.warning('Exception %s occurred when forking repository, '
252 'doing cleanup...', e)
252 'doing cleanup...', exc_info=True)
253 if isinstance(e, IntegrityError):
254 Session().rollback()
255
253 # rollback things manually !
256 # rollback things manually !
254 repo = Repository.get_by_repo_name(repo_name_full)
257 repo = Repository.get_by_repo_name(repo_name_full)
255 if repo:
258 if repo:
256 Repository.delete(repo.repo_id)
259 Repository.delete(repo.repo_id)
257 Session().commit()
260 Session().commit()
258 RepoModel()._delete_filesystem_repo(repo)
261 RepoModel()._delete_filesystem_repo(repo)
262 log.info('Cleanup of repo %s finished', repo_name_full)
259 raise
263 raise
260
264
261 # it's an odd fix to make celery fail task when exception occurs
262 def on_failure(self, *args, **kwargs):
263 pass
264
265 return True
265 return True
266
266
267
267
268 @async_task(ignore_result=True)
268 @async_task(ignore_result=True)
269 def sync_repo(*args, **kwargs):
269 def sync_repo(*args, **kwargs):
270 from rhodecode.model.scm import ScmModel
270 from rhodecode.model.scm import ScmModel
271 log = get_logger(sync_repo)
271 log = get_logger(sync_repo)
272 repo_name = kwargs['repo_name']
272 repo_name = kwargs['repo_name']
273 log.info('Pulling from %s', repo_name)
273 log.info('Pulling from %s', repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
274 dbrepo = Repository.get_by_repo_name(repo_name)
275 if dbrepo and dbrepo.clone_uri:
275 if dbrepo and dbrepo.clone_uri:
276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 else:
277 else:
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
279
279
280
280
281 @async_task(ignore_result=False)
281 @async_task(ignore_result=False)
282 def beat_check(*args, **kwargs):
282 def beat_check(*args, **kwargs):
283 log = get_logger(beat_check)
283 log = get_logger(beat_check)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 return time.time()
285 return time.time()
General Comments 0
You need to be logged in to leave comments. Login now