##// END OF EJS Templates
make rhodecode reuse current session when not running on celery
marcink -
r559:bc4633a4 default
parent child Browse files
Show More
@@ -1,323 +1,325 b''
1 from celery.decorators import task
1 from celery.decorators import task
2
2
3 from operator import itemgetter
3 from operator import itemgetter
4 from pylons.i18n.translation import _
4 from pylons.i18n.translation import _
5 from rhodecode.lib.celerylib import run_task, locked_task
5 from rhodecode.lib.celerylib import run_task, locked_task
6 from rhodecode.lib.helpers import person
6 from rhodecode.lib.helpers import person
7 from rhodecode.lib.smtp_mailer import SmtpMailer
7 from rhodecode.lib.smtp_mailer import SmtpMailer
8 from rhodecode.lib.utils import OrderedDict
8 from rhodecode.lib.utils import OrderedDict
9 from time import mktime
9 from time import mktime
10 from vcs.backends.hg import MercurialRepository
10 from vcs.backends.hg import MercurialRepository
11 import json
11 import json
12 import traceback
12 import traceback
13
13
14 try:
14 try:
15 from celeryconfig import PYLONS_CONFIG as config
15 from celeryconfig import PYLONS_CONFIG as config
16 celery_on = True
16 except ImportError:
17 except ImportError:
17 #if celeryconfig is not present let's just load our pylons
18 #if celeryconfig is not present let's just load our pylons
18 #config instead
19 #config instead
19 from pylons import config
20 from pylons import config
21 celery_on = False
20
22
21
23
22 __all__ = ['whoosh_index', 'get_commits_stats',
24 __all__ = ['whoosh_index', 'get_commits_stats',
23 'reset_user_password', 'send_email']
25 'reset_user_password', 'send_email']
24
26
25 def get_session():
27 def get_session():
26 from sqlalchemy import engine_from_config
28 if celery_on:
27 from sqlalchemy.orm import sessionmaker, scoped_session
29 from sqlalchemy import engine_from_config
28 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
30 from sqlalchemy.orm import sessionmaker, scoped_session
29 sa = scoped_session(sessionmaker(bind=engine))
31 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
32 sa = scoped_session(sessionmaker(bind=engine))
33 else:
34 #If we don't use celery reuse our current application Session
35 from rhodecode.model.meta import Session
36 sa = Session
37
30 return sa
38 return sa
31
39
32 def get_hg_settings():
40 def get_hg_settings():
33 from rhodecode.model.db import RhodeCodeSettings
41 from rhodecode.model.db import RhodeCodeSettings
34 try:
42 sa = get_session()
35 sa = get_session()
43 ret = sa.query(RhodeCodeSettings).all()
36 ret = sa.query(RhodeCodeSettings).all()
37 finally:
38 sa.remove()
39
44
40 if not ret:
45 if not ret:
41 raise Exception('Could not get application settings !')
46 raise Exception('Could not get application settings !')
42 settings = {}
47 settings = {}
43 for each in ret:
48 for each in ret:
44 settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
49 settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
45
50
46 return settings
51 return settings
47
52
48 def get_hg_ui_settings():
53 def get_hg_ui_settings():
49 from rhodecode.model.db import RhodeCodeUi
54 from rhodecode.model.db import RhodeCodeUi
50 try:
55 sa = get_session()
51 sa = get_session()
56 ret = sa.query(RhodeCodeUi).all()
52 ret = sa.query(RhodeCodeUi).all()
53 finally:
54 sa.remove()
55
57
56 if not ret:
58 if not ret:
57 raise Exception('Could not get application ui settings !')
59 raise Exception('Could not get application ui settings !')
58 settings = {}
60 settings = {}
59 for each in ret:
61 for each in ret:
60 k = each.ui_key
62 k = each.ui_key
61 v = each.ui_value
63 v = each.ui_value
62 if k == '/':
64 if k == '/':
63 k = 'root_path'
65 k = 'root_path'
64
66
65 if k.find('.') != -1:
67 if k.find('.') != -1:
66 k = k.replace('.', '_')
68 k = k.replace('.', '_')
67
69
68 if each.ui_section == 'hooks':
70 if each.ui_section == 'hooks':
69 v = each.ui_active
71 v = each.ui_active
70
72
71 settings[each.ui_section + '_' + k] = v
73 settings[each.ui_section + '_' + k] = v
72
74
73 return settings
75 return settings
74
76
75 @task
77 @task
76 @locked_task
78 @locked_task
77 def whoosh_index(repo_location, full_index):
79 def whoosh_index(repo_location, full_index):
78 log = whoosh_index.get_logger()
80 log = whoosh_index.get_logger()
79 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
81 from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
80 WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
82 WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
81
83
82 @task
84 @task
83 @locked_task
85 @locked_task
84 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
86 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 from rhodecode.model.db import Statistics, Repository
87 from rhodecode.model.db import Statistics, Repository
86 log = get_commits_stats.get_logger()
88 log = get_commits_stats.get_logger()
87 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
89 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
88
90
89 commits_by_day_author_aggregate = {}
91 commits_by_day_author_aggregate = {}
90 commits_by_day_aggregate = {}
92 commits_by_day_aggregate = {}
91 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
93 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
92 repo = MercurialRepository(repos_path + repo_name)
94 repo = MercurialRepository(repos_path + repo_name)
93
95
94 skip_date_limit = True
96 skip_date_limit = True
95 parse_limit = 350 #limit for single task changeset parsing optimal for
97 parse_limit = 350 #limit for single task changeset parsing optimal for
96 last_rev = 0
98 last_rev = 0
97 last_cs = None
99 last_cs = None
98 timegetter = itemgetter('time')
100 timegetter = itemgetter('time')
99
101
100 sa = get_session()
102 sa = get_session()
101
103
102 dbrepo = sa.query(Repository)\
104 dbrepo = sa.query(Repository)\
103 .filter(Repository.repo_name == repo_name).scalar()
105 .filter(Repository.repo_name == repo_name).scalar()
104 cur_stats = sa.query(Statistics)\
106 cur_stats = sa.query(Statistics)\
105 .filter(Statistics.repository == dbrepo).scalar()
107 .filter(Statistics.repository == dbrepo).scalar()
106 if cur_stats:
108 if cur_stats:
107 last_rev = cur_stats.stat_on_revision
109 last_rev = cur_stats.stat_on_revision
108 if not repo.revisions:
110 if not repo.revisions:
109 return True
111 return True
110
112
111 if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
113 if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
112 #pass silently without any work if we're not on first revision or current
114 #pass silently without any work if we're not on first revision or current
113 #state of parsing revision(from db marker) is the last revision
115 #state of parsing revision(from db marker) is the last revision
114 return True
116 return True
115
117
116 if cur_stats:
118 if cur_stats:
117 commits_by_day_aggregate = OrderedDict(
119 commits_by_day_aggregate = OrderedDict(
118 json.loads(
120 json.loads(
119 cur_stats.commit_activity_combined))
121 cur_stats.commit_activity_combined))
120 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
122 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
121
123
122 log.debug('starting parsing %s', parse_limit)
124 log.debug('starting parsing %s', parse_limit)
123 for cnt, rev in enumerate(repo.revisions[last_rev:]):
125 for cnt, rev in enumerate(repo.revisions[last_rev:]):
124 last_cs = cs = repo.get_changeset(rev)
126 last_cs = cs = repo.get_changeset(rev)
125 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
127 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
126 cs.date.timetuple()[2])
128 cs.date.timetuple()[2])
127 timetupple = [int(x) for x in k.split('-')]
129 timetupple = [int(x) for x in k.split('-')]
128 timetupple.extend([0 for _ in xrange(6)])
130 timetupple.extend([0 for _ in xrange(6)])
129 k = mktime(timetupple)
131 k = mktime(timetupple)
130 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
132 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
131 try:
133 try:
132 l = [timegetter(x) for x in commits_by_day_author_aggregate\
134 l = [timegetter(x) for x in commits_by_day_author_aggregate\
133 [author_key_cleaner(cs.author)]['data']]
135 [author_key_cleaner(cs.author)]['data']]
134 time_pos = l.index(k)
136 time_pos = l.index(k)
135 except ValueError:
137 except ValueError:
136 time_pos = False
138 time_pos = False
137
139
138 if time_pos >= 0 and time_pos is not False:
140 if time_pos >= 0 and time_pos is not False:
139
141
140 datadict = commits_by_day_author_aggregate\
142 datadict = commits_by_day_author_aggregate\
141 [author_key_cleaner(cs.author)]['data'][time_pos]
143 [author_key_cleaner(cs.author)]['data'][time_pos]
142
144
143 datadict["commits"] += 1
145 datadict["commits"] += 1
144 datadict["added"] += len(cs.added)
146 datadict["added"] += len(cs.added)
145 datadict["changed"] += len(cs.changed)
147 datadict["changed"] += len(cs.changed)
146 datadict["removed"] += len(cs.removed)
148 datadict["removed"] += len(cs.removed)
147 #print datadict
149 #print datadict
148
150
149 else:
151 else:
150 #print 'ELSE !!!!'
152 #print 'ELSE !!!!'
151 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
153 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
152
154
153 datadict = {"time":k,
155 datadict = {"time":k,
154 "commits":1,
156 "commits":1,
155 "added":len(cs.added),
157 "added":len(cs.added),
156 "changed":len(cs.changed),
158 "changed":len(cs.changed),
157 "removed":len(cs.removed),
159 "removed":len(cs.removed),
158 }
160 }
159 commits_by_day_author_aggregate\
161 commits_by_day_author_aggregate\
160 [author_key_cleaner(cs.author)]['data'].append(datadict)
162 [author_key_cleaner(cs.author)]['data'].append(datadict)
161
163
162 else:
164 else:
163 #print k, 'nokey ADDING'
165 #print k, 'nokey ADDING'
164 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
166 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
165 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
167 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
166 "label":author_key_cleaner(cs.author),
168 "label":author_key_cleaner(cs.author),
167 "data":[{"time":k,
169 "data":[{"time":k,
168 "commits":1,
170 "commits":1,
169 "added":len(cs.added),
171 "added":len(cs.added),
170 "changed":len(cs.changed),
172 "changed":len(cs.changed),
171 "removed":len(cs.removed),
173 "removed":len(cs.removed),
172 }],
174 }],
173 "schema":["commits"],
175 "schema":["commits"],
174 }
176 }
175
177
176 # #gather all data by day
178 # #gather all data by day
177 if commits_by_day_aggregate.has_key(k):
179 if commits_by_day_aggregate.has_key(k):
178 commits_by_day_aggregate[k] += 1
180 commits_by_day_aggregate[k] += 1
179 else:
181 else:
180 commits_by_day_aggregate[k] = 1
182 commits_by_day_aggregate[k] = 1
181
183
182 if cnt >= parse_limit:
184 if cnt >= parse_limit:
183 #don't fetch to much data since we can freeze application
185 #don't fetch to much data since we can freeze application
184 break
186 break
185
187
186 overview_data = []
188 overview_data = []
187 for k, v in commits_by_day_aggregate.items():
189 for k, v in commits_by_day_aggregate.items():
188 overview_data.append([k, v])
190 overview_data.append([k, v])
189 overview_data = sorted(overview_data, key=itemgetter(0))
191 overview_data = sorted(overview_data, key=itemgetter(0))
190
192
191 if not commits_by_day_author_aggregate:
193 if not commits_by_day_author_aggregate:
192 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
194 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
193 "label":author_key_cleaner(repo.contact),
195 "label":author_key_cleaner(repo.contact),
194 "data":[0, 1],
196 "data":[0, 1],
195 "schema":["commits"],
197 "schema":["commits"],
196 }
198 }
197
199
198 stats = cur_stats if cur_stats else Statistics()
200 stats = cur_stats if cur_stats else Statistics()
199 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
201 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
200 stats.commit_activity_combined = json.dumps(overview_data)
202 stats.commit_activity_combined = json.dumps(overview_data)
201
203
202 log.debug('last revison %s', last_rev)
204 log.debug('last revison %s', last_rev)
203 leftovers = len(repo.revisions[last_rev:])
205 leftovers = len(repo.revisions[last_rev:])
204 log.debug('revisions to parse %s', leftovers)
206 log.debug('revisions to parse %s', leftovers)
205
207
206 if last_rev == 0 or leftovers < parse_limit:
208 if last_rev == 0 or leftovers < parse_limit:
207 stats.languages = json.dumps(__get_codes_stats(repo_name))
209 stats.languages = json.dumps(__get_codes_stats(repo_name))
208
210
209 stats.repository = dbrepo
211 stats.repository = dbrepo
210 stats.stat_on_revision = last_cs.revision
212 stats.stat_on_revision = last_cs.revision
211
213
212 try:
214 try:
213 sa.add(stats)
215 sa.add(stats)
214 sa.commit()
216 sa.commit()
215 except:
217 except:
216 log.error(traceback.format_exc())
218 log.error(traceback.format_exc())
217 sa.rollback()
219 sa.rollback()
218 return False
220 return False
219 if len(repo.revisions) > 1:
221 if len(repo.revisions) > 1:
220 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
222 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
221
223
222 return True
224 return True
223
225
224 @task
226 @task
225 def reset_user_password(user_email):
227 def reset_user_password(user_email):
226 log = reset_user_password.get_logger()
228 log = reset_user_password.get_logger()
227 from rhodecode.lib import auth
229 from rhodecode.lib import auth
228 from rhodecode.model.db import User
230 from rhodecode.model.db import User
229
231
230 try:
232 try:
231 try:
233 try:
232 sa = get_session()
234 sa = get_session()
233 user = sa.query(User).filter(User.email == user_email).scalar()
235 user = sa.query(User).filter(User.email == user_email).scalar()
234 new_passwd = auth.PasswordGenerator().gen_password(8,
236 new_passwd = auth.PasswordGenerator().gen_password(8,
235 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
237 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
236 if user:
238 if user:
237 user.password = auth.get_crypt_password(new_passwd)
239 user.password = auth.get_crypt_password(new_passwd)
238 sa.add(user)
240 sa.add(user)
239 sa.commit()
241 sa.commit()
240 log.info('change password for %s', user_email)
242 log.info('change password for %s', user_email)
241 if new_passwd is None:
243 if new_passwd is None:
242 raise Exception('unable to generate new password')
244 raise Exception('unable to generate new password')
243
245
244 except:
246 except:
245 log.error(traceback.format_exc())
247 log.error(traceback.format_exc())
246 sa.rollback()
248 sa.rollback()
247
249
248 run_task(send_email, user_email,
250 run_task(send_email, user_email,
249 "Your new rhodecode password",
251 "Your new rhodecode password",
250 'Your new rhodecode password:%s' % (new_passwd))
252 'Your new rhodecode password:%s' % (new_passwd))
251 log.info('send new password mail to %s', user_email)
253 log.info('send new password mail to %s', user_email)
252
254
253
255
254 except:
256 except:
255 log.error('Failed to update user password')
257 log.error('Failed to update user password')
256 log.error(traceback.format_exc())
258 log.error(traceback.format_exc())
257 return True
259 return True
258
260
259 @task
261 @task
260 def send_email(recipients, subject, body):
262 def send_email(recipients, subject, body):
261 log = send_email.get_logger()
263 log = send_email.get_logger()
262 email_config = dict(config.items('DEFAULT'))
264 email_config = dict(config.items('DEFAULT'))
263 mail_from = email_config.get('app_email_from')
265 mail_from = email_config.get('app_email_from')
264 user = email_config.get('smtp_username')
266 user = email_config.get('smtp_username')
265 passwd = email_config.get('smtp_password')
267 passwd = email_config.get('smtp_password')
266 mail_server = email_config.get('smtp_server')
268 mail_server = email_config.get('smtp_server')
267 mail_port = email_config.get('smtp_port')
269 mail_port = email_config.get('smtp_port')
268 tls = email_config.get('smtp_use_tls')
270 tls = email_config.get('smtp_use_tls')
269 ssl = False
271 ssl = False
270
272
271 try:
273 try:
272 m = SmtpMailer(mail_from, user, passwd, mail_server,
274 m = SmtpMailer(mail_from, user, passwd, mail_server,
273 mail_port, ssl, tls)
275 mail_port, ssl, tls)
274 m.send(recipients, subject, body)
276 m.send(recipients, subject, body)
275 except:
277 except:
276 log.error('Mail sending failed')
278 log.error('Mail sending failed')
277 log.error(traceback.format_exc())
279 log.error(traceback.format_exc())
278 return False
280 return False
279 return True
281 return True
280
282
281 @task
283 @task
282 def create_repo_fork(form_data, cur_user):
284 def create_repo_fork(form_data, cur_user):
283 import os
285 import os
284 from rhodecode.model.repo_model import RepoModel
286 from rhodecode.model.repo_model import RepoModel
285 sa = get_session()
287 sa = get_session()
286 rm = RepoModel(sa)
288 rm = RepoModel(sa)
287
289
288 rm.create(form_data, cur_user, just_db=True, fork=True)
290 rm.create(form_data, cur_user, just_db=True, fork=True)
289
291
290 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
292 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
291 repo_path = os.path.join(repos_path, form_data['repo_name'])
293 repo_path = os.path.join(repos_path, form_data['repo_name'])
292 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
294 repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
293
295
294 MercurialRepository(str(repo_fork_path), True, clone_url=str(repo_path))
296 MercurialRepository(str(repo_fork_path), True, clone_url=str(repo_path))
295
297
296
298
297 def __get_codes_stats(repo_name):
299 def __get_codes_stats(repo_name):
298 LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
300 LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
299 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
301 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
300 'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
302 'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
301 'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
303 'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
302 'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
304 'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
303 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
305 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
304 'yaws']
306 'yaws']
305 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
307 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
306 repo = MercurialRepository(repos_path + repo_name)
308 repo = MercurialRepository(repos_path + repo_name)
307
309
308 code_stats = {}
310 code_stats = {}
309 for topnode, dirs, files in repo.walk('/', 'tip'):
311 for topnode, dirs, files in repo.walk('/', 'tip'):
310 for f in files:
312 for f in files:
311 k = f.mimetype
313 k = f.mimetype
312 if f.extension in LANGUAGES_EXTENSIONS:
314 if f.extension in LANGUAGES_EXTENSIONS:
313 if code_stats.has_key(k):
315 if code_stats.has_key(k):
314 code_stats[k] += 1
316 code_stats[k] += 1
315 else:
317 else:
316 code_stats[k] = 1
318 code_stats[k] = 1
317
319
318 return code_stats or {}
320 return code_stats or {}
319
321
320
322
321
323
322
324
323
325
General Comments 0
You need to be logged in to leave comments. Login now