##// END OF EJS Templates
implemented basic autoupdating statistics fetched from database
marcink -
r493:2256c78a celery
parent child Browse files
Show More
@@ -16,6 +16,8 b' CELERY_IMPORTS = ("pylons_app.lib.celery'
16 ## Result store settings.
16 ## Result store settings.
17 CELERY_RESULT_BACKEND = "database"
17 CELERY_RESULT_BACKEND = "database"
18 CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
18 CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
19 CELERY_RESULT_SERIALIZER = 'json'
20
19
21
20 BROKER_CONNECTION_MAX_RETRIES = 30
22 BROKER_CONNECTION_MAX_RETRIES = 30
21
23
@@ -36,7 +38,37 b' CELERYD_CONCURRENCY = 2'
36 CELERYD_LOG_LEVEL = "DEBUG"
38 CELERYD_LOG_LEVEL = "DEBUG"
37 CELERYD_MAX_TASKS_PER_CHILD = 1
39 CELERYD_MAX_TASKS_PER_CHILD = 1
38
40
39 #CELERY_ALWAYS_EAGER = True
41 #Tasks will never be sent to the queue, but executed locally instead.
40 #rabbitmqctl add_user rabbitmq qweqwe
42 CELERY_ALWAYS_EAGER = False
41 #rabbitmqctl add_vhost rabbitmqhost
43
42 #rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
44 #===============================================================================
45 # EMAIL SETTINGS
46 #===============================================================================
47 pylons_email_config = dict(config.items('DEFAULT'))
48
49 CELERY_SEND_TASK_ERROR_EMAILS = True
50
51 #List of (name, email_address) tuples for the admins that should receive error e-mails.
52 ADMINS = [('Administrator', pylons_email_config.get('email_to'))]
53
54 #The e-mail address this worker sends e-mails from. Default is "celery@localhost".
55 SERVER_EMAIL = pylons_email_config.get('error_email_from')
56
57 #The mail server to use. Default is "localhost".
58 MAIL_HOST = pylons_email_config.get('smtp_server')
59
60 #Username (if required) to log on to the mail server with.
61 MAIL_HOST_USER = pylons_email_config.get('smtp_username')
62
63 #Password (if required) to log on to the mail server with.
64 MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password')
65
66 MAIL_PORT = pylons_email_config.get('smtp_port')
67
68
69 #===============================================================================
70 # INSTRUCTIONS FOR RABBITMQ
71 #===============================================================================
72 # rabbitmqctl add_user rabbitmq qweqwe
73 # rabbitmqctl add_vhost rabbitmqhost
74 # rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
@@ -27,9 +27,13 b' from pylons_app.lib.auth import LoginReq'
27 from pylons_app.lib.base import BaseController, render
27 from pylons_app.lib.base import BaseController, render
28 from pylons_app.lib.utils import OrderedDict
28 from pylons_app.lib.utils import OrderedDict
29 from pylons_app.model.hg_model import HgModel
29 from pylons_app.model.hg_model import HgModel
30 from pylons_app.model.db import Statistics
30 from webhelpers.paginate import Page
31 from webhelpers.paginate import Page
31 from pylons_app.lib.celerylib import run_task
32 from pylons_app.lib.celerylib import run_task
32 from pylons_app.lib.celerylib.tasks import get_commits_stats
33 from pylons_app.lib.celerylib.tasks import get_commits_stats
34 from datetime import datetime, timedelta
35 from time import mktime
36 import calendar
33 import logging
37 import logging
34
38
35 log = logging.getLogger(__name__)
39 log = logging.getLogger(__name__)
@@ -61,11 +65,32 b' class SummaryController(BaseController):'
61 for name, hash in c.repo_info.branches.items()[:10]:
65 for name, hash in c.repo_info.branches.items()[:10]:
62 c.repo_branches[name] = c.repo_info.get_changeset(hash)
66 c.repo_branches[name] = c.repo_info.get_changeset(hash)
63
67
64 task = run_task(get_commits_stats, c.repo_info.name)
68 td = datetime.today() + timedelta(days=1)
65 c.ts_min = task.result[0]
69 y, m, d = td.year, td.month, td.day
66 c.ts_max = task.result[1]
70
67 c.commit_data = task.result[2]
71 ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
68 c.overview_data = task.result[3]
72 d, 0, 0, 0, 0, 0, 0,))
73 ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
74 d, 0, 0, 0, 0, 0, 0,))
75
76 ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
77
78 run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y)
79 c.ts_min = ts_min_m
80 c.ts_max = ts_max_y
81
82
83 stats = self.sa.query(Statistics)\
84 .filter(Statistics.repository == c.repo_info.dbrepo)\
85 .scalar()
86
87 if stats:
88 c.commit_data = stats.commit_activity
89 c.overview_data = stats.commit_activity_combined
90 else:
91 import json
92 c.commit_data = json.dumps({})
93 c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 0] ])
69
94
70 return render('summary/summary.html')
95 return render('summary/summary.html')
71
96
@@ -1,7 +1,6 b''
1 from celery.decorators import task
1 from celery.decorators import task
2 from celery.task.sets import subtask
2 from celery.task.sets import subtask
3 from celeryconfig import PYLONS_CONFIG as config
3 from celeryconfig import PYLONS_CONFIG as config
4 from datetime import datetime, timedelta
5 from pylons.i18n.translation import _
4 from pylons.i18n.translation import _
6 from pylons_app.lib.celerylib import run_task
5 from pylons_app.lib.celerylib import run_task
7 from pylons_app.lib.helpers import person
6 from pylons_app.lib.helpers import person
@@ -10,7 +9,6 b' from pylons_app.lib.utils import Ordered'
10 from operator import itemgetter
9 from operator import itemgetter
11 from vcs.backends.hg import MercurialRepository
10 from vcs.backends.hg import MercurialRepository
12 from time import mktime
11 from time import mktime
13 import calendar
14 import traceback
12 import traceback
15 import json
13 import json
16
14
@@ -83,94 +81,132 b' def whoosh_index(repo_location, full_ind'
83 return 'LockHeld'
81 return 'LockHeld'
84
82
85 @task
83 @task
86 def get_commits_stats(repo):
84 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
86
87 from pylons_app.model.db import Statistics, Repository
87 log = get_commits_stats.get_logger()
88 log = get_commits_stats.get_logger()
88 aggregate = OrderedDict()
89 commits_by_day_author_aggregate = {}
89 overview_aggregate = OrderedDict()
90 commits_by_day_aggregate = {}
90 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
91 repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
91 repo = MercurialRepository(repos_path + repo)
92 repo = MercurialRepository(repos_path + repo_name)
92 #graph range
93
93 td = datetime.today() + timedelta(days=1)
94 skip_date_limit = True
94 y, m, d = td.year, td.month, td.day
95 parse_limit = 500 #limit for single task changeset parsing
96 last_rev = 0
97 last_cs = None
98 timegetter = itemgetter('time')
99
100 sa = get_session()
95
101
96 ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
102 dbrepo = sa.query(Repository)\
97 d, 0, 0, 0, 0, 0, 0,))
103 .filter(Repository.repo_name == repo_name).scalar()
98 ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
104 cur_stats = sa.query(Statistics)\
99 d, 0, 0, 0, 0, 0, 0,))
105 .filter(Statistics.repository == dbrepo).scalar()
106 if cur_stats:
107 last_rev = cur_stats.stat_on_revision
100
108
101 ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
109 if last_rev == repo.revisions[-1]:
102 skip_date_limit = True
110 #pass silently without any work
111 return True
103
112
104 def author_key_cleaner(k):
113 if cur_stats:
105 k = person(k)
114 commits_by_day_aggregate = OrderedDict(
106 k = k.replace('"', "") #for js data compatibilty
115 json.loads(
107 return k
116 cur_stats.commit_activity_combined))
108
117 commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
109 for cs in repo[:200]:#added limit 200 until fix #29 is made
118
119 for cnt, rev in enumerate(repo.revisions[last_rev:]):
120 last_cs = cs = repo.get_changeset(rev)
110 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
121 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
111 cs.date.timetuple()[2])
122 cs.date.timetuple()[2])
112 timetupple = [int(x) for x in k.split('-')]
123 timetupple = [int(x) for x in k.split('-')]
113 timetupple.extend([0 for _ in xrange(6)])
124 timetupple.extend([0 for _ in xrange(6)])
114 k = mktime(timetupple)
125 k = mktime(timetupple)
115 if aggregate.has_key(author_key_cleaner(cs.author)):
126 if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
116 if aggregate[author_key_cleaner(cs.author)].has_key(k):
127 try:
117 aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
128 l = [timegetter(x) for x in commits_by_day_author_aggregate\
118 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
129 [author_key_cleaner(cs.author)]['data']]
119 aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
130 time_pos = l.index(k)
120 aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
131 except ValueError:
132 time_pos = False
133
134 if time_pos >= 0 and time_pos is not False:
135
136 datadict = commits_by_day_author_aggregate\
137 [author_key_cleaner(cs.author)]['data'][time_pos]
138
139 datadict["commits"] += 1
140 datadict["added"] += len(cs.added)
141 datadict["changed"] += len(cs.changed)
142 datadict["removed"] += len(cs.removed)
143 #print datadict
121
144
122 else:
145 else:
123 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
146 #print 'ELSE !!!!'
124 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
147 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
125 aggregate[author_key_cleaner(cs.author)][k] = {}
148
126 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
149 datadict = {"time":k,
127 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
150 "commits":1,
128 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
151 "added":len(cs.added),
129 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
152 "changed":len(cs.changed),
153 "removed":len(cs.removed),
154 }
155 commits_by_day_author_aggregate\
156 [author_key_cleaner(cs.author)]['data'].append(datadict)
130
157
131 else:
158 else:
159 #print k, 'nokey ADDING'
132 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
160 if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
133 aggregate[author_key_cleaner(cs.author)] = OrderedDict()
161 commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
134 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
162 "label":author_key_cleaner(cs.author),
135 aggregate[author_key_cleaner(cs.author)][k] = {}
163 "data":[{"time":k,
136 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
164 "commits":1,
137 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
165 "added":len(cs.added),
138 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
166 "changed":len(cs.changed),
139 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
167 "removed":len(cs.removed),
168 }],
169 "schema":["commits"],
170 }
140
171
141
172 # #gather all data by day
142 if overview_aggregate.has_key(k):
173 if commits_by_day_aggregate.has_key(k):
143 overview_aggregate[k] += 1
174 commits_by_day_aggregate[k] += 1
144 else:
175 else:
145 overview_aggregate[k] = 1
176 commits_by_day_aggregate[k] = 1
146
177
178 if cnt >= parse_limit:
179 #don't fetch to much data since we can freeze application
180 break
181
147 overview_data = []
182 overview_data = []
148 for k, v in overview_aggregate.items():
183 for k, v in commits_by_day_aggregate.items():
149 overview_data.append([k, v])
184 overview_data.append([k, v])
150 overview_data = sorted(overview_data, key=itemgetter(0))
185 overview_data = sorted(overview_data, key=itemgetter(0))
151 data = {}
152 for author in aggregate:
153 commit_data = sorted([{"time":x,
154 "commits":aggregate[author][x]['commits'],
155 "added":aggregate[author][x]['added'],
156 "changed":aggregate[author][x]['changed'],
157 "removed":aggregate[author][x]['removed'],
158 } for x in aggregate[author]],
159 key=itemgetter('time'))
160
186
161 data[author] = {"label":author,
187 if not commits_by_day_author_aggregate:
162 "data":commit_data,
188 commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
163 "schema":["commits"]
164 }
165
166 if not data:
167 data[author_key_cleaner(repo.contact)] = {
168 "label":author_key_cleaner(repo.contact),
189 "label":author_key_cleaner(repo.contact),
169 "data":[0, 1],
190 "data":[0, 1],
170 "schema":["commits"],
191 "schema":["commits"],
171 }
192 }
172
193
173 return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data))
194 stats = cur_stats if cur_stats else Statistics()
195 stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
196 stats.commit_activity_combined = json.dumps(overview_data)
197 stats.repository = dbrepo
198 stats.stat_on_revision = last_cs.revision
199 stats.languages = json.dumps({'_TOTAL_':0, '':0})
200
201 try:
202 sa.add(stats)
203 sa.commit()
204 except:
205 log.error(traceback.format_exc())
206 sa.rollback()
207 return False
208
209 return True
174
210
175 @task
211 @task
176 def reset_user_password(user_email):
212 def reset_user_password(user_email):
@@ -184,10 +220,11 b' def reset_user_password(user_email):'
184 user = sa.query(User).filter(User.email == user_email).scalar()
220 user = sa.query(User).filter(User.email == user_email).scalar()
185 new_passwd = auth.PasswordGenerator().gen_password(8,
221 new_passwd = auth.PasswordGenerator().gen_password(8,
186 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
222 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
187 user.password = auth.get_crypt_password(new_passwd)
223 if user:
188 sa.add(user)
224 user.password = auth.get_crypt_password(new_passwd)
189 sa.commit()
225 sa.add(user)
190 log.info('change password for %s', user_email)
226 sa.commit()
227 log.info('change password for %s', user_email)
191 if new_passwd is None:
228 if new_passwd is None:
192 raise Exception('unable to generate new password')
229 raise Exception('unable to generate new password')
193
230
@@ -120,6 +120,15 b' class UserToPerm(Base):'
120 user = relation('User')
120 user = relation('User')
121 permission = relation('Permission')
121 permission = relation('Permission')
122
122
123
123 class Statistics(Base):
124 __tablename__ = 'statistics'
125 __table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
126 stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
127 repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None)
128 stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False)
129 commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data
130 commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data
131 languages = Column("languages", BLOB(), nullable=False)#JSON data
132
133 repository = relation('Repository')
124
134
125
@@ -123,7 +123,7 b' E.onDOMReady(function(e){'
123 <div class="box box-right" style="min-height:455px">
123 <div class="box box-right" style="min-height:455px">
124 <!-- box / title -->
124 <!-- box / title -->
125 <div class="title">
125 <div class="title">
126 <h5>${_('Commit activity')}</h5>
126 <h5>${_('Commit activity by day / author')}</h5>
127 </div>
127 </div>
128
128
129 <div class="table">
129 <div class="table">
General Comments 0
You need to be logged in to leave comments. Login now