##// END OF EJS Templates
make rhodecode reuse current session when not running on celery
marcink -
r559:bc4633a4 default
parent child Browse files
Show More
@@ -1,323 +1,325 b''
1 1 from celery.decorators import task
2 2
3 3 from operator import itemgetter
4 4 from pylons.i18n.translation import _
5 5 from rhodecode.lib.celerylib import run_task, locked_task
6 6 from rhodecode.lib.helpers import person
7 7 from rhodecode.lib.smtp_mailer import SmtpMailer
8 8 from rhodecode.lib.utils import OrderedDict
9 9 from time import mktime
10 10 from vcs.backends.hg import MercurialRepository
11 11 import json
12 12 import traceback
13 13
14 14 try:
15 15 from celeryconfig import PYLONS_CONFIG as config
16 celery_on = True
16 17 except ImportError:
17 18 #if celeryconfig is not present let's just load our pylons
18 19 #config instead
19 20 from pylons import config
21 celery_on = False
20 22
21 23
22 24 __all__ = ['whoosh_index', 'get_commits_stats',
23 25 'reset_user_password', 'send_email']
24 26
25 27 def get_session():
28 if celery_on:
26 29 from sqlalchemy import engine_from_config
27 30 from sqlalchemy.orm import sessionmaker, scoped_session
28 31 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
29 32 sa = scoped_session(sessionmaker(bind=engine))
33 else:
34 #If we don't use celery reuse our current application Session
35 from rhodecode.model.meta import Session
36 sa = Session
37
30 38 return sa
31 39
32 40 def get_hg_settings():
33 41 from rhodecode.model.db import RhodeCodeSettings
34 try:
35 42 sa = get_session()
36 43 ret = sa.query(RhodeCodeSettings).all()
37 finally:
38 sa.remove()
39 44
40 45 if not ret:
41 46 raise Exception('Could not get application settings !')
42 47 settings = {}
43 48 for each in ret:
44 49 settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
45 50
46 51 return settings
47 52
48 53 def get_hg_ui_settings():
49 54 from rhodecode.model.db import RhodeCodeUi
50 try:
51 55 sa = get_session()
52 56 ret = sa.query(RhodeCodeUi).all()
53 finally:
54 sa.remove()
55 57
56 58 if not ret:
57 59 raise Exception('Could not get application ui settings !')
58 60 settings = {}
59 61 for each in ret:
60 62 k = each.ui_key
61 63 v = each.ui_value
62 64 if k == '/':
63 65 k = 'root_path'
64 66
65 67 if k.find('.') != -1:
66 68 k = k.replace('.', '_')
67 69
68 70 if each.ui_section == 'hooks':
69 71 v = each.ui_active
70 72
71 73 settings[each.ui_section + '_' + k] = v
72 74
73 75 return settings
74 76
75 77 @task
76 78 @locked_task
77 79 def whoosh_index(repo_location, full_index):
78 80 log = whoosh_index.get_logger()
79 81 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
80 82 WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
81 83
82 84 @task
83 85 @locked_task
84 86 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 87 from rhodecode.model.db import Statistics, Repository
86 88 log = get_commits_stats.get_logger()
87 89 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
88 90
89 91 commits_by_day_author_aggregate = {}
90 92 commits_by_day_aggregate = {}
91 93 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
92 94 repo = MercurialRepository(repos_path + repo_name)
93 95
94 96 skip_date_limit = True
95 97 parse_limit = 350 #limit for single task changeset parsing optimal for
96 98 last_rev = 0
97 99 last_cs = None
98 100 timegetter = itemgetter('time')
99 101
100 102 sa = get_session()
101 103
102 104 dbrepo = sa.query(Repository)\
103 105 .filter(Repository.repo_name == repo_name).scalar()
104 106 cur_stats = sa.query(Statistics)\
105 107 .filter(Statistics.repository == dbrepo).scalar()
106 108 if cur_stats:
107 109 last_rev = cur_stats.stat_on_revision
108 110 if not repo.revisions:
109 111 return True
110 112
111 113 if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
112 114 #pass silently without any work if we're not on first revision or current
113 115 #state of parsing revision(from db marker) is the last revision
114 116 return True
115 117
116 118 if cur_stats:
117 119 commits_by_day_aggregate = OrderedDict(
118 120 json.loads(
119 121 cur_stats.commit_activity_combined))
120 122 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
121 123
122 124 log.debug('starting parsing %s', parse_limit)
123 125 for cnt, rev in enumerate(repo.revisions[last_rev:]):
124 126 last_cs = cs = repo.get_changeset(rev)
125 127 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
126 128 cs.date.timetuple()[2])
127 129 timetupple = [int(x) for x in k.split('-')]
128 130 timetupple.extend([0 for _ in xrange(6)])
129 131 k = mktime(timetupple)
130 132 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
131 133 try:
132 134 l = [timegetter(x) for x in commits_by_day_author_aggregate\
133 135 [author_key_cleaner(cs.author)]['data']]
134 136 time_pos = l.index(k)
135 137 except ValueError:
136 138 time_pos = False
137 139
138 140 if time_pos >= 0 and time_pos is not False:
139 141
140 142 datadict = commits_by_day_author_aggregate\
141 143 [author_key_cleaner(cs.author)]['data'][time_pos]
142 144
143 145 datadict["commits"] += 1
144 146 datadict["added"] += len(cs.added)
145 147 datadict["changed"] += len(cs.changed)
146 148 datadict["removed"] += len(cs.removed)
147 149 #print datadict
148 150
149 151 else:
150 152 #print 'ELSE !!!!'
151 153 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
152 154
153 155 datadict = {"time":k,
154 156 "commits":1,
155 157 "added":len(cs.added),
156 158 "changed":len(cs.changed),
157 159 "removed":len(cs.removed),
158 160 }
159 161 commits_by_day_author_aggregate\
160 162 [author_key_cleaner(cs.author)]['data'].append(datadict)
161 163
162 164 else:
163 165 #print k, 'nokey ADDING'
164 166 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
165 167 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
166 168 "label":author_key_cleaner(cs.author),
167 169 "data":[{"time":k,
168 170 "commits":1,
169 171 "added":len(cs.added),
170 172 "changed":len(cs.changed),
171 173 "removed":len(cs.removed),
172 174 }],
173 175 "schema":["commits"],
174 176 }
175 177
176 178 # #gather all data by day
177 179 if commits_by_day_aggregate.has_key(k):
178 180 commits_by_day_aggregate[k] += 1
179 181 else:
180 182 commits_by_day_aggregate[k] = 1
181 183
182 184 if cnt >= parse_limit:
183 185 #don't fetch to much data since we can freeze application
184 186 break
185 187
186 188 overview_data = []
187 189 for k, v in commits_by_day_aggregate.items():
188 190 overview_data.append([k, v])
189 191 overview_data = sorted(overview_data, key=itemgetter(0))
190 192
191 193 if not commits_by_day_author_aggregate:
192 194 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
193 195 "label":author_key_cleaner(repo.contact),
194 196 "data":[0, 1],
195 197 "schema":["commits"],
196 198 }
197 199
198 200 stats = cur_stats if cur_stats else Statistics()
199 201 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
200 202 stats.commit_activity_combined = json.dumps(overview_data)
201 203
202 204 log.debug('last revison %s', last_rev)
203 205 leftovers = len(repo.revisions[last_rev:])
204 206 log.debug('revisions to parse %s', leftovers)
205 207
206 208 if last_rev == 0 or leftovers < parse_limit:
207 209 stats.languages = json.dumps(__get_codes_stats(repo_name))
208 210
209 211 stats.repository = dbrepo
210 212 stats.stat_on_revision = last_cs.revision
211 213
212 214 try:
213 215 sa.add(stats)
214 216 sa.commit()
215 217 except:
216 218 log.error(traceback.format_exc())
217 219 sa.rollback()
218 220 return False
219 221 if len(repo.revisions) > 1:
220 222 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
221 223
222 224 return True
223 225
224 226 @task
225 227 def reset_user_password(user_email):
226 228 log = reset_user_password.get_logger()
227 229 from rhodecode.lib import auth
228 230 from rhodecode.model.db import User
229 231
230 232 try:
231 233 try:
232 234 sa = get_session()
233 235 user = sa.query(User).filter(User.email == user_email).scalar()
234 236 new_passwd = auth.PasswordGenerator().gen_password(8,
235 237 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
236 238 if user:
237 239 user.password = auth.get_crypt_password(new_passwd)
238 240 sa.add(user)
239 241 sa.commit()
240 242 log.info('change password for %s', user_email)
241 243 if new_passwd is None:
242 244 raise Exception('unable to generate new password')
243 245
244 246 except:
245 247 log.error(traceback.format_exc())
246 248 sa.rollback()
247 249
248 250 run_task(send_email, user_email,
249 251 "Your new rhodecode password",
250 252 'Your new rhodecode password:%s' % (new_passwd))
251 253 log.info('send new password mail to %s', user_email)
252 254
253 255
254 256 except:
255 257 log.error('Failed to update user password')
256 258 log.error(traceback.format_exc())
257 259 return True
258 260
259 261 @task
260 262 def send_email(recipients, subject, body):
261 263 log = send_email.get_logger()
262 264 email_config = dict(config.items('DEFAULT'))
263 265 mail_from = email_config.get('app_email_from')
264 266 user = email_config.get('smtp_username')
265 267 passwd = email_config.get('smtp_password')
266 268 mail_server = email_config.get('smtp_server')
267 269 mail_port = email_config.get('smtp_port')
268 270 tls = email_config.get('smtp_use_tls')
269 271 ssl = False
270 272
271 273 try:
272 274 m = SmtpMailer(mail_from, user, passwd, mail_server,
273 275 mail_port, ssl, tls)
274 276 m.send(recipients, subject, body)
275 277 except:
276 278 log.error('Mail sending failed')
277 279 log.error(traceback.format_exc())
278 280 return False
279 281 return True
280 282
281 283 @task
282 284 def create_repo_fork(form_data, cur_user):
283 285 import os
284 286 from rhodecode.model.repo_model import RepoModel
285 287 sa = get_session()
286 288 rm = RepoModel(sa)
287 289
288 290 rm.create(form_data, cur_user, just_db=True, fork=True)
289 291
290 292 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
291 293 repo_path = os.path.join(repos_path, form_data['repo_name'])
292 294 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
293 295
294 296 MercurialRepository(str(repo_fork_path), True, clone_url=str(repo_path))
295 297
296 298
297 299 def __get_codes_stats(repo_name):
298 300 LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
299 301 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
300 302 'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
301 303 'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
302 304 'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
303 305 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
304 306 'yaws']
305 307 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
306 308 repo = MercurialRepository(repos_path + repo_name)
307 309
308 310 code_stats = {}
309 311 for topnode, dirs, files in repo.walk('/', 'tip'):
310 312 for f in files:
311 313 k = f.mimetype
312 314 if f.extension in LANGUAGES_EXTENSIONS:
313 315 if code_stats.has_key(k):
314 316 code_stats[k] += 1
315 317 else:
316 318 code_stats[k] = 1
317 319
318 320 return code_stats or {}
319 321
320 322
321 323
322 324
323 325
General Comments 0
You need to be logged in to leave comments. Login now