##// END OF EJS Templates
Fixed celery tasks to use a single method of fetching the main repos path
marcink -
r666:6ed37675 beta
parent child Browse files
Show More
@@ -1,338 +1,298 b''
1 from celery.decorators import task
1 from celery.decorators import task
2
2
3 from operator import itemgetter
3 from operator import itemgetter
4 from pylons.i18n.translation import _
4 from pylons.i18n.translation import _
5 from rhodecode.lib.celerylib import run_task, locked_task
5 from rhodecode.lib.celerylib import run_task, locked_task
6 from rhodecode.lib.helpers import person
6 from rhodecode.lib.helpers import person
7 from rhodecode.lib.smtp_mailer import SmtpMailer
7 from rhodecode.lib.smtp_mailer import SmtpMailer
8 from rhodecode.lib.utils import OrderedDict
8 from rhodecode.lib.utils import OrderedDict
9 from time import mktime
9 from time import mktime
10 from vcs.backends.hg import MercurialRepository
11 from vcs.backends.git import GitRepository
12 import os
10 import os
13 import traceback
11 import traceback
14 from vcs.backends import get_repo
12 from vcs.backends import get_repo
15 from vcs.utils.helpers import get_scm
13 from rhodecode.model.hg import HgModel
16
17 try:
14 try:
18 import json
15 import json
19 except ImportError:
16 except ImportError:
20 #python 2.5 compatibility
17 #python 2.5 compatibility
21 import simplejson as json
18 import simplejson as json
22
19
23 try:
20 try:
24 from celeryconfig import PYLONS_CONFIG as config
21 from celeryconfig import PYLONS_CONFIG as config
25 celery_on = True
22 celery_on = True
26 except ImportError:
23 except ImportError:
27 #if celeryconfig is not present let's just load our pylons
24 #if celeryconfig is not present let's just load our pylons
28 #config instead
25 #config instead
29 from pylons import config
26 from pylons import config
30 celery_on = False
27 celery_on = False
31
28
32
29
33 __all__ = ['whoosh_index', 'get_commits_stats',
30 __all__ = ['whoosh_index', 'get_commits_stats',
34 'reset_user_password', 'send_email']
31 'reset_user_password', 'send_email']
35
32
def get_session():
    """Return a SQLAlchemy session appropriate for the current runtime.

    When running under celery (separate process) a new engine/session is
    built from the pylons config; otherwise the application's existing
    scoped Session is reused.
    """
    if not celery_on:
        # Not under celery: reuse the running application's Session.
        from rhodecode.model.meta import Session
        return Session()

    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import sessionmaker, scoped_session
    engine = engine_from_config(dict(config.items('app:main')),
                                'sqlalchemy.db1.')
    return scoped_session(sessionmaker(bind=engine))
48
45
def get_hg_settings():
    """Load all application settings rows from the database.

    :returns: dict mapping ``'rhodecode_' + app_settings_name`` to its value
    :raises Exception: when no settings rows exist in the database
    """
    from rhodecode.model.db import RhodeCodeSettings
    sa = get_session()
    rows = sa.query(RhodeCodeSettings).all()

    if not rows:
        raise Exception('Could not get application settings !')

    return dict(('rhodecode_' + row.app_settings_name,
                 row.app_settings_value) for row in rows)
def get_hg_ui_settings():
    """Load mercurial ``ui`` settings rows from the database.

    Keys are normalized: ``'/'`` becomes ``root_path`` and dots become
    underscores; for the ``hooks`` section the *active* flag is stored as
    the value instead of the raw ui value.

    :returns: dict keyed by ``'<section>_<normalized key>'``
    :raises Exception: when no ui rows exist in the database
    """
    from rhodecode.model.db import RhodeCodeUi
    sa = get_session()
    rows = sa.query(RhodeCodeUi).all()

    if not rows:
        raise Exception('Could not get application ui settings !')

    settings = {}
    for row in rows:
        key = row.ui_key
        value = row.ui_value
        if key == '/':
            key = 'root_path'
        if '.' in key:
            key = key.replace('.', '_')
        if row.ui_section == 'hooks':
            # hooks expose their enabled/disabled state, not the raw value
            value = row.ui_active
        settings[row.ui_section + '_' + key] = value

    return settings
@task
@locked_task
def whoosh_index(repo_location, full_index):
    """Run the whoosh indexing daemon over the repositories.

    :param repo_location: root path that the daemon scans for repositories
    :param full_index: rebuild the index from scratch when True,
        otherwise perform an incremental update
    """
    log = whoosh_index.get_logger()
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
    index_location = ''
    daemon = WhooshIndexingDaemon(index_location=index_location,
                                  repo_location=repo_location)
    daemon.run(full_index=full_index)
