##// END OF EJS Templates
fixed celery tasks for using only one method of fetching main repos path
marcink -
r666:6ed37675 beta
parent child Browse files
Show More
@@ -1,338 +1,298 b''
1 1 from celery.decorators import task
2 2
3 3 from operator import itemgetter
4 4 from pylons.i18n.translation import _
5 5 from rhodecode.lib.celerylib import run_task, locked_task
6 6 from rhodecode.lib.helpers import person
7 7 from rhodecode.lib.smtp_mailer import SmtpMailer
8 8 from rhodecode.lib.utils import OrderedDict
9 9 from time import mktime
10 from vcs.backends.hg import MercurialRepository
11 from vcs.backends.git import GitRepository
12 10 import os
13 11 import traceback
14 12 from vcs.backends import get_repo
15 from vcs.utils.helpers import get_scm
16
13 from rhodecode.model.hg import HgModel
17 14 try:
18 15 import json
19 16 except ImportError:
20 17 #python 2.5 compatibility
21 18 import simplejson as json
22 19
23 20 try:
24 21 from celeryconfig import PYLONS_CONFIG as config
25 22 celery_on = True
26 23 except ImportError:
27 24 #if celeryconfig is not present let's just load our pylons
28 25 #config instead
29 26 from pylons import config
30 27 celery_on = False
31 28
32 29
33 30 __all__ = ['whoosh_index', 'get_commits_stats',
34 31 'reset_user_password', 'send_email']
35 32
36 33 def get_session():
37 34 if celery_on:
38 35 from sqlalchemy import engine_from_config
39 36 from sqlalchemy.orm import sessionmaker, scoped_session
40 37 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
41 38 sa = scoped_session(sessionmaker(bind=engine))
42 39 else:
43 40 #If we don't use celery reuse our current application Session
44 41 from rhodecode.model.meta import Session
45 42 sa = Session()
46 43
47 44 return sa
48 45
49 def get_hg_settings():
50 from rhodecode.model.db import RhodeCodeSettings
51 sa = get_session()
52 ret = sa.query(RhodeCodeSettings).all()
53
54 if not ret:
55 raise Exception('Could not get application settings !')
56 settings = {}
57 for each in ret:
58 settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
59
60 return settings
61
62 def get_hg_ui_settings():
63 from rhodecode.model.db import RhodeCodeUi
64 sa = get_session()
65 ret = sa.query(RhodeCodeUi).all()
66
67 if not ret:
68 raise Exception('Could not get application ui settings !')
69 settings = {}
70 for each in ret:
71 k = each.ui_key
72 v = each.ui_value
73 if k == '/':
74 k = 'root_path'
75
76 if k.find('.') != -1:
77 k = k.replace('.', '_')
78
79 if each.ui_section == 'hooks':
80 v = each.ui_active
81
82 settings[each.ui_section + '_' + k] = v
83
84 return settings
85
86 46 @task
87 47 @locked_task
88 48 def whoosh_index(repo_location, full_index):
89 49 log = whoosh_index.get_logger()
90 50 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
91 51 index_location = ''
92 52 WhooshIndexingDaemon(index_location=index_location,
93 53 repo_location=repo_location).run(full_index=full_index)
94 54
95 55 @task
96 56 @locked_task
97 57 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
98 58 from rhodecode.model.db import Statistics, Repository
99 59 log = get_commits_stats.get_logger()
100 60 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
101 61
102 62 commits_by_day_author_aggregate = {}
103 63 commits_by_day_aggregate = {}
104 repos_path = get_hg_ui_settings()['paths_root_path']
64 repos_path = HgModel().repos_path
105 65 p = os.path.join(repos_path, repo_name)
106 66 repo = get_repo(p)
107 67
108 68 skip_date_limit = True
109 69 parse_limit = 250 #limit for single task changeset parsing optimal for
110 70 last_rev = 0
111 71 last_cs = None
112 72 timegetter = itemgetter('time')
113 73
114 74 sa = get_session()
115 75
116 76 dbrepo = sa.query(Repository)\
117 77 .filter(Repository.repo_name == repo_name).scalar()
118 78 cur_stats = sa.query(Statistics)\
119 79 .filter(Statistics.repository == dbrepo).scalar()
120 80 if cur_stats:
121 81 last_rev = cur_stats.stat_on_revision
122 82 if not repo.revisions:
123 83 return True
124 84
125 85 if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
126 86 #pass silently without any work if we're not on first revision or current
127 87 #state of parsing revision(from db marker) is the last revision
128 88 return True
129 89
130 90 if cur_stats:
131 91 commits_by_day_aggregate = OrderedDict(
132 92 json.loads(
133 93 cur_stats.commit_activity_combined))
134 94 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
135 95
136 96 log.debug('starting parsing %s', parse_limit)
137 97 lmktime = mktime
138 98
139 99 for cnt, rev in enumerate(repo.revisions[last_rev:]):
140 100 last_cs = cs = repo.get_changeset(rev)
141 101 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
142 102 cs.date.timetuple()[2])
143 103 timetupple = [int(x) for x in k.split('-')]
144 104 timetupple.extend([0 for _ in xrange(6)])
145 105 k = lmktime(timetupple)
146 106 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
147 107 try:
148 108 l = [timegetter(x) for x in commits_by_day_author_aggregate\
149 109 [author_key_cleaner(cs.author)]['data']]
150 110 time_pos = l.index(k)
151 111 except ValueError:
152 112 time_pos = False
153 113
154 114 if time_pos >= 0 and time_pos is not False:
155 115
156 116 datadict = commits_by_day_author_aggregate\
157 117 [author_key_cleaner(cs.author)]['data'][time_pos]
158 118
159 119 datadict["commits"] += 1
160 120 datadict["added"] += len(cs.added)
161 121 datadict["changed"] += len(cs.changed)
162 122 datadict["removed"] += len(cs.removed)
163 123
164 124 else:
165 125 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
166 126
167 127 datadict = {"time":k,
168 128 "commits":1,
169 129 "added":len(cs.added),
170 130 "changed":len(cs.changed),
171 131 "removed":len(cs.removed),
172 132 }
173 133 commits_by_day_author_aggregate\
174 134 [author_key_cleaner(cs.author)]['data'].append(datadict)
175 135
176 136 else:
177 137 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
178 138 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
179 139 "label":author_key_cleaner(cs.author),
180 140 "data":[{"time":k,
181 141 "commits":1,
182 142 "added":len(cs.added),
183 143 "changed":len(cs.changed),
184 144 "removed":len(cs.removed),
185 145 }],
186 146 "schema":["commits"],
187 147 }
188 148
189 149 #gather all data by day
190 150 if commits_by_day_aggregate.has_key(k):
191 151 commits_by_day_aggregate[k] += 1
192 152 else:
193 153 commits_by_day_aggregate[k] = 1
194 154
195 155 if cnt >= parse_limit:
196 156 #don't fetch to much data since we can freeze application
197 157 break
198 158 overview_data = []
199 159 for k, v in commits_by_day_aggregate.items():
200 160 overview_data.append([k, v])
201 161 overview_data = sorted(overview_data, key=itemgetter(0))
202 162 if not commits_by_day_author_aggregate:
203 163 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
204 164 "label":author_key_cleaner(repo.contact),
205 165 "data":[0, 1],
206 166 "schema":["commits"],
207 167 }
208 168
209 169 stats = cur_stats if cur_stats else Statistics()
210 170 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
211 171 stats.commit_activity_combined = json.dumps(overview_data)
212 172
213 173 log.debug('last revison %s', last_rev)
214 174 leftovers = len(repo.revisions[last_rev:])
215 175 log.debug('revisions to parse %s', leftovers)
216 176
217 177 if last_rev == 0 or leftovers < parse_limit:
218 178 stats.languages = json.dumps(__get_codes_stats(repo_name))
219 179
220 180 stats.repository = dbrepo
221 181 stats.stat_on_revision = last_cs.revision
222 182
223 183 try:
224 184 sa.add(stats)
225 185 sa.commit()
226 186 except:
227 187 log.error(traceback.format_exc())
228 188 sa.rollback()
229 189 return False
230 190 if len(repo.revisions) > 1:
231 191 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
232 192
233 193 return True
234 194
235 195 @task
236 196 def reset_user_password(user_email):
237 197 log = reset_user_password.get_logger()
238 198 from rhodecode.lib import auth
239 199 from rhodecode.model.db import User
240 200
241 201 try:
242 202 try:
243 203 sa = get_session()
244 204 user = sa.query(User).filter(User.email == user_email).scalar()
245 205 new_passwd = auth.PasswordGenerator().gen_password(8,
246 206 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
247 207 if user:
248 208 user.password = auth.get_crypt_password(new_passwd)
249 209 sa.add(user)
250 210 sa.commit()
251 211 log.info('change password for %s', user_email)
252 212 if new_passwd is None:
253 213 raise Exception('unable to generate new password')
254 214
255 215 except:
256 216 log.error(traceback.format_exc())
257 217 sa.rollback()
258 218
259 219 run_task(send_email, user_email,
260 220 "Your new rhodecode password",
261 221 'Your new rhodecode password:%s' % (new_passwd))
262 222 log.info('send new password mail to %s', user_email)
263 223
264 224
265 225 except:
266 226 log.error('Failed to update user password')
267 227 log.error(traceback.format_exc())
268 228 return True
269 229
270 230 @task
271 231 def send_email(recipients, subject, body):
272 232 log = send_email.get_logger()
273 233 email_config = dict(config.items('DEFAULT'))
274 234 mail_from = email_config.get('app_email_from')
275 235 user = email_config.get('smtp_username')
276 236 passwd = email_config.get('smtp_password')
277 237 mail_server = email_config.get('smtp_server')
278 238 mail_port = email_config.get('smtp_port')
279 239 tls = email_config.get('smtp_use_tls')
280 240 ssl = False
281 241
282 242 try:
283 243 m = SmtpMailer(mail_from, user, passwd, mail_server,
284 244 mail_port, ssl, tls)
285 245 m.send(recipients, subject, body)
286 246 except:
287 247 log.error('Mail sending failed')
288 248 log.error(traceback.format_exc())
289 249 return False
290 250 return True
291 251
292 252 @task
293 253 def create_repo_fork(form_data, cur_user):
294 254 from rhodecode.model.repo import RepoModel
295 255 from vcs import get_backend
296 256 log = create_repo_fork.get_logger()
297 257 repo_model = RepoModel(get_session())
298 258 repo_model.create(form_data, cur_user, just_db=True, fork=True)
299 259 repo_name = form_data['repo_name']
300 repos_path = get_hg_ui_settings()['paths_root_path']
260 repos_path = HgModel().repos_path
301 261 repo_path = os.path.join(repos_path, repo_name)
302 262 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
303 263 alias = form_data['repo_type']
304 264
305 265 log.info('creating repo fork %s as %s', repo_name, repo_path)
306 266 backend = get_backend(alias)
307 267 backend(str(repo_fork_path), create=True, src_url=str(repo_path))
308 268
309 269 def __get_codes_stats(repo_name):
310 270 LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx',
311 271 'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el',
312 272 'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml',
313 273 'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst',
314 274 's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt', 'yaws']
315 275
316 276
317 repos_path = get_hg_ui_settings()['paths_root_path']
277 repos_path = HgModel().repos_path
318 278 p = os.path.join(repos_path, repo_name)
319 279 repo = get_repo(p)
320 280 tip = repo.get_changeset()
321 281 code_stats = {}
322 282
323 283 def aggregate(cs):
324 284 for f in cs[2]:
325 285 k = f.mimetype
326 286 if f.extension in LANGUAGES_EXTENSIONS:
327 287 if code_stats.has_key(k):
328 288 code_stats[k] += 1
329 289 else:
330 290 code_stats[k] = 1
331 291
332 292 map(aggregate, tip.walk('/'))
333 293
334 294 return code_stats or {}
335 295
336 296
337 297
338 298
General Comments 0
You need to be logged in to leave comments. Login now