##// END OF EJS Templates
removed pidlock from whoosh and added it as locked_task decorator
marcink -
r504:d280aa1c default
parent child Browse files
Show More
@@ -1,270 +1,261
1 from celery.decorators import task
1 from celery.decorators import task
2 from celery.task.sets import subtask
2 from celery.task.sets import subtask
3 from celeryconfig import PYLONS_CONFIG as config
3 from celeryconfig import PYLONS_CONFIG as config
4 from pylons.i18n.translation import _
4 from pylons.i18n.translation import _
5 from pylons_app.lib.celerylib import run_task, locked_task
5 from pylons_app.lib.celerylib import run_task, locked_task
6 from pylons_app.lib.helpers import person
6 from pylons_app.lib.helpers import person
7 from pylons_app.lib.smtp_mailer import SmtpMailer
7 from pylons_app.lib.smtp_mailer import SmtpMailer
8 from pylons_app.lib.utils import OrderedDict
8 from pylons_app.lib.utils import OrderedDict
9 from operator import itemgetter
9 from operator import itemgetter
10 from vcs.backends.hg import MercurialRepository
10 from vcs.backends.hg import MercurialRepository
11 from time import mktime
11 from time import mktime
12 import traceback
12 import traceback
13 import json
13 import json
14
14
15 __all__ = ['whoosh_index', 'get_commits_stats',
15 __all__ = ['whoosh_index', 'get_commits_stats',
16 'reset_user_password', 'send_email']
16 'reset_user_password', 'send_email']
17
17
18 def get_session():
18 def get_session():
19 from sqlalchemy import engine_from_config
19 from sqlalchemy import engine_from_config
20 from sqlalchemy.orm import sessionmaker, scoped_session
20 from sqlalchemy.orm import sessionmaker, scoped_session
21 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
21 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
22 sa = scoped_session(sessionmaker(bind=engine))
22 sa = scoped_session(sessionmaker(bind=engine))
23 return sa
23 return sa
24
24
25 def get_hg_settings():
25 def get_hg_settings():
26 from pylons_app.model.db import HgAppSettings
26 from pylons_app.model.db import HgAppSettings
27 try:
27 try:
28 sa = get_session()
28 sa = get_session()
29 ret = sa.query(HgAppSettings).all()
29 ret = sa.query(HgAppSettings).all()
30 finally:
30 finally:
31 sa.remove()
31 sa.remove()
32
32
33 if not ret:
33 if not ret:
34 raise Exception('Could not get application settings !')
34 raise Exception('Could not get application settings !')
35 settings = {}
35 settings = {}
36 for each in ret:
36 for each in ret:
37 settings['hg_app_' + each.app_settings_name] = each.app_settings_value
37 settings['hg_app_' + each.app_settings_name] = each.app_settings_value
38
38
39 return settings
39 return settings
40
40
41 def get_hg_ui_settings():
41 def get_hg_ui_settings():
42 from pylons_app.model.db import HgAppUi
42 from pylons_app.model.db import HgAppUi
43 try:
43 try:
44 sa = get_session()
44 sa = get_session()
45 ret = sa.query(HgAppUi).all()
45 ret = sa.query(HgAppUi).all()
46 finally:
46 finally:
47 sa.remove()
47 sa.remove()
48
48
49 if not ret:
49 if not ret:
50 raise Exception('Could not get application ui settings !')
50 raise Exception('Could not get application ui settings !')
51 settings = {}
51 settings = {}
52 for each in ret:
52 for each in ret:
53 k = each.ui_key
53 k = each.ui_key
54 v = each.ui_value
54 v = each.ui_value
55 if k == '/':
55 if k == '/':
56 k = 'root_path'
56 k = 'root_path'
57
57
58 if k.find('.') != -1:
58 if k.find('.') != -1:
59 k = k.replace('.', '_')
59 k = k.replace('.', '_')
60
60
61 if each.ui_section == 'hooks':
61 if each.ui_section == 'hooks':
62 v = each.ui_active
62 v = each.ui_active
63
63
64 settings[each.ui_section + '_' + k] = v
64 settings[each.ui_section + '_' + k] = v
65
65
66 return settings
66 return settings
67
67
68 @task
68 @task
69 @locked_task
69 def whoosh_index(repo_location, full_index):
70 def whoosh_index(repo_location, full_index):
70 log = whoosh_index.get_logger()
71 log = whoosh_index.get_logger()
71 from pylons_app.lib.pidlock import DaemonLock
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
73 WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
73 try:
74 l = DaemonLock()
75 WhooshIndexingDaemon(repo_location=repo_location)\
76 .run(full_index=full_index)
77 l.release()
78 return 'Done'
79 except LockHeld:
80 log.info('LockHeld')
81 return 'LockHeld'
82
83
74
84 @task
75 @task
85 @locked_task
76 @locked_task
86 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
77 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
87 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
78 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
88
79
89 from pylons_app.model.db import Statistics, Repository
80 from pylons_app.model.db import Statistics, Repository
90 log = get_commits_stats.get_logger()
81 log = get_commits_stats.get_logger()
91 commits_by_day_author_aggregate = {}
82 commits_by_day_author_aggregate = {}
92 commits_by_day_aggregate = {}
83 commits_by_day_aggregate = {}
93 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
84 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
94 repo = MercurialRepository(repos_path + repo_name)
85 repo = MercurialRepository(repos_path + repo_name)
95
86
96 skip_date_limit = True
87 skip_date_limit = True
97 parse_limit = 350 #limit for single task changeset parsing
88 parse_limit = 350 #limit for single task changeset parsing
98 last_rev = 0
89 last_rev = 0
99 last_cs = None
90 last_cs = None
100 timegetter = itemgetter('time')
91 timegetter = itemgetter('time')
101
92
102 sa = get_session()
93 sa = get_session()
103
94
104 dbrepo = sa.query(Repository)\
95 dbrepo = sa.query(Repository)\
105 .filter(Repository.repo_name == repo_name).scalar()
96 .filter(Repository.repo_name == repo_name).scalar()
106 cur_stats = sa.query(Statistics)\
97 cur_stats = sa.query(Statistics)\
107 .filter(Statistics.repository == dbrepo).scalar()
98 .filter(Statistics.repository == dbrepo).scalar()
108 if cur_stats:
99 if cur_stats:
109 last_rev = cur_stats.stat_on_revision
100 last_rev = cur_stats.stat_on_revision
110
101
111 if last_rev == repo.revisions[-1]:
102 if last_rev == repo.revisions[-1]:
112 #pass silently without any work
103 #pass silently without any work
113 return True
104 return True
114
105
115 if cur_stats:
106 if cur_stats:
116 commits_by_day_aggregate = OrderedDict(
107 commits_by_day_aggregate = OrderedDict(
117 json.loads(
108 json.loads(
118 cur_stats.commit_activity_combined))
109 cur_stats.commit_activity_combined))
119 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
110 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
120
111
121 for cnt, rev in enumerate(repo.revisions[last_rev:]):
112 for cnt, rev in enumerate(repo.revisions[last_rev:]):
122 last_cs = cs = repo.get_changeset(rev)
113 last_cs = cs = repo.get_changeset(rev)
123 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
114 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
124 cs.date.timetuple()[2])
115 cs.date.timetuple()[2])
125 timetupple = [int(x) for x in k.split('-')]
116 timetupple = [int(x) for x in k.split('-')]
126 timetupple.extend([0 for _ in xrange(6)])
117 timetupple.extend([0 for _ in xrange(6)])
127 k = mktime(timetupple)
118 k = mktime(timetupple)
128 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
119 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
129 try:
120 try:
130 l = [timegetter(x) for x in commits_by_day_author_aggregate\
121 l = [timegetter(x) for x in commits_by_day_author_aggregate\
131 [author_key_cleaner(cs.author)]['data']]
122 [author_key_cleaner(cs.author)]['data']]
132 time_pos = l.index(k)
123 time_pos = l.index(k)
133 except ValueError:
124 except ValueError:
134 time_pos = False
125 time_pos = False
135
126
136 if time_pos >= 0 and time_pos is not False:
127 if time_pos >= 0 and time_pos is not False:
137
128
138 datadict = commits_by_day_author_aggregate\
129 datadict = commits_by_day_author_aggregate\
139 [author_key_cleaner(cs.author)]['data'][time_pos]
130 [author_key_cleaner(cs.author)]['data'][time_pos]
140
131
141 datadict["commits"] += 1
132 datadict["commits"] += 1
142 datadict["added"] += len(cs.added)
133 datadict["added"] += len(cs.added)
143 datadict["changed"] += len(cs.changed)
134 datadict["changed"] += len(cs.changed)
144 datadict["removed"] += len(cs.removed)
135 datadict["removed"] += len(cs.removed)
145 #print datadict
136 #print datadict
146
137
147 else:
138 else:
148 #print 'ELSE !!!!'
139 #print 'ELSE !!!!'
149 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
140 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
150
141
151 datadict = {"time":k,
142 datadict = {"time":k,
152 "commits":1,
143 "commits":1,
153 "added":len(cs.added),
144 "added":len(cs.added),
154 "changed":len(cs.changed),
145 "changed":len(cs.changed),
155 "removed":len(cs.removed),
146 "removed":len(cs.removed),
156 }
147 }
157 commits_by_day_author_aggregate\
148 commits_by_day_author_aggregate\
158 [author_key_cleaner(cs.author)]['data'].append(datadict)
149 [author_key_cleaner(cs.author)]['data'].append(datadict)
159
150
160 else:
151 else:
161 #print k, 'nokey ADDING'
152 #print k, 'nokey ADDING'
162 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
153 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
163 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
154 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
164 "label":author_key_cleaner(cs.author),
155 "label":author_key_cleaner(cs.author),
165 "data":[{"time":k,
156 "data":[{"time":k,
166 "commits":1,
157 "commits":1,
167 "added":len(cs.added),
158 "added":len(cs.added),
168 "changed":len(cs.changed),
159 "changed":len(cs.changed),
169 "removed":len(cs.removed),
160 "removed":len(cs.removed),
170 }],
161 }],
171 "schema":["commits"],
162 "schema":["commits"],
172 }
163 }
173
164
174 # #gather all data by day
165 # #gather all data by day
175 if commits_by_day_aggregate.has_key(k):
166 if commits_by_day_aggregate.has_key(k):
176 commits_by_day_aggregate[k] += 1
167 commits_by_day_aggregate[k] += 1
177 else:
168 else:
178 commits_by_day_aggregate[k] = 1
169 commits_by_day_aggregate[k] = 1
179
170
180 if cnt >= parse_limit:
171 if cnt >= parse_limit:
181 #don't fetch to much data since we can freeze application
172 #don't fetch to much data since we can freeze application
182 break
173 break
183
174
184 overview_data = []
175 overview_data = []
185 for k, v in commits_by_day_aggregate.items():
176 for k, v in commits_by_day_aggregate.items():
186 overview_data.append([k, v])
177 overview_data.append([k, v])
187 overview_data = sorted(overview_data, key=itemgetter(0))
178 overview_data = sorted(overview_data, key=itemgetter(0))
188
179
189 if not commits_by_day_author_aggregate:
180 if not commits_by_day_author_aggregate:
190 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
181 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
191 "label":author_key_cleaner(repo.contact),
182 "label":author_key_cleaner(repo.contact),
192 "data":[0, 1],
183 "data":[0, 1],
193 "schema":["commits"],
184 "schema":["commits"],
194 }
185 }
195
186
196 stats = cur_stats if cur_stats else Statistics()
187 stats = cur_stats if cur_stats else Statistics()
197 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
188 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
198 stats.commit_activity_combined = json.dumps(overview_data)
189 stats.commit_activity_combined = json.dumps(overview_data)
199 stats.repository = dbrepo
190 stats.repository = dbrepo
200 stats.stat_on_revision = last_cs.revision
191 stats.stat_on_revision = last_cs.revision
201 stats.languages = json.dumps({'_TOTAL_':0, '':0})
192 stats.languages = json.dumps({'_TOTAL_':0, '':0})
202
193
203 try:
194 try:
204 sa.add(stats)
195 sa.add(stats)
205 sa.commit()
196 sa.commit()
206 except:
197 except:
207 log.error(traceback.format_exc())
198 log.error(traceback.format_exc())
208 sa.rollback()
199 sa.rollback()
209 return False
200 return False
210
201
211 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
202 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
212
203
213 return True
204 return True
214
205
215 @task
206 @task
216 def reset_user_password(user_email):
207 def reset_user_password(user_email):
217 log = reset_user_password.get_logger()
208 log = reset_user_password.get_logger()
218 from pylons_app.lib import auth
209 from pylons_app.lib import auth
219 from pylons_app.model.db import User
210 from pylons_app.model.db import User
220
211
221 try:
212 try:
222 try:
213 try:
223 sa = get_session()
214 sa = get_session()
224 user = sa.query(User).filter(User.email == user_email).scalar()
215 user = sa.query(User).filter(User.email == user_email).scalar()
225 new_passwd = auth.PasswordGenerator().gen_password(8,
216 new_passwd = auth.PasswordGenerator().gen_password(8,
226 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
217 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
227 if user:
218 if user:
228 user.password = auth.get_crypt_password(new_passwd)
219 user.password = auth.get_crypt_password(new_passwd)
229 sa.add(user)
220 sa.add(user)
230 sa.commit()
221 sa.commit()
231 log.info('change password for %s', user_email)
222 log.info('change password for %s', user_email)
232 if new_passwd is None:
223 if new_passwd is None:
233 raise Exception('unable to generate new password')
224 raise Exception('unable to generate new password')
234
225
235 except:
226 except:
236 log.error(traceback.format_exc())
227 log.error(traceback.format_exc())
237 sa.rollback()
228 sa.rollback()
238
229
239 run_task(send_email, user_email,
230 run_task(send_email, user_email,
240 "Your new hg-app password",
231 "Your new hg-app password",
241 'Your new hg-app password:%s' % (new_passwd))
232 'Your new hg-app password:%s' % (new_passwd))
242 log.info('send new password mail to %s', user_email)
233 log.info('send new password mail to %s', user_email)
243
234
244
235
245 except:
236 except:
246 log.error('Failed to update user password')
237 log.error('Failed to update user password')
247 log.error(traceback.format_exc())
238 log.error(traceback.format_exc())
248 return True
239 return True
249
240
250 @task
241 @task
251 def send_email(recipients, subject, body):
242 def send_email(recipients, subject, body):
252 log = send_email.get_logger()
243 log = send_email.get_logger()
253 email_config = dict(config.items('DEFAULT'))
244 email_config = dict(config.items('DEFAULT'))
254 mail_from = email_config.get('app_email_from')
245 mail_from = email_config.get('app_email_from')
255 user = email_config.get('smtp_username')
246 user = email_config.get('smtp_username')
256 passwd = email_config.get('smtp_password')
247 passwd = email_config.get('smtp_password')
257 mail_server = email_config.get('smtp_server')
248 mail_server = email_config.get('smtp_server')
258 mail_port = email_config.get('smtp_port')
249 mail_port = email_config.get('smtp_port')
259 tls = email_config.get('smtp_use_tls')
250 tls = email_config.get('smtp_use_tls')
260 ssl = False
251 ssl = False
261
252
262 try:
253 try:
263 m = SmtpMailer(mail_from, user, passwd, mail_server,
254 m = SmtpMailer(mail_from, user, passwd, mail_server,
264 mail_port, ssl, tls)
255 mail_port, ssl, tls)
265 m.send(recipients, subject, body)
256 m.send(recipients, subject, body)
266 except:
257 except:
267 log.error('Mail sending failed')
258 log.error('Mail sending failed')
268 log.error(traceback.format_exc())
259 log.error(traceback.format_exc())
269 return False
260 return False
270 return True
261 return True
@@ -1,127 +1,105
1 import os, time
1 import os, time
2 import sys
2 import sys
3 from warnings import warn
3 from warnings import warn
4
4
5 class LockHeld(Exception):pass
5 class LockHeld(Exception):pass
6
6
7
7
8 class DaemonLock(object):
8 class DaemonLock(object):
9 """daemon locking
9 """daemon locking
10 USAGE:
10 USAGE:
11 try:
11 try:
12 l = lock()
12 l = DaemonLock(desc='test lock')
13 main()
13 main()
14 l.release()
14 l.release()
15 except LockHeld:
15 except LockHeld:
16 sys.exit(1)
16 sys.exit(1)
17 """
17 """
18
18
19 def __init__(self, file=None, callbackfn=None,
19 def __init__(self, file=None, callbackfn=None,
20 desc='daemon lock', debug=False):
20 desc='daemon lock', debug=False):
21
21
22 self.pidfile = file if file else os.path.join(os.path.dirname(__file__),
22 self.pidfile = file if file else os.path.join(os.path.dirname(__file__),
23 'running.lock')
23 'running.lock')
24 self.callbackfn = callbackfn
24 self.callbackfn = callbackfn
25 self.desc = desc
25 self.desc = desc
26 self.debug = debug
26 self.debug = debug
27 self.held = False
27 self.held = False
28 #run the lock automatically !
28 #run the lock automatically !
29 self.lock()
29 self.lock()
30
30
31 def __del__(self):
31 def __del__(self):
32 if self.held:
32 if self.held:
33
33
34 # warn("use lock.release instead of del lock",
34 # warn("use lock.release instead of del lock",
35 # category = DeprecationWarning,
35 # category = DeprecationWarning,
36 # stacklevel = 2)
36 # stacklevel = 2)
37
37
38 # ensure the lock will be removed
38 # ensure the lock will be removed
39 self.release()
39 self.release()
40
40
41
41
42 def lock(self):
42 def lock(self):
43 """
43 """locking function, if lock is present it will raise LockHeld exception
44 locking function, if lock is present it will raise LockHeld exception
45 """
44 """
46 lockname = '%s' % (os.getpid())
45 lockname = '%s' % (os.getpid())
47
46
48 self.trylock()
47 self.trylock()
49 self.makelock(lockname, self.pidfile)
48 self.makelock(lockname, self.pidfile)
50 return True
49 return True
51
50
52 def trylock(self):
51 def trylock(self):
53 running_pid = False
52 running_pid = False
54 try:
53 try:
55 pidfile = open(self.pidfile, "r")
54 pidfile = open(self.pidfile, "r")
56 pidfile.seek(0)
55 pidfile.seek(0)
57 running_pid = pidfile.readline()
56 running_pid = pidfile.readline()
58 if self.debug:
57 if self.debug:
59 print 'lock file present running_pid: %s, checking for execution'\
58 print 'lock file present running_pid: %s, checking for execution'\
60 % running_pid
59 % running_pid
61 # Now we check the PID from lock file matches to the current
60 # Now we check the PID from lock file matches to the current
62 # process PID
61 # process PID
63 if running_pid:
62 if running_pid:
64 if os.path.exists("/proc/%s" % running_pid):
63 if os.path.exists("/proc/%s" % running_pid):
65 print "You already have an instance of the program running"
64 print "You already have an instance of the program running"
66 print "It is running as process %s" % running_pid
65 print "It is running as process %s" % running_pid
67 raise LockHeld
66 raise LockHeld
68 else:
67 else:
69 print "Lock File is there but the program is not running"
68 print "Lock File is there but the program is not running"
70 print "Removing lock file for the: %s" % running_pid
69 print "Removing lock file for the: %s" % running_pid
71 self.release()
70 self.release()
72 except IOError, e:
71 except IOError, e:
73 if e.errno != 2:
72 if e.errno != 2:
74 raise
73 raise
75
74
76
75
77 def release(self):
76 def release(self):
78 """
77 """releases the pid by removing the pidfile
79 releases the pid by removing the pidfile
80 """
78 """
81 if self.callbackfn:
79 if self.callbackfn:
82 #execute callback function on release
80 #execute callback function on release
83 if self.debug:
81 if self.debug:
84 print 'executing callback function %s' % self.callbackfn
82 print 'executing callback function %s' % self.callbackfn
85 self.callbackfn()
83 self.callbackfn()
86 try:
84 try:
87 if self.debug:
85 if self.debug:
88 print 'removing pidfile %s' % self.pidfile
86 print 'removing pidfile %s' % self.pidfile
89 os.remove(self.pidfile)
87 os.remove(self.pidfile)
90 self.held = False
88 self.held = False
91 except OSError, e:
89 except OSError, e:
92 if self.debug:
90 if self.debug:
93 print 'removing pidfile failed %s' % e
91 print 'removing pidfile failed %s' % e
94 pass
92 pass
95
93
96 def makelock(self, lockname, pidfile):
94 def makelock(self, lockname, pidfile):
97 """
95 """
98 this function will make an actual lock
96 this function will make an actual lock
99 @param lockname: acctual pid of file
97 @param lockname: acctual pid of file
100 @param pidfile: the file to write the pid in
98 @param pidfile: the file to write the pid in
101 """
99 """
102 if self.debug:
100 if self.debug:
103 print 'creating a file %s and pid: %s' % (pidfile, lockname)
101 print 'creating a file %s and pid: %s' % (pidfile, lockname)
104 pidfile = open(self.pidfile, "wb")
102 pidfile = open(self.pidfile, "wb")
105 pidfile.write(lockname)
103 pidfile.write(lockname)
106 pidfile.close
104 pidfile.close
107 self.held = True
105 self.held = True
108
109
110 def main():
111 print 'func is running'
112 cnt = 20
113 while 1:
114 print cnt
115 if cnt == 0:
116 break
117 time.sleep(1)
118 cnt -= 1
119
120
121 if __name__ == "__main__":
122 try:
123 l = DaemonLock(desc='test lock')
124 main()
125 l.release()
126 except LockHeld:
127 sys.exit(1)
General Comments 0
You need to be logged in to leave comments. Login now