Moved locking of commit stats into the task itself, to remove race conditions when the lock was not removed before another task started.
marcink | r1264:0c43c667 beta
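The change follows a lock-inside-the-task pattern: the task acquires its own lock, releases it before chaining the follow-up task, and backs off on LockHeld. Below is a minimal, self-contained sketch of that flow; the FileLock class is a hypothetical stand-in for rhodecode's DaemonLock and none of this is code from the commit itself:

    import os

    class LockHeld(Exception):
        """Raised when another run already holds the lock."""

    class FileLock(object):
        """Hypothetical stand-in for DaemonLock: a lock file that exists
        while a task runs and is removed on release."""
        def __init__(self, path):
            if os.path.exists(path):
                raise LockHeld(path)
            self.path = path
            open(path, 'w').close()

        def release(self):
            os.remove(self.path)

    def stats_task(chunks_left):
        lockkey = '/tmp/task_get_commits_stats.lock'
        try:
            lock = FileLock(lockkey)
        except LockHeld:
            # a second identical task sees the lock and backs off
            return 'Task with key %s already running' % lockkey
        # ... parse one chunk of revisions here ...
        lock.release()   # release *before* chaining, so the follow-up
        if chunks_left:  # task does not race against a stale lock
            return stats_task(chunks_left - 1)
        return True

    print(stats_task(2))

With the old @locked_task decorator the lock was only removed after the task body returned, so a follow-up task scheduled from inside the body could still find the lock held; releasing inside the task, before chaining, removes that race.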
@@ -48,6 +48,7 @@ try:
 except KeyError:
     CELERY_ON = False
 
+
 class ResultWrapper(object):
     def __init__(self, task):
         self.task = task
@@ -56,12 +57,14 @@ class ResultWrapper(object):
     def result(self):
         return self.task
 
+
 def run_task(task, *args, **kwargs):
     if CELERY_ON:
         try:
             t = task.apply_async(args=args, kwargs=kwargs)
             log.info('running task %s:%s', t.task_id, task)
             return t
+
         except socket.error, e:
             if e.errno == 111:
                 log.debug('Unable to connect to celeryd. Sync execution')
@@ -76,14 +79,20 @@ def run_task(task, *args, **kwargs):
     return ResultWrapper(task(*args, **kwargs))
 
 
+def __get_lockkey(func, *fargs, **fkwargs):
+    params = list(fargs)
+    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
+
+    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
+
+    lockkey = 'task_%s' % \
+        md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
+    return lockkey
+
+
 def locked_task(func):
     def __wrapper(func, *fargs, **fkwargs):
-        params = list(fargs)
-        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
-
-        lockkey = 'task_%s' % \
-            md5(str(func.__name__) + '-' + \
-                '-'.join(map(str, params))).hexdigest()
+        lockkey = __get_lockkey(func, *fargs, **fkwargs)
        log.info('running task with lockkey %s', lockkey)
         try:
             l = DaemonLock(lockkey)
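For reference, a hedged sketch of how __get_lockkey derives a stable key (assuming md5 is hashlib.md5, which this module imports elsewhere; the .encode() is added here only so the sketch also runs on Python 3):

    from hashlib import md5

    def get_lockkey(func, *fargs, **fkwargs):
        # mirror __get_lockkey: name + positional args + 'key-value' pairs
        # (kwargs follow dict iteration order, as in the original)
        params = list(fargs)
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
        func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
        raw = func_name + '-' + '-'.join(map(str, params))
        return 'task_%s' % md5(raw.encode('utf-8')).hexdigest()

    # identical task + arguments -> identical key, so a second run of
    # get_commits_stats for the same repo and year range hits the same lock
    print(get_lockkey('get_commits_stats', 'myrepo', 1262304000, 1293840000))

Because the function also accepts a plain string in place of a function object (the hasattr check), a task can compute the same key the decorator would have, which is exactly how get_commits_stats uses it below.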
@@ -37,13 +37,14 @@ from string import lower
 from pylons import config
 from pylons.i18n.translation import _
 
-from rhodecode.lib.celerylib import run_task, locked_task, str2bool
+from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
+    __get_lockkey, LockHeld, DaemonLock
 from rhodecode.lib.helpers import person
 from rhodecode.lib.smtp_mailer import SmtpMailer
 from rhodecode.lib.utils import OrderedDict, add_cache
 from rhodecode.model import init_model
 from rhodecode.model import meta
-from rhodecode.model.db import RhodeCodeUi
+from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
 
 from vcs.backends import get_repo
 
@@ -125,146 +126,162 @@ def whoosh_index(repo_location, full_ind
 
 
 @task(ignore_result=True)
-@locked_task
 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
     try:
         log = get_commits_stats.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.db import Statistics, Repository
-
-    #for js data compatibilty
-    akc = lambda k: person(k).replace('"', "")
-
-    co_day_auth_aggr = {}
-    commits_by_day_aggregate = {}
-    repos_path = get_repos_path()
-    p = os.path.join(repos_path, repo_name)
-    repo = get_repo(p)
-
-    skip_date_limit = True
-    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
-    last_rev = 0
-    last_cs = None
-    timegetter = itemgetter('time')
-
-    sa = get_session()
-
-    dbrepo = sa.query(Repository)\
-        .filter(Repository.repo_name == repo_name).scalar()
-    cur_stats = sa.query(Statistics)\
-        .filter(Statistics.repository == dbrepo).scalar()
-
-    if cur_stats is not None:
-        last_rev = cur_stats.stat_on_revision
-
-    #return if repo is empty
-    if not repo.revisions:
-        return True
-
-    if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1:
-        #pass silently without any work if we're not on first revision or
-        #current state of parsing revision(from db marker) is the last revision
-        return True
-
-    if cur_stats:
-        commits_by_day_aggregate = OrderedDict(
-            json.loads(
-                cur_stats.commit_activity_combined))
-        co_day_auth_aggr = json.loads(cur_stats.commit_activity)
-
-    log.debug('starting parsing %s', parse_limit)
-    lmktime = mktime
-
-    last_rev = last_rev + 1 if last_rev > 0 else last_rev
-
-    for cs in repo[last_rev:last_rev + parse_limit]:
-        last_cs = cs # remember last parsed changeset
-        k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
-                     cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
-
-        if akc(cs.author) in co_day_auth_aggr:
-            try:
-                l = [timegetter(x) for x in
-                     co_day_auth_aggr[akc(cs.author)]['data']]
-                time_pos = l.index(k)
-            except ValueError:
-                time_pos = False
-
-            if time_pos >= 0 and time_pos is not False:
-
-                datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
-
-                datadict["commits"] += 1
-                datadict["added"] += len(cs.added)
-                datadict["changed"] += len(cs.changed)
-                datadict["removed"] += len(cs.removed)
-
-            else:
-                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-
-                    datadict = {"time": k,
-                                "commits": 1,
-                                "added": len(cs.added),
-                                "changed": len(cs.changed),
-                                "removed": len(cs.removed),
-                                }
-                    co_day_auth_aggr[akc(cs.author)]['data']\
-                        .append(datadict)
-
-        else:
-            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                co_day_auth_aggr[akc(cs.author)] = {
-                    "label": akc(cs.author),
-                    "data": [{"time":k,
-                             "commits":1,
-                             "added":len(cs.added),
-                             "changed":len(cs.changed),
-                             "removed":len(cs.removed),
-                             }],
-                    "schema": ["commits"],
-                }
-
-        #gather all data by day
-        if k in commits_by_day_aggregate:
-            commits_by_day_aggregate[k] += 1
-        else:
-            commits_by_day_aggregate[k] = 1
-
-    overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0))
-    if not co_day_auth_aggr:
-        co_day_auth_aggr[akc(repo.contact)] = {
-            "label": akc(repo.contact),
-            "data": [0, 1],
-            "schema": ["commits"],
-        }
-
-    stats = cur_stats if cur_stats else Statistics()
-    stats.commit_activity = json.dumps(co_day_auth_aggr)
-    stats.commit_activity_combined = json.dumps(overview_data)
-
-    log.debug('last revison %s', last_rev)
-    leftovers = len(repo.revisions[last_rev:])
-    log.debug('revisions to parse %s', leftovers)
-
-    if last_rev == 0 or leftovers < parse_limit:
-        log.debug('getting code trending stats')
-        stats.languages = json.dumps(__get_codes_stats(repo_name))
-
-    try:
-        stats.repository = dbrepo
-        stats.stat_on_revision = last_cs.revision if last_cs else 0
-        sa.add(stats)
-        sa.commit()
-    except:
-        log.error(traceback.format_exc())
-        sa.rollback()
-        return False
-    if len(repo.revisions) > 1:
-        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
-
-    return True
+    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
+                            ts_max_y)
+    log.info('running task with lockkey %s', lockkey)
+    try:
+        lock = DaemonLock(lockkey)
+
+        #for js data compatibilty cleans the key for person from '
+        akc = lambda k: person(k).replace('"', "")
+
+        co_day_auth_aggr = {}
+        commits_by_day_aggregate = {}
+        repos_path = get_repos_path()
+        p = os.path.join(repos_path, repo_name)
+        repo = get_repo(p)
+        repo_size = len(repo.revisions)
+        #return if repo have no revisions
+        if repo_size < 1:
+            lock.release()
+            return True
+
+        skip_date_limit = True
+        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
+        last_rev = 0
+        last_cs = None
+        timegetter = itemgetter('time')
+
+        sa = get_session()
+
+        dbrepo = sa.query(Repository)\
+            .filter(Repository.repo_name == repo_name).scalar()
+        cur_stats = sa.query(Statistics)\
+            .filter(Statistics.repository == dbrepo).scalar()
+
+        if cur_stats is not None:
+            last_rev = cur_stats.stat_on_revision
+
+        if last_rev == repo.get_changeset().revision and repo_size > 1:
+            #pass silently without any work if we're not on first revision or
+            #current state of parsing revision(from db marker) is the
+            #last revision
+            lock.release()
+            return True
+
+        if cur_stats:
+            commits_by_day_aggregate = OrderedDict(json.loads(
+                cur_stats.commit_activity_combined))
+            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
+
+        log.debug('starting parsing %s', parse_limit)
+        lmktime = mktime
+
+        last_rev = last_rev + 1 if last_rev > 0 else last_rev
+
+        for cs in repo[last_rev:last_rev + parse_limit]:
+            last_cs = cs # remember last parsed changeset
+            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
+                         cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
+
+            if akc(cs.author) in co_day_auth_aggr:
+                try:
+                    l = [timegetter(x) for x in
+                         co_day_auth_aggr[akc(cs.author)]['data']]
+                    time_pos = l.index(k)
+                except ValueError:
+                    time_pos = False
+
+                if time_pos >= 0 and time_pos is not False:
+
+                    datadict = \
+                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
+
+                    datadict["commits"] += 1
+                    datadict["added"] += len(cs.added)
+                    datadict["changed"] += len(cs.changed)
+                    datadict["removed"] += len(cs.removed)
+
+                else:
+                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+
+                        datadict = {"time": k,
+                                    "commits": 1,
+                                    "added": len(cs.added),
+                                    "changed": len(cs.changed),
+                                    "removed": len(cs.removed),
+                                    }
+                        co_day_auth_aggr[akc(cs.author)]['data']\
+                            .append(datadict)
+
+            else:
+                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                    co_day_auth_aggr[akc(cs.author)] = {
+                        "label": akc(cs.author),
+                        "data": [{"time":k,
+                                 "commits":1,
+                                 "added":len(cs.added),
+                                 "changed":len(cs.changed),
+                                 "removed":len(cs.removed),
+                                 }],
+                        "schema": ["commits"],
+                    }
+
+            #gather all data by day
+            if k in commits_by_day_aggregate:
+                commits_by_day_aggregate[k] += 1
+            else:
+                commits_by_day_aggregate[k] = 1
+
+        overview_data = sorted(commits_by_day_aggregate.items(),
+                               key=itemgetter(0))
+
+        if not co_day_auth_aggr:
+            co_day_auth_aggr[akc(repo.contact)] = {
+                "label": akc(repo.contact),
+                "data": [0, 1],
+                "schema": ["commits"],
+            }
+
+        stats = cur_stats if cur_stats else Statistics()
+        stats.commit_activity = json.dumps(co_day_auth_aggr)
+        stats.commit_activity_combined = json.dumps(overview_data)
+
+        log.debug('last revison %s', last_rev)
+        leftovers = len(repo.revisions[last_rev:])
+        log.debug('revisions to parse %s', leftovers)
+
+        if last_rev == 0 or leftovers < parse_limit:
+            log.debug('getting code trending stats')
+            stats.languages = json.dumps(__get_codes_stats(repo_name))
+
+        try:
+            stats.repository = dbrepo
+            stats.stat_on_revision = last_cs.revision if last_cs else 0
+            sa.add(stats)
+            sa.commit()
+        except:
+            log.error(traceback.format_exc())
+            sa.rollback()
+            lock.release()
+            return False
+
+        #final release
+        lock.release()
+
+        #execute another task if celery is enabled
+        if len(repo.revisions) > 1 and CELERY_ON:
+            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
+        return True
+    except LockHeld:
+        log.info('LockHeld')
+        return 'Task with key %s already running' % lockkey
 
 
 @task(ignore_result=True)
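The bulk of the new block is the same per-author, per-day aggregation as before, only re-indented under the lock's try. A compact, runnable sketch of the data shape it builds (field names taken from the diff; sample values invented for illustration):

    from time import mktime
    from datetime import date

    def day_key(d):
        # midnight timestamp for the changeset's day; the task passes the
        # same nine fields to mktime, taken from the changeset's timetuple
        return mktime((d.year, d.month, d.day, 0, 0, 0, 0, 0, 0))

    co_day_auth_aggr = {}
    for author, d, added, changed, removed in [
            ('alice', date(2011, 2, 1), 3, 1, 0),
            ('alice', date(2011, 2, 1), 1, 2, 1)]:
        k = day_key(d)
        entry = co_day_auth_aggr.setdefault(author, {
            'label': author, 'data': [], 'schema': ['commits']})
        for datadict in entry['data']:
            if datadict['time'] == k:  # same day: bump the counters
                datadict['commits'] += 1
                datadict['added'] += added
                datadict['changed'] += changed
                datadict['removed'] += removed
                break
        else:  # first commit seen for this author/day
            entry['data'].append({'time': k, 'commits': 1, 'added': added,
                                  'changed': changed, 'removed': removed})

    print(co_day_auth_aggr['alice']['data'])

The per-author dicts and the day-keyed commits_by_day_aggregate are then serialized with json.dumps into the Statistics row, which is why the structure has to stay JSON-friendly.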
@@ -313,7 +330,6 @@ def send_email(recipients, subject, body
     """
     Sends an email with defined parameters from the .ini files.
 
-
     :param recipients: list of recipients, it this is empty the defined email
         address from field 'email_to' is used instead
     :param subject: subject of the mail
@@ -351,14 +367,14 @@ def send_email(recipients, subject, body
 
 @task(ignore_result=True)
 def create_repo_fork(form_data, cur_user):
+    from rhodecode.model.repo import RepoModel
+    from vcs import get_backend
+
     try:
         log = create_repo_fork.get_logger()
     except:
         log = logging.getLogger(__name__)
 
-    from rhodecode.model.repo import RepoModel
-    from vcs import get_backend
-
     repo_model = RepoModel(get_session())
     repo_model.create(form_data, cur_user, just_db=True, fork=True)
     repo_name = form_data['repo_name']