94
54
@task
@locked_task
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    """Incrementally aggregate per-author and per-day commit statistics
    for *repo_name* and persist them as JSON in the Statistics table.

    Parses at most ``parse_limit`` changesets per invocation and re-queues
    itself until the whole history has been processed.

    :param repo_name: name of the repository relative to the repos root
    :param ts_min_y: lower timestamp bound for including a day bucket
    :param ts_max_y: upper timestamp bound for including a day bucket
        (both bounds are currently bypassed via ``skip_date_limit``)
    :returns: True on success or nothing-to-do, False on a db failure
    """
    from rhodecode.model.db import Statistics, Repository
    log = get_commits_stats.get_logger()
    # strip double quotes so author names embed safely in the JS chart data
    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty

    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    # single source of truth for the repositories root path
    repos_path = HgModel().repos_path
    p = os.path.join(repos_path, repo_name)
    repo = get_repo(p)

    skip_date_limit = True
    parse_limit = 250 #limit for single task changeset parsing optimal for
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')

    sa = get_session()

    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == repo_name).scalar()
    cur_stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()
    if cur_stats:
        # resume from where the previous run stopped
        last_rev = cur_stats.stat_on_revision
    if not repo.revisions:
        # empty repository: nothing to aggregate
        return True

    if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
        #pass silently without any work if we're not on first revision or current
        #state of parsing revision(from db marker) is the last revision
        return True

    if cur_stats:
        # reload previously accumulated aggregates from their JSON snapshots
        commits_by_day_aggregate = OrderedDict(
                                       json.loads(
                                        cur_stats.commit_activity_combined))
        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)

    log.debug('starting parsing %s', parse_limit)
    # bind mktime to a local for faster lookup inside the loop
    lmktime = mktime

    for cnt, rev in enumerate(repo.revisions[last_rev:]):
        last_cs = cs = repo.get_changeset(rev)
        # build a midnight timestamp key for the changeset's calendar day
        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
                          cs.date.timetuple()[2])
        timetupple = [int(x) for x in k.split('-')]
        timetupple.extend([0 for _ in xrange(6)])
        k = lmktime(timetupple)
        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
            try:
                # find an existing data point for this author on this day
                l = [timegetter(x) for x in commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data']]
                time_pos = l.index(k)
            except ValueError:
                time_pos = False

            # time_pos 0 is a valid index, so test "is not False" explicitly
            if time_pos >= 0 and time_pos is not False:

                datadict = commits_by_day_author_aggregate\
                    [author_key_cleaner(cs.author)]['data'][time_pos]

                datadict["commits"] += 1
                datadict["added"] += len(cs.added)
                datadict["changed"] += len(cs.changed)
                datadict["removed"] += len(cs.removed)

            else:
                # known author, new day: append a fresh data point
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:

                    datadict = {"time":k,
                                "commits":1,
                                "added":len(cs.added),
                                "changed":len(cs.changed),
                                "removed":len(cs.removed),
                               }
                    commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data'].append(datadict)

        else:
            # first changeset seen for this author: create the series
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
                                    "label":author_key_cleaner(cs.author),
                                    "data":[{"time":k,
                                             "commits":1,
                                             "added":len(cs.added),
                                             "changed":len(cs.changed),
                                             "removed":len(cs.removed),
                                            }],
                                    "schema":["commits"],
                                    }

        #gather all data by day
        if commits_by_day_aggregate.has_key(k):
            commits_by_day_aggregate[k] += 1
        else:
            commits_by_day_aggregate[k] = 1

        if cnt >= parse_limit:
            #don't fetch to much data since we can freeze application
            break
    overview_data = []
    for k, v in commits_by_day_aggregate.items():
        overview_data.append([k, v])
    overview_data = sorted(overview_data, key=itemgetter(0))
    if not commits_by_day_author_aggregate:
        # no commits parsed: emit a placeholder series for the repo contact
        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
            "label":author_key_cleaner(repo.contact),
            "data":[0, 1],
            "schema":["commits"],
        }

    stats = cur_stats if cur_stats else Statistics()
    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
    stats.commit_activity_combined = json.dumps(overview_data)

    log.debug('last revison %s', last_rev)
    leftovers = len(repo.revisions[last_rev:])
    log.debug('revisions to parse %s', leftovers)

    if last_rev == 0 or leftovers < parse_limit:
        # final batch: language stats are expensive, compute them only once
        stats.languages = json.dumps(__get_codes_stats(repo_name))

    stats.repository = dbrepo
    stats.stat_on_revision = last_cs.revision

    try:
        sa.add(stats)
        sa.commit()
    except:
        log.error(traceback.format_exc())
        sa.rollback()
        return False
    if len(repo.revisions) > 1:
        # more history remains: re-queue ourselves for the next batch
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)

    return True
