##// END OF EJS Templates
removed pidlock from whoosh and added it as locked_task decorator
marcink -
r504:d280aa1c default
parent child Browse files
Show More
@@ -1,270 +1,261
1 1 from celery.decorators import task
2 2 from celery.task.sets import subtask
3 3 from celeryconfig import PYLONS_CONFIG as config
4 4 from pylons.i18n.translation import _
5 5 from pylons_app.lib.celerylib import run_task, locked_task
6 6 from pylons_app.lib.helpers import person
7 7 from pylons_app.lib.smtp_mailer import SmtpMailer
8 8 from pylons_app.lib.utils import OrderedDict
9 9 from operator import itemgetter
10 10 from vcs.backends.hg import MercurialRepository
11 11 from time import mktime
12 12 import traceback
13 13 import json
14 14
15 15 __all__ = ['whoosh_index', 'get_commits_stats',
16 16 'reset_user_password', 'send_email']
17 17
18 18 def get_session():
19 19 from sqlalchemy import engine_from_config
20 20 from sqlalchemy.orm import sessionmaker, scoped_session
21 21 engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
22 22 sa = scoped_session(sessionmaker(bind=engine))
23 23 return sa
24 24
25 25 def get_hg_settings():
26 26 from pylons_app.model.db import HgAppSettings
27 27 try:
28 28 sa = get_session()
29 29 ret = sa.query(HgAppSettings).all()
30 30 finally:
31 31 sa.remove()
32 32
33 33 if not ret:
34 34 raise Exception('Could not get application settings !')
35 35 settings = {}
36 36 for each in ret:
37 37 settings['hg_app_' + each.app_settings_name] = each.app_settings_value
38 38
39 39 return settings
40 40
41 41 def get_hg_ui_settings():
42 42 from pylons_app.model.db import HgAppUi
43 43 try:
44 44 sa = get_session()
45 45 ret = sa.query(HgAppUi).all()
46 46 finally:
47 47 sa.remove()
48 48
49 49 if not ret:
50 50 raise Exception('Could not get application ui settings !')
51 51 settings = {}
52 52 for each in ret:
53 53 k = each.ui_key
54 54 v = each.ui_value
55 55 if k == '/':
56 56 k = 'root_path'
57 57
58 58 if k.find('.') != -1:
59 59 k = k.replace('.', '_')
60 60
61 61 if each.ui_section == 'hooks':
62 62 v = each.ui_active
63 63
64 64 settings[each.ui_section + '_' + k] = v
65 65
66 66 return settings
67 67
68 68 @task
69 @locked_task
69 70 def whoosh_index(repo_location, full_index):
70 71 log = whoosh_index.get_logger()
71 from pylons_app.lib.pidlock import DaemonLock
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
73 try:
74 l = DaemonLock()
75 WhooshIndexingDaemon(repo_location=repo_location)\
76 .run(full_index=full_index)
77 l.release()
78 return 'Done'
79 except LockHeld:
80 log.info('LockHeld')
81 return 'LockHeld'
82
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
73 WhooshIndexingDaemon(repo_location=repo_location).run(full_index=full_index)
83 74
84 75 @task
85 76 @locked_task
86 77 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
87 78 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
88 79
89 80 from pylons_app.model.db import Statistics, Repository
90 81 log = get_commits_stats.get_logger()
91 82 commits_by_day_author_aggregate = {}
92 83 commits_by_day_aggregate = {}
93 84 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
94 85 repo = MercurialRepository(repos_path + repo_name)
95 86
96 87 skip_date_limit = True
97 88 parse_limit = 350 #limit for single task changeset parsing
98 89 last_rev = 0
99 90 last_cs = None
100 91 timegetter = itemgetter('time')
101 92
102 93 sa = get_session()
103 94
104 95 dbrepo = sa.query(Repository)\
105 96 .filter(Repository.repo_name == repo_name).scalar()
106 97 cur_stats = sa.query(Statistics)\
107 98 .filter(Statistics.repository == dbrepo).scalar()
108 99 if cur_stats:
109 100 last_rev = cur_stats.stat_on_revision
110 101
111 102 if last_rev == repo.revisions[-1]:
112 103 #pass silently without any work
113 104 return True
114 105
115 106 if cur_stats:
116 107 commits_by_day_aggregate = OrderedDict(
117 108 json.loads(
118 109 cur_stats.commit_activity_combined))
119 110 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
120 111
121 112 for cnt, rev in enumerate(repo.revisions[last_rev:]):
122 113 last_cs = cs = repo.get_changeset(rev)
123 114 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
124 115 cs.date.timetuple()[2])
125 116 timetupple = [int(x) for x in k.split('-')]
126 117 timetupple.extend([0 for _ in xrange(6)])
127 118 k = mktime(timetupple)
128 119 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
129 120 try:
130 121 l = [timegetter(x) for x in commits_by_day_author_aggregate\
131 122 [author_key_cleaner(cs.author)]['data']]
132 123 time_pos = l.index(k)
133 124 except ValueError:
134 125 time_pos = False
135 126
136 127 if time_pos >= 0 and time_pos is not False:
137 128
138 129 datadict = commits_by_day_author_aggregate\
139 130 [author_key_cleaner(cs.author)]['data'][time_pos]
140 131
141 132 datadict["commits"] += 1
142 133 datadict["added"] += len(cs.added)
143 134 datadict["changed"] += len(cs.changed)
144 135 datadict["removed"] += len(cs.removed)
145 136 #print datadict
146 137
147 138 else:
148 139 #print 'ELSE !!!!'
149 140 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
150 141
151 142 datadict = {"time":k,
152 143 "commits":1,
153 144 "added":len(cs.added),
154 145 "changed":len(cs.changed),
155 146 "removed":len(cs.removed),
156 147 }
157 148 commits_by_day_author_aggregate\
158 149 [author_key_cleaner(cs.author)]['data'].append(datadict)
159 150
160 151 else:
161 152 #print k, 'nokey ADDING'
162 153 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
163 154 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
164 155 "label":author_key_cleaner(cs.author),
165 156 "data":[{"time":k,
166 157 "commits":1,
167 158 "added":len(cs.added),
168 159 "changed":len(cs.changed),
169 160 "removed":len(cs.removed),
170 161 }],
171 162 "schema":["commits"],
172 163 }
173 164
174 165 # #gather all data by day
175 166 if commits_by_day_aggregate.has_key(k):
176 167 commits_by_day_aggregate[k] += 1
177 168 else:
178 169 commits_by_day_aggregate[k] = 1
179 170
180 171 if cnt >= parse_limit:
181 172 #don't fetch to much data since we can freeze application
182 173 break
183 174
184 175 overview_data = []
185 176 for k, v in commits_by_day_aggregate.items():
186 177 overview_data.append([k, v])
187 178 overview_data = sorted(overview_data, key=itemgetter(0))
188 179
189 180 if not commits_by_day_author_aggregate:
190 181 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
191 182 "label":author_key_cleaner(repo.contact),
192 183 "data":[0, 1],
193 184 "schema":["commits"],
194 185 }
195 186
196 187 stats = cur_stats if cur_stats else Statistics()
197 188 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
198 189 stats.commit_activity_combined = json.dumps(overview_data)
199 190 stats.repository = dbrepo
200 191 stats.stat_on_revision = last_cs.revision
201 192 stats.languages = json.dumps({'_TOTAL_':0, '':0})
202 193
203 194 try:
204 195 sa.add(stats)
205 196 sa.commit()
206 197 except:
207 198 log.error(traceback.format_exc())
208 199 sa.rollback()
209 200 return False
210 201
211 202 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
212 203
213 204 return True
214 205
215 206 @task
216 207 def reset_user_password(user_email):
217 208 log = reset_user_password.get_logger()
218 209 from pylons_app.lib import auth
219 210 from pylons_app.model.db import User
220 211
221 212 try:
222 213 try:
223 214 sa = get_session()
224 215 user = sa.query(User).filter(User.email == user_email).scalar()
225 216 new_passwd = auth.PasswordGenerator().gen_password(8,
226 217 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
227 218 if user:
228 219 user.password = auth.get_crypt_password(new_passwd)
229 220 sa.add(user)
230 221 sa.commit()
231 222 log.info('change password for %s', user_email)
232 223 if new_passwd is None:
233 224 raise Exception('unable to generate new password')
234 225
235 226 except:
236 227 log.error(traceback.format_exc())
237 228 sa.rollback()
238 229
239 230 run_task(send_email, user_email,
240 231 "Your new hg-app password",
241 232 'Your new hg-app password:%s' % (new_passwd))
242 233 log.info('send new password mail to %s', user_email)
243 234
244 235
245 236 except:
246 237 log.error('Failed to update user password')
247 238 log.error(traceback.format_exc())
248 239 return True
249 240
250 241 @task
251 242 def send_email(recipients, subject, body):
252 243 log = send_email.get_logger()
253 244 email_config = dict(config.items('DEFAULT'))
254 245 mail_from = email_config.get('app_email_from')
255 246 user = email_config.get('smtp_username')
256 247 passwd = email_config.get('smtp_password')
257 248 mail_server = email_config.get('smtp_server')
258 249 mail_port = email_config.get('smtp_port')
259 250 tls = email_config.get('smtp_use_tls')
260 251 ssl = False
261 252
262 253 try:
263 254 m = SmtpMailer(mail_from, user, passwd, mail_server,
264 255 mail_port, ssl, tls)
265 256 m.send(recipients, subject, body)
266 257 except:
267 258 log.error('Mail sending failed')
268 259 log.error(traceback.format_exc())
269 260 return False
270 261 return True
@@ -1,127 +1,105
1 1 import os, time
2 2 import sys
3 3 from warnings import warn
4 4
5 5 class LockHeld(Exception):pass
6 6
7 7
8 8 class DaemonLock(object):
9 9 """daemon locking
10 10 USAGE:
11 11 try:
12 l = lock()
12 l = DaemonLock(desc='test lock')
13 13 main()
14 14 l.release()
15 15 except LockHeld:
16 16 sys.exit(1)
17 17 """
18 18
19 19 def __init__(self, file=None, callbackfn=None,
20 20 desc='daemon lock', debug=False):
21 21
22 22 self.pidfile = file if file else os.path.join(os.path.dirname(__file__),
23 23 'running.lock')
24 24 self.callbackfn = callbackfn
25 25 self.desc = desc
26 26 self.debug = debug
27 27 self.held = False
28 28 #run the lock automatically !
29 29 self.lock()
30 30
31 31 def __del__(self):
32 32 if self.held:
33 33
34 34 # warn("use lock.release instead of del lock",
35 35 # category = DeprecationWarning,
36 36 # stacklevel = 2)
37 37
38 38 # ensure the lock will be removed
39 39 self.release()
40 40
41 41
42 42 def lock(self):
43 """
44 locking function, if lock is present it will raise LockHeld exception
43 """locking function, if lock is present it will raise LockHeld exception
45 44 """
46 45 lockname = '%s' % (os.getpid())
47 46
48 47 self.trylock()
49 48 self.makelock(lockname, self.pidfile)
50 49 return True
51 50
52 51 def trylock(self):
53 52 running_pid = False
54 53 try:
55 54 pidfile = open(self.pidfile, "r")
56 55 pidfile.seek(0)
57 56 running_pid = pidfile.readline()
58 57 if self.debug:
59 58 print 'lock file present running_pid: %s, checking for execution'\
60 59 % running_pid
61 60 # Now we check the PID from lock file matches to the current
62 61 # process PID
63 62 if running_pid:
64 63 if os.path.exists("/proc/%s" % running_pid):
65 64 print "You already have an instance of the program running"
66 65 print "It is running as process %s" % running_pid
67 66 raise LockHeld
68 67 else:
69 68 print "Lock File is there but the program is not running"
70 69 print "Removing lock file for the: %s" % running_pid
71 70 self.release()
72 71 except IOError, e:
73 72 if e.errno != 2:
74 73 raise
75 74
76 75
77 76 def release(self):
78 """
79 releases the pid by removing the pidfile
77 """releases the pid by removing the pidfile
80 78 """
81 79 if self.callbackfn:
82 80 #execute callback function on release
83 81 if self.debug:
84 82 print 'executing callback function %s' % self.callbackfn
85 83 self.callbackfn()
86 84 try:
87 85 if self.debug:
88 86 print 'removing pidfile %s' % self.pidfile
89 87 os.remove(self.pidfile)
90 88 self.held = False
91 89 except OSError, e:
92 90 if self.debug:
93 91 print 'removing pidfile failed %s' % e
94 92 pass
95 93
96 94 def makelock(self, lockname, pidfile):
97 95 """
98 96 this function will make an actual lock
99 97 @param lockname: acctual pid of file
100 98 @param pidfile: the file to write the pid in
101 99 """
102 100 if self.debug:
103 101 print 'creating a file %s and pid: %s' % (pidfile, lockname)
104 102 pidfile = open(self.pidfile, "wb")
105 103 pidfile.write(lockname)
106 104 pidfile.close
107 105 self.held = True
108
109
110 def main():
111 print 'func is running'
112 cnt = 20
113 while 1:
114 print cnt
115 if cnt == 0:
116 break
117 time.sleep(1)
118 cnt -= 1
119
120
121 if __name__ == "__main__":
122 try:
123 l = DaemonLock(desc='test lock')
124 main()
125 l.release()
126 except LockHeld:
127 sys.exit(1)
General Comments 0
You need to be logged in to leave comments. Login now