##// END OF EJS Templates
starting celery branch
marcink -
r467:3fc3ce53 celery
parent child Browse files
Show More
@@ -0,0 +1,32 b''
1 # List of modules to import when celery starts.
2 import sys
3 import os
4 sys.path.append(os.getcwd())
5 CELERY_IMPORTS = ("pylons_app.lib.celerylib.tasks", )
6
7 ## Result store settings.
8 CELERY_RESULT_BACKEND = "database"
9 CELERY_RESULT_DBURI = "sqlite:///hg_app.db"
10
11
12 ## Broker settings.
13 BROKER_HOST = "localhost"
14 BROKER_PORT = 5672
15 BROKER_VHOST = "rabbitmqhost"
16 BROKER_USER = "rabbitmq"
17 BROKER_PASSWORD = "qweqwe"
18
19 ## Worker settings
20 ## If you're doing mostly I/O you can have more processes,
21 ## but if mostly spending CPU, try to keep it close to the
22 ## number of CPUs on your machine. If not set, the number of CPUs/cores
23 ## available will be used.
24 CELERYD_CONCURRENCY = 2
25 # CELERYD_LOG_FILE = "celeryd.log"
26 CELERYD_LOG_LEVEL = "DEBUG"
27 CELERYD_MAX_TASKS_PER_CHILD = 1
28
29 #CELERY_ALWAYS_EAGER = True
30 #rabbitmqctl add_user rabbitmq qweqwe
31 #rabbitmqctl add_vhost rabbitmqhost
32 #rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*" No newline at end of file
@@ -0,0 +1,24 b''
1 from vcs.utils.lazy import LazyProperty
2 import logging
3
4 log = logging.getLogger(__name__)
5
6 class ResultWrapper(object):
7 def __init__(self, task):
8 self.task = task
9
10 @LazyProperty
11 def result(self):
12 return self.task
13
14 def run_task(task,async,*args,**kwargs):
15 try:
16 t = task.delay(*args,**kwargs)
17 log.info('running task %s',t.task_id)
18 if not async:
19 t.wait()
20 return t
21 except:
22 #pure sync version
23 return ResultWrapper(task(*args,**kwargs))
24 No newline at end of file
@@ -0,0 +1,92 b''
1 from celery.decorators import task
2 from datetime import datetime, timedelta
3 from pylons_app.lib.helpers import person
4 from pylons_app.lib.utils import OrderedDict
5 from time import mktime
6 import calendar
7 import logging
8 from vcs.backends.hg import MercurialRepository
9
10 log = logging.getLogger(__name__)
11
12 @task()
13 def whoosh_index(repo_location,full_index):
14 from pylons_app.lib.indexers import DaemonLock
15 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon,LockHeld
16 try:
17 l = DaemonLock()
18 WhooshIndexingDaemon(repo_location=repo_location)\
19 .run(full_index=full_index)
20 l.release()
21 return 'Done'
22 except LockHeld:
23 log.info('LockHeld')
24 return 'LockHeld'
25
26 @task()
27 def get_commits_stats(repo):
28 aggregate = OrderedDict()
29 repo = MercurialRepository('/home/marcink/hg_repos/'+repo)
30 #graph range
31 td = datetime.today() + timedelta(days=1)
32 y, m, d = td.year, td.month, td.day
33 ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
34 d, 0, 0, 0, 0, 0, 0,))
35 ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
36
37 def author_key_cleaner(k):
38 k = person(k)
39 k = k.replace('"', "'") #for js data compatibilty
40 return k
41
42 for cs in repo[:200]:#added limit 200 until fix #29 is made
43 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
44 cs.date.timetuple()[2])
45 timetupple = [int(x) for x in k.split('-')]
46 timetupple.extend([0 for _ in xrange(6)])
47 k = mktime(timetupple)
48 if aggregate.has_key(author_key_cleaner(cs.author)):
49 if aggregate[author_key_cleaner(cs.author)].has_key(k):
50 aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
51 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
52 aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
53 aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
54
55 else:
56 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
57 if k >= ts_min and k <= ts_max:
58 aggregate[author_key_cleaner(cs.author)][k] = {}
59 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
60 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
61 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
62 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
63
64 else:
65 if k >= ts_min and k <= ts_max:
66 aggregate[author_key_cleaner(cs.author)] = OrderedDict()
67 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
68 aggregate[author_key_cleaner(cs.author)][k] = {}
69 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
70 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
71 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
72 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
73
74 d = ''
75 tmpl0 = u""""%s":%s"""
76 tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
77 for author in aggregate:
78
79 d += tmpl0 % (author,
80 tmpl1 \
81 % (author,
82 [{"time":x,
83 "commits":aggregate[author][x]['commits'],
84 "added":aggregate[author][x]['added'],
85 "changed":aggregate[author][x]['changed'],
86 "removed":aggregate[author][x]['removed'],
87 } for x in aggregate[author]]))
88 if d == '':
89 d = '"%s":{label:"%s",data:[[0,1],]}' \
90 % (author_key_cleaner(repo.contact),
91 author_key_cleaner(repo.contact))
92 return (ts_min, ts_max, d)
@@ -38,6 +38,7 b' from pylons_app.model.forms import UserF'
38 ApplicationUiSettingsForm
38 ApplicationUiSettingsForm
39 from pylons_app.model.hg_model import HgModel
39 from pylons_app.model.hg_model import HgModel
40 from pylons_app.model.user_model import UserModel
40 from pylons_app.model.user_model import UserModel
41 from pylons_app.lib.celerylib import tasks,run_task
41 import formencode
42 import formencode
42 import logging
43 import logging
43 import traceback
44 import traceback
@@ -102,6 +103,12 b' class SettingsController(BaseController)'
102 invalidate_cache('cached_repo_list')
103 invalidate_cache('cached_repo_list')
103 h.flash(_('Repositories sucessfully rescanned'), category='success')
104 h.flash(_('Repositories sucessfully rescanned'), category='success')
104
105
106 if setting_id == 'whoosh':
107 repo_location = get_hg_ui_settings()['paths_root_path']
108 full_index = request.POST.get('full_index',False)
109 task = run_task(tasks.whoosh_index,True,repo_location,full_index)
110
111 h.flash(_('Whoosh reindex task scheduled'), category='success')
105 if setting_id == 'global':
112 if setting_id == 'global':
106
113
107 application_form = ApplicationSettingsForm()()
114 application_form = ApplicationSettingsForm()()
@@ -22,16 +22,14 b' Created on April 18, 2010'
22 summary controller for pylons
22 summary controller for pylons
23 @author: marcink
23 @author: marcink
24 """
24 """
25 from datetime import datetime, timedelta
25 from pylons import tmpl_context as c, request,url
26 from pylons import tmpl_context as c, request
27 from pylons_app.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
26 from pylons_app.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
28 from pylons_app.lib.base import BaseController, render
27 from pylons_app.lib.base import BaseController, render
29 from pylons_app.lib.helpers import person
30 from pylons_app.lib.utils import OrderedDict
28 from pylons_app.lib.utils import OrderedDict
31 from pylons_app.model.hg_model import HgModel
29 from pylons_app.model.hg_model import HgModel
32 from time import mktime
33 from webhelpers.paginate import Page
30 from webhelpers.paginate import Page
34 import calendar
31 from pylons_app.lib.celerylib import run_task
32 from pylons_app.lib.celerylib.tasks import get_commits_stats
35 import logging
33 import logging
36
34
37 log = logging.getLogger(__name__)
35 log = logging.getLogger(__name__)
@@ -62,78 +60,11 b' class SummaryController(BaseController):'
62 c.repo_branches = OrderedDict()
60 c.repo_branches = OrderedDict()
63 for name, hash in c.repo_info.branches.items()[:10]:
61 for name, hash in c.repo_info.branches.items()[:10]:
64 c.repo_branches[name] = c.repo_info.get_changeset(hash)
62 c.repo_branches[name] = c.repo_info.get_changeset(hash)
65
63
66 c.commit_data = self.__get_commit_stats(c.repo_info)
64 task = run_task(get_commits_stats,False,c.repo_info.name)
65 c.ts_min = task.result[0]
66 c.ts_max = task.result[1]
67 c.commit_data = task.result[2]
67
68
68 return render('summary/summary.html')
69 return render('summary/summary.html')
69
70
70
71
72 def __get_commit_stats(self, repo):
73 aggregate = OrderedDict()
74
75 #graph range
76 td = datetime.today() + timedelta(days=1)
77 y, m, d = td.year, td.month, td.day
78 c.ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
79 d, 0, 0, 0, 0, 0, 0,))
80 c.ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
81
82 def author_key_cleaner(k):
83 k = person(k)
84 k = k.replace('"', "'") #for js data compatibilty
85 return k
86
87 for cs in repo[:200]:#added limit 200 until fix #29 is made
88 k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
89 cs.date.timetuple()[2])
90 timetupple = [int(x) for x in k.split('-')]
91 timetupple.extend([0 for _ in xrange(6)])
92 k = mktime(timetupple)
93 if aggregate.has_key(author_key_cleaner(cs.author)):
94 if aggregate[author_key_cleaner(cs.author)].has_key(k):
95 aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
96 aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
97 aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
98 aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
99
100 else:
101 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
102 if k >= c.ts_min and k <= c.ts_max:
103 aggregate[author_key_cleaner(cs.author)][k] = {}
104 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
105 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
106 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
107 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
108
109 else:
110 if k >= c.ts_min and k <= c.ts_max:
111 aggregate[author_key_cleaner(cs.author)] = OrderedDict()
112 #aggregate[author_key_cleaner(cs.author)].update(dates_range)
113 aggregate[author_key_cleaner(cs.author)][k] = {}
114 aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
115 aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
116 aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
117 aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
118
119 d = ''
120 tmpl0 = u""""%s":%s"""
121 tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
122 for author in aggregate:
123
124 d += tmpl0 % (author,
125 tmpl1 \
126 % (author,
127 [{"time":x,
128 "commits":aggregate[author][x]['commits'],
129 "added":aggregate[author][x]['added'],
130 "changed":aggregate[author][x]['changed'],
131 "removed":aggregate[author][x]['removed'],
132 } for x in aggregate[author]]))
133 if d == '':
134 d = '"%s":{label:"%s",data:[[0,1],]}' \
135 % (author_key_cleaner(repo.contact),
136 author_key_cleaner(repo.contact))
137 return d
138
139
@@ -47,7 +47,32 b''
47 </div>
47 </div>
48 </div>
48 </div>
49 ${h.end_form()}
49 ${h.end_form()}
50
50
51 <h3>${_('Whoosh indexing')}</h3>
52 ${h.form(url('admin_setting', setting_id='whoosh'),method='put')}
53 <div class="form">
54 <!-- fields -->
55
56 <div class="fields">
57 <div class="field">
58 <div class="label label-checkbox">
59 <label for="destroy">${_('index build option')}:</label>
60 </div>
61 <div class="checkboxes">
62 <div class="checkbox">
63 ${h.checkbox('full_index',True)}
64 <label for="checkbox-1">${_('build from scratch')}</label>
65 </div>
66 </div>
67 </div>
68
69 <div class="buttons">
70 ${h.submit('reindex','reindex',class_="ui-button ui-widget ui-state-default ui-corner-all")}
71 </div>
72 </div>
73 </div>
74 ${h.end_form()}
75
51 <h3>${_('Global application settings')}</h3>
76 <h3>${_('Global application settings')}</h3>
52 ${h.form(url('admin_setting', setting_id='global'),method='put')}
77 ${h.form(url('admin_setting', setting_id='global'),method='put')}
53 <div class="form">
78 <div class="form">
General Comments 0
You need to be logged in to leave comments. Login now