fixed fork journal entry
marcink -
r1730:ce0b4753 beta
@@ -1,404 +1,405 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.tasks
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 RhodeCode task modules, containing all tasks that are supposed to be run
7 7 by the celery daemon
8 8
9 9 :created_on: Oct 6, 2010
10 10 :author: marcink
11 11 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
12 12 :license: GPLv3, see COPYING for more details.
13 13 """
14 14 # This program is free software: you can redistribute it and/or modify
15 15 # it under the terms of the GNU General Public License as published by
16 16 # the Free Software Foundation, either version 3 of the License, or
17 17 # (at your option) any later version.
18 18 #
19 19 # This program is distributed in the hope that it will be useful,
20 20 # but WITHOUT ANY WARRANTY; without even the implied warranty of
21 21 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
22 22 # GNU General Public License for more details.
23 23 #
24 24 # You should have received a copy of the GNU General Public License
25 25 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 26 from celery.decorators import task
27 27
28 28 import os
29 29 import traceback
30 30 import logging
31 31 from os.path import join as jn
32 32
33 33 from time import mktime
34 34 from operator import itemgetter
35 35 from string import lower
36 36
37 37 from pylons import config, url
38 38 from pylons.i18n.translation import _
39 39
40 40 from vcs import get_backend
41 41
42 42 from rhodecode import CELERY_ON
43 43 from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
44 44 from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
45 45 __get_lockkey, LockHeld, DaemonLock
46 46 from rhodecode.lib.helpers import person
47 47 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
48 48 from rhodecode.lib.utils import add_cache, action_logger
49 49 from rhodecode.lib.compat import json, OrderedDict
50 50
51 51 from rhodecode.model import init_model
52 52 from rhodecode.model import meta
53 53 from rhodecode.model.db import Statistics, Repository, User
54 54
55 55 from sqlalchemy import engine_from_config
56 56
57 57 add_cache(config)
58 58
59 59 __all__ = ['whoosh_index', 'get_commits_stats',
60 60 'reset_user_password', 'send_email']
61 61
62 62
63 63 def get_session():
64 64 if CELERY_ON:
65 65 engine = engine_from_config(config, 'sqlalchemy.db1.')
66 66 init_model(engine)
67 67 sa = meta.Session()
68 68 return sa
69 69
70 70 def get_logger(cls):
71 71 if CELERY_ON:
72 72 try:
73 73 log = cls.get_logger()
74 74 except:
75 75 log = logging.getLogger(__name__)
76 76 else:
77 77 log = logging.getLogger(__name__)
78 78
79 79 return log
80 80
81 81 @task(ignore_result=True)
82 82 @locked_task
83 83 def whoosh_index(repo_location, full_index):
84 84 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
85 85
86 86 #log = whoosh_index.get_logger()
87 87
88 88 index_location = config['index_dir']
89 89 WhooshIndexingDaemon(index_location=index_location,
90 90 repo_location=repo_location, sa=get_session())\
91 91 .run(full_index=full_index)
92 92
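# A minimal usage sketch for the task above, assuming it is queued through the
# run_task() dispatcher imported at the top of this module (which falls back to
# a direct call when celery is disabled); repos_path is a placeholder here.
repos_path = '/srv/repos'                 # placeholder, normally taken from config
run_task(whoosh_index, repos_path, True)  # True requests a full, non-incremental index run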
93 93
94 94 @task(ignore_result=True)
95 95 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
96 96 log = get_logger(get_commits_stats)
97 97
98 98 lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
99 99 ts_max_y)
100 100 lockkey_path = config['here']
101 101
102 102 log.info('running task with lockkey %s', lockkey)
103 103 try:
104 104 sa = get_session()
105 105 lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
106 106
107 107 # for js data compatibility, strip quote characters from the person key
108 108 akc = lambda k: person(k).replace('"', "")
109 109
110 110 co_day_auth_aggr = {}
111 111 commits_by_day_aggregate = {}
112 112 repo = Repository.get_by_repo_name(repo_name).scm_instance
113 113 repo_size = len(repo.revisions)
114 114 # return if the repo has no revisions
115 115 if repo_size < 1:
116 116 lock.release()
117 117 return True
118 118
119 119 skip_date_limit = True
120 120 parse_limit = int(config['app_conf'].get('commit_parse_limit'))
121 121 last_rev = 0
122 122 last_cs = None
123 123 timegetter = itemgetter('time')
124 124
125 125 dbrepo = sa.query(Repository)\
126 126 .filter(Repository.repo_name == repo_name).scalar()
127 127 cur_stats = sa.query(Statistics)\
128 128 .filter(Statistics.repository == dbrepo).scalar()
129 129
130 130 if cur_stats is not None:
131 131 last_rev = cur_stats.stat_on_revision
132 132
133 133 if last_rev == repo.get_changeset().revision and repo_size > 1:
134 134 # pass silently without any work if we're not on the first revision or
135 135 # the current parsing state (the db marker) already points to the
136 136 # last revision
137 137 lock.release()
138 138 return True
139 139
140 140 if cur_stats:
141 141 commits_by_day_aggregate = OrderedDict(json.loads(
142 142 cur_stats.commit_activity_combined))
143 143 co_day_auth_aggr = json.loads(cur_stats.commit_activity)
144 144
145 145 log.debug('starting parsing %s', parse_limit)
146 146 lmktime = mktime
147 147
148 148 last_rev = last_rev + 1 if last_rev > 0 else last_rev
149 149
150 150 for cs in repo[last_rev:last_rev + parse_limit]:
151 151 last_cs = cs # remember last parsed changeset
152 152 k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
153 153 cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
154 154
155 155 if akc(cs.author) in co_day_auth_aggr:
156 156 try:
157 157 l = [timegetter(x) for x in
158 158 co_day_auth_aggr[akc(cs.author)]['data']]
159 159 time_pos = l.index(k)
160 160 except ValueError:
161 161 time_pos = False
162 162
163 163 if time_pos >= 0 and time_pos is not False:
164 164
165 165 datadict = \
166 166 co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
167 167
168 168 datadict["commits"] += 1
169 169 datadict["added"] += len(cs.added)
170 170 datadict["changed"] += len(cs.changed)
171 171 datadict["removed"] += len(cs.removed)
172 172
173 173 else:
174 174 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
175 175
176 176 datadict = {"time": k,
177 177 "commits": 1,
178 178 "added": len(cs.added),
179 179 "changed": len(cs.changed),
180 180 "removed": len(cs.removed),
181 181 }
182 182 co_day_auth_aggr[akc(cs.author)]['data']\
183 183 .append(datadict)
184 184
185 185 else:
186 186 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
187 187 co_day_auth_aggr[akc(cs.author)] = {
188 188 "label": akc(cs.author),
189 189 "data": [{"time":k,
190 190 "commits":1,
191 191 "added":len(cs.added),
192 192 "changed":len(cs.changed),
193 193 "removed":len(cs.removed),
194 194 }],
195 195 "schema": ["commits"],
196 196 }
197 197
198 198 #gather all data by day
199 199 if k in commits_by_day_aggregate:
200 200 commits_by_day_aggregate[k] += 1
201 201 else:
202 202 commits_by_day_aggregate[k] = 1
203 203
204 204 overview_data = sorted(commits_by_day_aggregate.items(),
205 205 key=itemgetter(0))
206 206
207 207 if not co_day_auth_aggr:
208 208 co_day_auth_aggr[akc(repo.contact)] = {
209 209 "label": akc(repo.contact),
210 210 "data": [0, 1],
211 211 "schema": ["commits"],
212 212 }
213 213
214 214 stats = cur_stats if cur_stats else Statistics()
215 215 stats.commit_activity = json.dumps(co_day_auth_aggr)
216 216 stats.commit_activity_combined = json.dumps(overview_data)
217 217
218 218 log.debug('last revision %s', last_rev)
219 219 leftovers = len(repo.revisions[last_rev:])
220 220 log.debug('revisions to parse %s', leftovers)
221 221
222 222 if last_rev == 0 or leftovers < parse_limit:
223 223 log.debug('getting code trending stats')
224 224 stats.languages = json.dumps(__get_codes_stats(repo_name))
225 225
226 226 try:
227 227 stats.repository = dbrepo
228 228 stats.stat_on_revision = last_cs.revision if last_cs else 0
229 229 sa.add(stats)
230 230 sa.commit()
231 231 except:
232 232 log.error(traceback.format_exc())
233 233 sa.rollback()
234 234 lock.release()
235 235 return False
236 236
237 237 #final release
238 238 lock.release()
239 239
240 240 #execute another task if celery is enabled
241 241 if len(repo.revisions) > 1 and CELERY_ON:
242 242 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
243 243 return True
244 244 except LockHeld:
245 245 log.info('LockHeld')
246 246 return 'Task with key %s already running' % lockkey
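# For reference, a sketch of the JSON documents this task persists; the field
# names follow the dictionaries built above, the concrete values are made up.
#
# stats.commit_activity (per-author, per-day aggregates):
#   {"marcink": {"label": "marcink",
#                "data": [{"time": 1302127200.0, "commits": 3,
#                          "added": 5, "changed": 2, "removed": 1}],
#                "schema": ["commits"]}}
#
# stats.commit_activity_combined (day timestamp, total commits that day):
#   [[1302127200.0, 4], [1302213600.0, 2]]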
247 247
248 248 @task(ignore_result=True)
249 249 def send_password_link(user_email):
250 250 from rhodecode.model.notification import EmailNotificationModel
251 251
252 252 log = get_logger(send_password_link)
253 253
254 254 try:
255 255 sa = get_session()
256 256 user = User.get_by_email(user_email)
257 257 if user:
258 258 log.debug('password reset user found %s' % user)
259 259 link = url('reset_password_confirmation', key=user.api_key,
260 260 qualified=True)
261 261 reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
262 262 body = EmailNotificationModel().get_email_tmpl(reg_type,
263 263 **{'user':user.short_contact,
264 264 'reset_url':link})
265 265 log.debug('sending email')
266 266 run_task(send_email, user_email,
267 267 _("password reset link"), body)
268 268 log.info('send new password mail to %s', user_email)
269 269 else:
270 270 log.debug("password reset email %s not found" % user_email)
271 271 except:
272 272 log.error(traceback.format_exc())
273 273 return False
274 274
275 275 return True
276 276
277 277 @task(ignore_result=True)
278 278 def reset_user_password(user_email):
279 279 from rhodecode.lib import auth
280 280
281 281 log = get_logger(reset_user_password)
282 282
283 283 try:
284 284 try:
285 285 sa = get_session()
286 286 user = User.get_by_email(user_email)
287 287 new_passwd = auth.PasswordGenerator().gen_password(8,
288 288 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
289 289 if user:
290 290 user.password = auth.get_crypt_password(new_passwd)
291 291 user.api_key = auth.generate_api_key(user.username)
292 292 sa.add(user)
293 293 sa.commit()
294 294 log.info('change password for %s', user_email)
295 295 if new_passwd is None:
296 296 raise Exception('unable to generate new password')
297 297 except:
298 298 log.error(traceback.format_exc())
299 299 sa.rollback()
300 300
301 301 run_task(send_email, user_email,
302 302 'Your new password',
303 303 'Your new RhodeCode password: %s' % (new_passwd)
304 304 log.info('send new password mail to %s', user_email)
305 305
306 306 except:
307 307 log.error('Failed to update user password')
308 308 log.error(traceback.format_exc())
309 309
310 310 return True
311 311
312 312
313 313 @task(ignore_result=True)
314 314 def send_email(recipients, subject, body, html_body=''):
315 315 """
316 316 Sends an email with defined parameters from the .ini files.
317 317
318 318 :param recipients: list of recipients; if this is empty the defined email
319 319 address from the 'email_to' field is used instead
320 320 :param subject: subject of the mail
321 321 :param body: body of the mail
322 322 :param html_body: html version of body
323 323 """
324 324 log = get_logger(send_email)
325 325 sa = get_session()
326 326 email_config = config
327 327 subject = "%s %s" % (email_config.get('email_prefix'), subject)
328 328 if not recipients:
329 329 # if recipients are not defined we send to email_config + all admins
330 330 admins = [u.email for u in User.query()
331 331 .filter(User.admin == True).all()]
332 332 recipients = [email_config.get('email_to')] + admins
333 333
334 334 mail_from = email_config.get('app_email_from', 'RhodeCode')
335 335 user = email_config.get('smtp_username')
336 336 passwd = email_config.get('smtp_password')
337 337 mail_server = email_config.get('smtp_server')
338 338 mail_port = email_config.get('smtp_port')
339 339 tls = str2bool(email_config.get('smtp_use_tls'))
340 340 ssl = str2bool(email_config.get('smtp_use_ssl'))
341 341 debug = str2bool(config.get('debug'))
342 342 smtp_auth = email_config.get('smtp_auth')
343 343
344 344 try:
345 345 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
346 346 mail_port, ssl, tls, debug=debug)
347 347 m.send(recipients, subject, body, html_body)
348 348 except:
349 349 log.error('Mail sending failed')
350 350 log.error(traceback.format_exc())
351 351 return False
352 352 return True
353 353
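# A minimal sketch of queueing the task above via run_task(); the recipient
# address and message texts are placeholders.
run_task(send_email, ['admin@example.com'],
         'test subject', 'plain text body', '<b>optional html body</b>')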
354 354
355 355 @task(ignore_result=True)
356 356 def create_repo_fork(form_data, cur_user):
357 357 """
358 358 Creates a fork of a repository using internal VCS methods
359 359
360 360 :param form_data:
361 361 :param cur_user:
362 362 """
363 363 from rhodecode.model.repo import RepoModel
364 364
365 365 log = get_logger(create_repo_fork)
366 366
367 367 Session = get_session()
368 368 base_path = Repository.base_path()
369 369
370 370 RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
371 371
372 372 alias = form_data['repo_type']
373 373 org_repo_name = form_data['org_path']
374 fork_name = form_data['repo_name_full']
374 375 source_repo_path = os.path.join(base_path, org_repo_name)
375 destination_fork_path = os.path.join(base_path, form_data['repo_name_full'])
376 destination_fork_path = os.path.join(base_path, fork_name)
376 377
377 378 log.info('creating fork of %s as %s', source_repo_path,
378 379 destination_fork_path)
379 380 backend = get_backend(alias)
380 381 backend(safe_str(destination_fork_path), create=True,
381 382 src_url=safe_str(source_repo_path))
382 action_logger(cur_user, 'user_forked_repo:%s' % org_repo_name,
383 action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
383 384 org_repo_name, '', Session)
384 385 # finally commit at latest possible stage
385 386 Session.commit()
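# A sketch of the form_data mapping this task reads; only the keys used above
# are shown, with placeholder values (RepoModel.create() may expect more fields).
form_data = {
    'repo_type': 'hg',                        # backend alias handed to get_backend()
    'org_path': 'original-repo',              # repository being forked
    'repo_name_full': 'forks/original-repo',  # full name of the new fork
}
# run_task(create_repo_fork, form_data, cur_user)  # cur_user: the acting user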
386 387
387 388 def __get_codes_stats(repo_name):
388 389 repo = Repository.get_by_repo_name(repo_name).scm_instance
389 390
390 391 tip = repo.get_changeset()
391 392 code_stats = {}
392 393
393 394 def aggregate(cs):
394 395 for f in cs[2]:
395 396 ext = lower(f.extension)
396 397 if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
397 398 if ext in code_stats:
398 399 code_stats[ext] += 1
399 400 else:
400 401 code_stats[ext] = 1
401 402
402 403 map(aggregate, tip.walk('/'))
403 404
404 405 return code_stats or {}
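# __get_codes_stats() walks every file in the tip changeset and counts
# non-binary files per known extension; an illustrative return value
# (numbers are made up):
#
#   {'py': 120, 'html': 45, 'js': 30, 'css': 12}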