234
194
@task
def reset_user_password(user_email):
    """Generate a new random password for the user identified by
    *user_email*, store its hash, and mail the plain-text password.

    :param user_email: email address used to look the user up
    :returns: True always; failures are logged, not raised
    """
    log = reset_user_password.get_logger()
    from rhodecode.lib import auth
    from rhodecode.model.db import User

    try:
        # pre-bind so the mail step below can never hit an unbound name
        # if password generation itself raises
        new_passwd = None
        try:
            sa = get_session()
            user = sa.query(User).filter(User.email == user_email).scalar()
            new_passwd = auth.PasswordGenerator().gen_password(8,
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            # validate the generated password BEFORE using it, so we never
            # commit a hash of None (original checked after the commit)
            if new_passwd is None:
                raise Exception('unable to generate new password')
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                sa.add(user)
                sa.commit()
                log.info('change password for %s', user_email)
        except Exception:
            log.error(traceback.format_exc())
            sa.rollback()

        run_task(send_email, user_email,
                 "Your new rhodecode password",
                 'Your new rhodecode password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)

    except Exception:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())
    return True
269
229
@task
def send_email(recipients, subject, body):
    """Send a plain-text email using SMTP settings from the app config.

    :param recipients: address or list of addresses to deliver to
    :param subject: message subject line
    :param body: message body text
    :returns: True on success, False when sending failed (failure is logged)
    """
    log = send_email.get_logger()
    email_config = dict(config.items('DEFAULT'))
    conf = email_config.get

    mail_from = conf('app_email_from')
    user = conf('smtp_username')
    passwd = conf('smtp_password')
    mail_server = conf('smtp_server')
    mail_port = conf('smtp_port')
    tls = conf('smtp_use_tls')
    ssl = False

    try:
        mailer = SmtpMailer(mail_from, user, passwd, mail_server,
                            mail_port, ssl, tls)
        mailer.send(recipients, subject, body)
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True
291
251
@task
def create_repo_fork(form_data, cur_user):
    """Create a fork of an existing repository.

    Registers the fork in the database, then clones the source repository
    on disk under the fork's name.

    :param form_data: dict with ``repo_name``, ``fork_name`` and
        ``repo_type`` keys
    :param cur_user: user that becomes the fork's owner
    """
    from rhodecode.model.repo import RepoModel
    from vcs import get_backend
    log = create_repo_fork.get_logger()

    repo_model = RepoModel(get_session())
    repo_model.create(form_data, cur_user, just_db=True, fork=True)

    repo_name = form_data['repo_name']
    fork_name = form_data['fork_name']
    alias = form_data['repo_type']

    # single source of truth for the repositories root path
    repos_path = HgModel().repos_path
    repo_path = os.path.join(repos_path, repo_name)
    repo_fork_path = os.path.join(repos_path, fork_name)

    log.info('creating repo fork %s as %s', repo_name, repo_path)
    backend = get_backend(alias)
    backend(str(repo_fork_path), create=True, src_url=str(repo_path))
308
268
def __get_codes_stats(repo_name):
    """Count files per mimetype in the tip changeset of *repo_name*,
    considering only files whose extension looks like source code.

    :param repo_name: name of the repository relative to the repos root
    :returns: dict mapping mimetype -> file count (empty dict when none)
    """
    # set gives O(1) membership tests instead of scanning a list per file
    LANGUAGES_EXTENSIONS = set(['action', 'adp', 'ashx', 'asmx',
    'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el',
    'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml',
    'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst',
    's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
    'yaws'])

    # single source of truth for the repositories root path
    repos_path = HgModel().repos_path
    p = os.path.join(repos_path, repo_name)
    repo = get_repo(p)
    tip = repo.get_changeset()
    code_stats = {}

    # plain loop instead of map()-for-side-effects; walk() yields
    # (path, dirs, files) tuples, index 2 holds the file nodes
    for node in tip.walk('/'):
        for f in node[2]:
            if f.extension in LANGUAGES_EXTENSIONS:
                k = f.mimetype
                code_stats[k] = code_stats.get(k, 0) + 1

    return code_stats or {}
335
295
336
296
337
297
338
298
General Comments 0
You need to be logged in to leave comments. Login now