##// END OF EJS Templates
celery: tasks, improve fork/repo create to survive a database errors in the cleanup code.
marcink -
r2419:20304eec default
parent child Browse files
Show More
@@ -1,285 +1,285 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode task modules, containing all task that suppose to be run
23 23 by celery daemon
24 24 """
25 25
26 26 import os
27 27 import time
28 28
29 29 import rhodecode
30 30 from rhodecode.lib import audit_logger
31 31 from rhodecode.lib.celerylib import get_logger, async_task, RequestContextTask
32 32 from rhodecode.lib.hooks_base import log_create_repository
33 33 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
34 34 from rhodecode.lib.utils2 import safe_int, str2bool
35 from rhodecode.model.db import Session, Repository, User
35 from rhodecode.model.db import Session, IntegrityError, Repository, User
36 36
37 37
38 38 @async_task(ignore_result=True, base=RequestContextTask)
39 39 def send_email(recipients, subject, body='', html_body='', email_config=None):
40 40 """
41 41 Sends an email with defined parameters from the .ini files.
42 42
43 43 :param recipients: list of recipients, it this is empty the defined email
44 44 address from field 'email_to' is used instead
45 45 :param subject: subject of the mail
46 46 :param body: body of the mail
47 47 :param html_body: html version of body
48 48 """
49 49 log = get_logger(send_email)
50 50
51 51 email_config = email_config or rhodecode.CONFIG
52 52 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
53 53 if not recipients:
54 54 # if recipients are not defined we send to email_config + all admins
55 55 admins = [
56 56 u.email for u in User.query().filter(User.admin == True).all()]
57 57 recipients = [email_config.get('email_to')] + admins
58 58
59 59 mail_server = email_config.get('smtp_server') or None
60 60 if mail_server is None:
61 61 log.error("SMTP server information missing. Sending email failed. "
62 62 "Make sure that `smtp_server` variable is configured "
63 63 "inside the .ini file")
64 64 return False
65 65
66 66 mail_from = email_config.get('app_email_from', 'RhodeCode')
67 67 user = email_config.get('smtp_username')
68 68 passwd = email_config.get('smtp_password')
69 69 mail_port = email_config.get('smtp_port')
70 70 tls = str2bool(email_config.get('smtp_use_tls'))
71 71 ssl = str2bool(email_config.get('smtp_use_ssl'))
72 72 debug = str2bool(email_config.get('debug'))
73 73 smtp_auth = email_config.get('smtp_auth')
74 74
75 75 try:
76 76 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
77 77 mail_port, ssl, tls, debug=debug)
78 78 m.send(recipients, subject, body, html_body)
79 79 except Exception:
80 80 log.exception('Mail sending failed')
81 81 return False
82 82 return True
83 83
84 84
85 85 @async_task(ignore_result=True, base=RequestContextTask)
86 86 def create_repo(form_data, cur_user):
87 87 from rhodecode.model.repo import RepoModel
88 88 from rhodecode.model.user import UserModel
89 89 from rhodecode.model.settings import SettingsModel
90 90
91 91 log = get_logger(create_repo)
92 92
93 93 cur_user = UserModel()._get_user(cur_user)
94 94 owner = cur_user
95 95
96 96 repo_name = form_data['repo_name']
97 97 repo_name_full = form_data['repo_name_full']
98 98 repo_type = form_data['repo_type']
99 99 description = form_data['repo_description']
100 100 private = form_data['repo_private']
101 101 clone_uri = form_data.get('clone_uri')
102 102 repo_group = safe_int(form_data['repo_group'])
103 103 landing_rev = form_data['repo_landing_rev']
104 104 copy_fork_permissions = form_data.get('copy_permissions')
105 105 copy_group_permissions = form_data.get('repo_copy_permissions')
106 106 fork_of = form_data.get('fork_parent_id')
107 107 state = form_data.get('repo_state', Repository.STATE_PENDING)
108 108
109 109 # repo creation defaults, private and repo_type are filled in form
110 110 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
111 111 enable_statistics = form_data.get(
112 112 'enable_statistics', defs.get('repo_enable_statistics'))
113 113 enable_locking = form_data.get(
114 114 'enable_locking', defs.get('repo_enable_locking'))
115 115 enable_downloads = form_data.get(
116 116 'enable_downloads', defs.get('repo_enable_downloads'))
117 117
118 118 try:
119 119 repo = RepoModel()._create_repo(
120 120 repo_name=repo_name_full,
121 121 repo_type=repo_type,
122 122 description=description,
123 123 owner=owner,
124 124 private=private,
125 125 clone_uri=clone_uri,
126 126 repo_group=repo_group,
127 127 landing_rev=landing_rev,
128 128 fork_of=fork_of,
129 129 copy_fork_permissions=copy_fork_permissions,
130 130 copy_group_permissions=copy_group_permissions,
131 131 enable_statistics=enable_statistics,
132 132 enable_locking=enable_locking,
133 133 enable_downloads=enable_downloads,
134 134 state=state
135 135 )
136 136 Session().commit()
137 137
138 138 # now create this repo on Filesystem
139 139 RepoModel()._create_filesystem_repo(
140 140 repo_name=repo_name,
141 141 repo_type=repo_type,
142 142 repo_group=RepoModel()._get_repo_group(repo_group),
143 143 clone_uri=clone_uri,
144 144 )
145 145 repo = Repository.get_by_repo_name(repo_name_full)
146 146 log_create_repository(created_by=owner.username, **repo.get_dict())
147 147
148 148 # update repo commit caches initially
149 149 repo.update_commit_cache()
150 150
151 151 # set new created state
152 152 repo.set_state(Repository.STATE_CREATED)
153 153 repo_id = repo.repo_id
154 154 repo_data = repo.get_api_data()
155 155
156 156 audit_logger.store(
157 157 'repo.create', action_data={'data': repo_data},
158 158 user=cur_user,
159 159 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
160 160
161 161 Session().commit()
162 except Exception:
162 except Exception as e:
163 163 log.warning('Exception occurred when creating repository, '
164 164 'doing cleanup...', exc_info=True)
165 if isinstance(e, IntegrityError):
166 Session().rollback()
167
165 168 # rollback things manually !
166 169 repo = Repository.get_by_repo_name(repo_name_full)
167 170 if repo:
168 171 Repository.delete(repo.repo_id)
169 172 Session().commit()
170 173 RepoModel()._delete_filesystem_repo(repo)
174 log.info('Cleanup of repo %s finished', repo_name_full)
171 175 raise
172 176
173 # it's an odd fix to make celery fail task when exception occurs
174 def on_failure(self, *args, **kwargs):
175 pass
176
177 177 return True
178 178
179 179
180 180 @async_task(ignore_result=True, base=RequestContextTask)
181 181 def create_repo_fork(form_data, cur_user):
182 182 """
183 183 Creates a fork of repository using internal VCS methods
184 184 """
185 185 from rhodecode.model.repo import RepoModel
186 186 from rhodecode.model.user import UserModel
187 187
188 188 log = get_logger(create_repo_fork)
189 189
190 190 cur_user = UserModel()._get_user(cur_user)
191 191 owner = cur_user
192 192
193 193 repo_name = form_data['repo_name'] # fork in this case
194 194 repo_name_full = form_data['repo_name_full']
195 195 repo_type = form_data['repo_type']
196 196 description = form_data['description']
197 197 private = form_data['private']
198 198 clone_uri = form_data.get('clone_uri')
199 199 repo_group = safe_int(form_data['repo_group'])
200 200 landing_rev = form_data['landing_rev']
201 201 copy_fork_permissions = form_data.get('copy_permissions')
202 202 fork_id = safe_int(form_data.get('fork_parent_id'))
203 203
204 204 try:
205 205 fork_of = RepoModel()._get_repo(fork_id)
206 206 RepoModel()._create_repo(
207 207 repo_name=repo_name_full,
208 208 repo_type=repo_type,
209 209 description=description,
210 210 owner=owner,
211 211 private=private,
212 212 clone_uri=clone_uri,
213 213 repo_group=repo_group,
214 214 landing_rev=landing_rev,
215 215 fork_of=fork_of,
216 216 copy_fork_permissions=copy_fork_permissions
217 217 )
218 218
219 219 Session().commit()
220 220
221 221 base_path = Repository.base_path()
222 222 source_repo_path = os.path.join(base_path, fork_of.repo_name)
223 223
224 224 # now create this repo on Filesystem
225 225 RepoModel()._create_filesystem_repo(
226 226 repo_name=repo_name,
227 227 repo_type=repo_type,
228 228 repo_group=RepoModel()._get_repo_group(repo_group),
229 229 clone_uri=source_repo_path,
230 230 )
231 231 repo = Repository.get_by_repo_name(repo_name_full)
232 232 log_create_repository(created_by=owner.username, **repo.get_dict())
233 233
234 234 # update repo commit caches initially
235 235 config = repo._config
236 236 config.set('extensions', 'largefiles', '')
237 237 repo.update_commit_cache(config=config)
238 238
239 239 # set new created state
240 240 repo.set_state(Repository.STATE_CREATED)
241 241
242 242 repo_id = repo.repo_id
243 243 repo_data = repo.get_api_data()
244 244 audit_logger.store(
245 245 'repo.fork', action_data={'data': repo_data},
246 246 user=cur_user,
247 247 repo=audit_logger.RepoWrap(repo_name=repo_name, repo_id=repo_id))
248 248
249 249 Session().commit()
250 250 except Exception as e:
251 251 log.warning('Exception %s occurred when forking repository, '
252 'doing cleanup...', e)
252 'doing cleanup...', exc_info=True)
253 if isinstance(e, IntegrityError):
254 Session().rollback()
255
253 256 # rollback things manually !
254 257 repo = Repository.get_by_repo_name(repo_name_full)
255 258 if repo:
256 259 Repository.delete(repo.repo_id)
257 260 Session().commit()
258 261 RepoModel()._delete_filesystem_repo(repo)
262 log.info('Cleanup of repo %s finished', repo_name_full)
259 263 raise
260 264
261 # it's an odd fix to make celery fail task when exception occurs
262 def on_failure(self, *args, **kwargs):
263 pass
264
265 265 return True
266 266
267 267
268 268 @async_task(ignore_result=True)
269 269 def sync_repo(*args, **kwargs):
270 270 from rhodecode.model.scm import ScmModel
271 271 log = get_logger(sync_repo)
272 272 repo_name = kwargs['repo_name']
273 273 log.info('Pulling from %s', repo_name)
274 274 dbrepo = Repository.get_by_repo_name(repo_name)
275 275 if dbrepo and dbrepo.clone_uri:
276 276 ScmModel().pull_changes(kwargs['repo_name'], kwargs['username'])
277 277 else:
278 278 log.debug('Repo `%s` not found or without a clone_url', repo_name)
279 279
280 280
281 281 @async_task(ignore_result=False)
282 282 def beat_check(*args, **kwargs):
283 283 log = get_logger(beat_check)
284 284 log.info('Got args: %r and kwargs %r', args, kwargs)
285 285 return time.time()
General Comments 0
You need to be logged in to leave comments. Login now