implemented basic auto-updating statistics fetched from the database
marcink
r493:2256c78a celery
--- a/celeryconfig.py
+++ b/celeryconfig.py
@@ -16,6 +16,8 @@ CELERY_IMPORTS = ("pylons_app.lib.celery
 ## Result store settings.
 CELERY_RESULT_BACKEND = "database"
 CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
+CELERY_RESULT_SERIALIZER = 'json'
+
 
 BROKER_CONNECTION_MAX_RETRIES = 30
 
@@ -36,7 +38,37 @@ CELERYD_CONCURRENCY = 2
 CELERYD_LOG_LEVEL = "DEBUG"
 CELERYD_MAX_TASKS_PER_CHILD = 1
 
-#CELERY_ALWAYS_EAGER = True
-#rabbitmqctl add_user rabbitmq qweqwe
-#rabbitmqctl add_vhost rabbitmqhost
-#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
+#Tasks will never be sent to the queue, but executed locally instead.
+CELERY_ALWAYS_EAGER = False
+
+#===============================================================================
+# EMAIL SETTINGS
+#===============================================================================
+pylons_email_config = dict(config.items('DEFAULT'))
+
+CELERY_SEND_TASK_ERROR_EMAILS = True
+
+#List of (name, email_address) tuples for the admins that should receive error e-mails.
+ADMINS = [('Administrator', pylons_email_config.get('email_to'))]
+
+#The e-mail address this worker sends e-mails from. Default is "celery@localhost".
+SERVER_EMAIL = pylons_email_config.get('error_email_from')
+
+#The mail server to use. Default is "localhost".
+MAIL_HOST = pylons_email_config.get('smtp_server')
+
+#Username (if required) to log on to the mail server with.
+MAIL_HOST_USER = pylons_email_config.get('smtp_username')
+
+#Password (if required) to log on to the mail server with.
+MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password')
+
+MAIL_PORT = pylons_email_config.get('smtp_port')
+
+
+#===============================================================================
+# INSTRUCTIONS FOR RABBITMQ
+#===============================================================================
+# rabbitmqctl add_user rabbitmq qweqwe
+# rabbitmqctl add_vhost rabbitmqhost
+# rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
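The rabbitmqctl lines above create the broker user and vhost; for completeness, a hedged sketch of the matching broker settings in this config's (pre-BROKER_URL) Celery style. The host and port are assumed RabbitMQ defaults, not part of this commit:

    # Sketch only, not part of this commit.
    BROKER_HOST = "localhost"       # assumption: RabbitMQ on the same machine
    BROKER_PORT = 5672              # assumption: default AMQP port
    BROKER_VHOST = "rabbitmqhost"   # from: rabbitmqctl add_vhost rabbitmqhost
    BROKER_USER = "rabbitmq"        # from: rabbitmqctl add_user rabbitmq qweqwe
    BROKER_PASSWORD = "qweqwe"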
--- a/pylons_app/controllers/summary.py
+++ b/pylons_app/controllers/summary.py
@@ -27,9 +27,13 @@ from pylons_app.lib.auth import LoginReq
 from pylons_app.lib.base import BaseController, render
 from pylons_app.lib.utils import OrderedDict
 from pylons_app.model.hg_model import HgModel
+from pylons_app.model.db import Statistics
 from webhelpers.paginate import Page
 from pylons_app.lib.celerylib import run_task
 from pylons_app.lib.celerylib.tasks import get_commits_stats
+from datetime import datetime, timedelta
+from time import mktime
+import calendar
 import logging
 
 log = logging.getLogger(__name__)
@@ -61,11 +65,32 @@ class SummaryController(BaseController):
         for name, hash in c.repo_info.branches.items()[:10]:
             c.repo_branches[name] = c.repo_info.get_changeset(hash)
 
-        task = run_task(get_commits_stats, c.repo_info.name)
-        c.ts_min = task.result[0]
-        c.ts_max = task.result[1]
-        c.commit_data = task.result[2]
-        c.overview_data = task.result[3]
+        td = datetime.today() + timedelta(days=1)
+        y, m, d = td.year, td.month, td.day
+
+        ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
+                            d, 0, 0, 0, 0, 0, 0,))
+        ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
+                            d, 0, 0, 0, 0, 0, 0,))
+
+        ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
+
+        run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y)
+        c.ts_min = ts_min_m
+        c.ts_max = ts_max_y
+
+        stats = self.sa.query(Statistics)\
+            .filter(Statistics.repository == c.repo_info.dbrepo)\
+            .scalar()
+
+        if stats:
+            c.commit_data = stats.commit_activity
+            c.overview_data = stats.commit_activity_combined
+        else:
+            import json
+            c.commit_data = json.dumps({})
+            c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 0]])
 
         return render('summary/summary.html')
 
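The timestamp window is the trickiest part of the controller change: ts_max_y is tomorrow at midnight (so today's commits are included), ts_min_y the same day a year earlier, and ts_min_m roughly one month back via calendar.mdays (days per month). A standalone sketch of the same computation, runnable outside Pylons (Python 2, matching the codebase's era):

    from datetime import datetime, timedelta
    from time import mktime
    import calendar

    td = datetime.today() + timedelta(days=1)   # tomorrow, so today is included
    y, m, d = td.year, td.month, td.day

    # one year back (same month/day, previous year)
    ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
                       d, 0, 0, 0, 0, 0, 0,))
    # roughly one month back; calendar.mdays[m] is the day count of month m
    ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
                       d, 0, 0, 0, 0, 0, 0,))
    # upper bound: tomorrow at midnight
    ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))

    print ts_min_y, ts_min_m, ts_max_y

The task gets the full one-year window (ts_min_y, ts_max_y), while the page itself zooms to the one-month window (c.ts_min = ts_min_m).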
--- a/pylons_app/lib/celerylib/tasks.py
+++ b/pylons_app/lib/celerylib/tasks.py
@@ -1,7 +1,6 @@
 from celery.decorators import task
 from celery.task.sets import subtask
 from celeryconfig import PYLONS_CONFIG as config
-from datetime import datetime, timedelta
 from pylons.i18n.translation import _
 from pylons_app.lib.celerylib import run_task
 from pylons_app.lib.helpers import person
@@ -10,7 +9,6 @@ from pylons_app.lib.utils import Ordered
 from operator import itemgetter
 from vcs.backends.hg import MercurialRepository
 from time import mktime
-import calendar
 import traceback
 import json
 
@@ -83,94 +81,132 @@ def whoosh_index(repo_location, full_ind
     return 'LockHeld'
 
 @task
-def get_commits_stats(repo):
+def get_commits_stats(repo_name, ts_min_y, ts_max_y):
+    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
+
+    from pylons_app.model.db import Statistics, Repository
     log = get_commits_stats.get_logger()
-    aggregate = OrderedDict()
-    overview_aggregate = OrderedDict()
+    commits_by_day_author_aggregate = {}
+    commits_by_day_aggregate = {}
     repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
-    repo = MercurialRepository(repos_path + repo)
-    #graph range
-    td = datetime.today() + timedelta(days=1)
-    y, m, d = td.year, td.month, td.day
-
-    ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
-                        d, 0, 0, 0, 0, 0, 0,))
-    ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
-                        d, 0, 0, 0, 0, 0, 0,))
-
-    ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
-    skip_date_limit = True
-
-    def author_key_cleaner(k):
-        k = person(k)
-        k = k.replace('"', "") #for js data compatibilty
-        return k
+    repo = MercurialRepository(repos_path + repo_name)
+
+    skip_date_limit = True
+    parse_limit = 500 #limit for single task changeset parsing
+    last_rev = 0
+    last_cs = None
+    timegetter = itemgetter('time')
+
+    sa = get_session()
+
+    dbrepo = sa.query(Repository)\
+        .filter(Repository.repo_name == repo_name).scalar()
+    cur_stats = sa.query(Statistics)\
+        .filter(Statistics.repository == dbrepo).scalar()
+    if cur_stats:
+        last_rev = cur_stats.stat_on_revision
+
+    if last_rev == repo.revisions[-1]:
+        #pass silently without any work
+        return True
+
+    if cur_stats:
+        commits_by_day_aggregate = OrderedDict(
+            json.loads(
+                cur_stats.commit_activity_combined))
+        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
 
-    for cs in repo[:200]:#added limit 200 until fix #29 is made
+    for cnt, rev in enumerate(repo.revisions[last_rev:]):
+        last_cs = cs = repo.get_changeset(rev)
         k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
                           cs.date.timetuple()[2])
         timetupple = [int(x) for x in k.split('-')]
         timetupple.extend([0 for _ in xrange(6)])
         k = mktime(timetupple)
-        if aggregate.has_key(author_key_cleaner(cs.author)):
-            if aggregate[author_key_cleaner(cs.author)].has_key(k):
-                aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
-                aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
-                aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
-                aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
-
-            else:
-                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
-                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                    aggregate[author_key_cleaner(cs.author)][k] = {}
-                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
+        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
+            try:
+                l = [timegetter(x) for x in commits_by_day_author_aggregate\
+                    [author_key_cleaner(cs.author)]['data']]
+                time_pos = l.index(k)
+            except ValueError:
+                time_pos = False
+
+            if time_pos >= 0 and time_pos is not False:
+
+                datadict = commits_by_day_author_aggregate\
+                    [author_key_cleaner(cs.author)]['data'][time_pos]
+
+                datadict["commits"] += 1
+                datadict["added"] += len(cs.added)
+                datadict["changed"] += len(cs.changed)
+                datadict["removed"] += len(cs.removed)
+                #print datadict
 
+            else:
+                #print 'ELSE !!!!'
+                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+
+                    datadict = {"time":k,
+                                "commits":1,
+                                "added":len(cs.added),
+                                "changed":len(cs.changed),
+                                "removed":len(cs.removed),
+                               }
+                    commits_by_day_author_aggregate\
+                        [author_key_cleaner(cs.author)]['data'].append(datadict)
 
-        else:
-            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
-                aggregate[author_key_cleaner(cs.author)] = OrderedDict()
-                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
-                aggregate[author_key_cleaner(cs.author)][k] = {}
-                aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
-                aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
-                aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
-                aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
+        else:
+            #print k, 'nokey ADDING'
+            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
+                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
+                                "label":author_key_cleaner(cs.author),
+                                "data":[{"time":k,
+                                         "commits":1,
+                                         "added":len(cs.added),
+                                         "changed":len(cs.changed),
+                                         "removed":len(cs.removed),
+                                        }],
+                                "schema":["commits"],
+                                }
 
-        if overview_aggregate.has_key(k):
-            overview_aggregate[k] += 1
+        # #gather all data by day
+        if commits_by_day_aggregate.has_key(k):
+            commits_by_day_aggregate[k] += 1
         else:
-            overview_aggregate[k] = 1
+            commits_by_day_aggregate[k] = 1
+
+        if cnt >= parse_limit:
+            #don't fetch to much data since we can freeze application
+            break
 
     overview_data = []
-    for k, v in overview_aggregate.items():
+    for k, v in commits_by_day_aggregate.items():
         overview_data.append([k, v])
     overview_data = sorted(overview_data, key=itemgetter(0))
-    data = {}
-    for author in aggregate:
-        commit_data = sorted([{"time":x,
-                               "commits":aggregate[author][x]['commits'],
-                               "added":aggregate[author][x]['added'],
-                               "changed":aggregate[author][x]['changed'],
-                               "removed":aggregate[author][x]['removed'],
-                              } for x in aggregate[author]],
-                             key=itemgetter('time'))
 
-        data[author] = {"label":author,
-                        "data":commit_data,
-                        "schema":["commits"]
-                        }
-
-    if not data:
-        data[author_key_cleaner(repo.contact)] = {
+    if not commits_by_day_author_aggregate:
+        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
             "label":author_key_cleaner(repo.contact),
             "data":[0, 1],
            "schema":["commits"],
        }
 
-    return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data))
+    stats = cur_stats if cur_stats else Statistics()
+    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
+    stats.commit_activity_combined = json.dumps(overview_data)
+    stats.repository = dbrepo
+    stats.stat_on_revision = last_cs.revision
+    stats.languages = json.dumps({'_TOTAL_':0, '':0})
+
+    try:
+        sa.add(stats)
+        sa.commit()
+    except:
+        log.error(traceback.format_exc())
+        sa.rollback()
+        return False
+
+    return True
 
 @task
 def reset_user_password(user_email):
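For reference, the commit_activity document the task persists is a dict keyed by the cleaned author name, with one data point per day; commit_activity_combined is the flat per-day series. A sketch of the shape before json.dumps() — the author name and numbers are illustrative, not from a real repository:

    # Illustrative only; "time" keys are mktime() values for midnight of a day.
    commit_activity = {
        "Author Name": {
            "label": "Author Name",
            "data": [
                {"time": 1286229600.0, "commits": 3,
                 "added": 5, "changed": 2, "removed": 1},
            ],
            "schema": ["commits"],
        },
    }
    # sorted [timestamp, commit_count] pairs for the overview graph
    commit_activity_combined = [[1286229600.0, 3], [1286316000.0, 1]]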
@@ -184,6 +220,7 @@ def reset_user_password(user_email):
         user = sa.query(User).filter(User.email == user_email).scalar()
         new_passwd = auth.PasswordGenerator().gen_password(8,
                      auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
-        user.password = auth.get_crypt_password(new_passwd)
-        sa.add(user)
-        sa.commit()
+        if user:
+            user.password = auth.get_crypt_password(new_passwd)
+            sa.add(user)
+            sa.commit()
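The new if user: guard matters because .scalar() returns None when no account matches the given e-mail, so the old code raised AttributeError on unknown addresses. A self-contained illustration of the failure mode (no real session involved):

    # user is what .scalar() returns when no row matches: None.
    user = None
    if user:
        # only reached when a matching account exists; previously the
        # assignment ran unconditionally and raised AttributeError on None
        user.password = "new-hash"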
--- a/pylons_app/model/db.py
+++ b/pylons_app/model/db.py
@@ -120,6 +120,15 @@ class UserToPerm(Base):
     user = relation('User')
     permission = relation('Permission')
 
-
+class Statistics(Base):
+    __tablename__ = 'statistics'
+    __table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
+    stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
+    repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None)
+    stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False)
+    commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data
+    commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data
+    languages = Column("languages", BLOB(), nullable=False)#JSON data
 
+    repository = relation('Repository')
 
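The UniqueConstraint on repository_id means each repository has at most one statistics row, so reads are a scalar query plus json.loads, as the summary controller above shows. A hedged sketch of the read path — 'some_repo' is a placeholder, and get_session() is assumed to be available as it is in the tasks module:

    import json
    from pylons_app.model.db import Repository, Statistics

    sa = get_session()  # assumed: the same session helper the tasks use
    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == 'some_repo').scalar()
    stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()

    if stats:
        by_author = json.loads(stats.commit_activity)         # dict keyed by author
        per_day = json.loads(stats.commit_activity_combined)  # [[timestamp, count], ...]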
--- a/pylons_app/templates/summary/summary.html
+++ b/pylons_app/templates/summary/summary.html
@@ -123,7 +123,7 @@ E.onDOMReady(function(e){
     <div class="box box-right" style="min-height:455px">
         <!-- box / title -->
         <div class="title">
-            <h5>${_('Commit activity')}</h5>
+            <h5>${_('Commit activity by day / author')}</h5>
         </div>
 
         <div class="table">