TurboGears: drop workaround for < 2.4...
Author: Mads Kiilerich
Changeset: r8599:1b683a4e (branch: default)
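Both files below drop the same backward-compatibility fallback: the directory for task lock files is now read directly from the top-level TurboGears config key instead of first trying it and then falling back to the nested app_conf dict. A minimal before/after sketch of the pattern being removed (that TurboGears releases before 2.4 only exposed cache_dir under config['app_conf'] is an inference from the removed compatibility comment):

    from tg import config

    # before: tolerate TurboGears < 2.4, where 'cache_dir' may only be present
    # in the nested app_conf dict rather than at the top level of the config
    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']

    # after: TurboGears >= 2.4 is assumed, so the top-level key is always set
    lockkey_path = config['cache_dir']

With the fallback gone, the temporary lockkey_path variable disappears as well and the DaemonLock call sites use config['cache_dir'] directly.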
@@ -1,133 +1,131 @@
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.celerylib
~~~~~~~~~~~~~~~~~~~~~~~

celery libs for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Nov 27, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""


import logging
import os
from hashlib import sha1

from decorator import decorator
from tg import config

import kallithea
from kallithea.lib.pidlock import DaemonLock, LockHeld
from kallithea.lib.utils2 import safe_bytes
from kallithea.model import meta


log = logging.getLogger(__name__)


class FakeTask(object):
    """Fake a sync result to make it look like a finished task"""

    def __init__(self, result):
        self.result = result

    def failed(self):
        return False

    traceback = None # if failed

    task_id = None


def task(f_org):
    """Wrapper of celery.task.task, running async if CELERY_APP
    """

    if kallithea.CELERY_APP:
        def f_async(*args, **kwargs):
            log.info('executing %s task', f_org.__name__)
            try:
                f_org(*args, **kwargs)
            finally:
                log.info('executed %s task', f_org.__name__)
        f_async.__name__ = f_org.__name__
        runner = kallithea.CELERY_APP.task(ignore_result=True)(f_async)

        def f_wrapped(*args, **kwargs):
            t = runner.apply_async(args=args, kwargs=kwargs)
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
            return t
    else:
        def f_wrapped(*args, **kwargs):
            log.info('executing task %s in sync', f_org.__name__)
            try:
                result = f_org(*args, **kwargs)
            except Exception as e:
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
                raise # TODO: return this in FakeTask as with async tasks?
            return FakeTask(result)

    return f_wrapped


def __get_lockkey(func, *fargs, **fkwargs):
    params = list(fargs)
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])

    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)

    lockkey = 'task_%s.lock' % \
        sha1(safe_bytes(func_name + '-' + '-'.join(str(x) for x in params))).hexdigest()
    return lockkey


def locked_task(func):
    def __wrapper(func, *fargs, **fkwargs):
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
-        lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir'] # Backward compatibility for TurboGears < 2.4
-
        log.info('running task with lockkey %s', lockkey)
        try:
-            l = DaemonLock(os.path.join(lockkey_path, lockkey))
+            l = DaemonLock(os.path.join(config['cache_dir'], lockkey))
            ret = func(*fargs, **fkwargs)
            l.release()
            return ret
        except LockHeld:
            log.info('LockHeld')
            return 'Task with key %s already running' % lockkey

    return decorator(__wrapper, func)


def get_session():
    sa = meta.Session()
    return sa


def dbsession(func):
    def __wrapper(func, *fargs, **fkwargs):
        try:
            ret = func(*fargs, **fkwargs)
            return ret
        finally:
            if kallithea.CELERY_APP and not kallithea.CELERY_APP.conf.task_always_eager:
                meta.Session.remove()

    return decorator(__wrapper, func)
@@ -1,527 +1,524 @@
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.async_tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Kallithea task modules, containing all task that suppose to be run
by celery daemon

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Oct 6, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import email.message
import email.utils
import os
import smtplib
import time
import traceback
from collections import OrderedDict
from operator import itemgetter
from time import mktime

import celery.utils.log
from tg import config

import kallithea
from kallithea.lib import celerylib, conf, ext_json, hooks
from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
from kallithea.lib.utils2 import asbool, ascii_bytes
from kallithea.lib.vcs.utils import author_email, author_name
from kallithea.model import db, repo, userlog


__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']


log = celery.utils.log.get_task_logger(__name__)


@celerylib.task
@celerylib.locked_task
@celerylib.dbsession
def whoosh_index(repo_location, full_index):
    celerylib.get_session() # initialize database connection

    index_location = config['index_dir']
    WhooshIndexingDaemon(index_location=index_location,
                         repo_location=repo_location) \
        .run(full_index=full_index)


def _author_username(author):
    """Return the username of the user identified by the email part of the 'author' string,
    default to the name or email.
    Kind of similar to h.person() ."""
    email = author_email(author)
    if email:
        user = db.User.get_by_email(email)
        if user is not None:
            return user.username
    # Still nothing? Just pass back the author name if any, else the email
    return author_name(author) or email


@celerylib.task
@celerylib.dbsession
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
    DBS = celerylib.get_session()
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
                                      ts_max_y)
-    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir'] # Backward compatibility for TurboGears < 2.4
-
    log.info('running task with lockkey %s', lockkey)
-
    try:
-        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
+        lock = celerylib.DaemonLock(os.path.join(config['cache_dir'], lockkey))

        co_day_auth_aggr = {}
        commits_by_day_aggregate = {}
        db_repo = db.Repository.get_by_repo_name(repo_name)
        if db_repo is None:
            return True

        scm_repo = db_repo.scm_instance
        repo_size = scm_repo.count()
        # return if repo have no revisions
        if repo_size < 1:
            lock.release()
            return True

        skip_date_limit = True
        parse_limit = int(config.get('commit_parse_limit'))
        last_rev = None
        last_cs = None
        timegetter = itemgetter('time')

        dbrepo = DBS.query(db.Repository) \
            .filter(db.Repository.repo_name == repo_name).scalar()
        cur_stats = DBS.query(db.Statistics) \
            .filter(db.Statistics.repository == dbrepo).scalar()

        if cur_stats is not None:
            last_rev = cur_stats.stat_on_revision

        if last_rev == scm_repo.get_changeset().revision and repo_size > 1:
            # pass silently without any work if we're not on first revision or
            # current state of parsing revision(from db marker) is the
            # last revision
            lock.release()
            return True

        if cur_stats:
            commits_by_day_aggregate = OrderedDict(ext_json.loads(
                cur_stats.commit_activity_combined))
            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)

        log.debug('starting parsing %s', parse_limit)

        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
        log.debug('Getting revisions from %s to %s',
                  last_rev, last_rev + parse_limit
        )
        usernames_cache = {}
        for cs in scm_repo[last_rev:last_rev + parse_limit]:
            log.debug('parsing %s', cs)
            last_cs = cs # remember last parsed changeset
            tt = cs.date.timetuple()
            k = mktime(tt[:3] + (0, 0, 0, 0, 0, 0))

            # get username from author - similar to what h.person does
            username = usernames_cache.get(cs.author)
            if username is None:
                username = _author_username(cs.author)
                usernames_cache[cs.author] = username

            if username in co_day_auth_aggr:
                try:
                    l = [timegetter(x) for x in
                         co_day_auth_aggr[username]['data']]
                    time_pos = l.index(k)
                except ValueError:
                    time_pos = None

                if time_pos is not None and time_pos >= 0:
                    datadict = \
                        co_day_auth_aggr[username]['data'][time_pos]

                    datadict["commits"] += 1
                    datadict["added"] += len(cs.added)
                    datadict["changed"] += len(cs.changed)
                    datadict["removed"] += len(cs.removed)

                else:
                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:

                        datadict = {"time": k,
                                    "commits": 1,
                                    "added": len(cs.added),
                                    "changed": len(cs.changed),
                                    "removed": len(cs.removed),
                                    }
                        co_day_auth_aggr[username]['data'] \
                            .append(datadict)

            else:
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                    co_day_auth_aggr[username] = {
                        "label": username,
                        "data": [{"time": k,
                                  "commits": 1,
                                  "added": len(cs.added),
                                  "changed": len(cs.changed),
                                  "removed": len(cs.removed),
                                  }],
                        "schema": ["commits"],
                    }

            # gather all data by day
            if k in commits_by_day_aggregate:
                commits_by_day_aggregate[k] += 1
            else:
                commits_by_day_aggregate[k] = 1

        overview_data = sorted(commits_by_day_aggregate.items(),
                               key=itemgetter(0))

        stats = cur_stats if cur_stats else db.Statistics()
        stats.commit_activity = ascii_bytes(ext_json.dumps(co_day_auth_aggr))
        stats.commit_activity_combined = ascii_bytes(ext_json.dumps(overview_data))

        log.debug('last revision %s', last_rev)
        leftovers = len(scm_repo.revisions[last_rev:])
        log.debug('revisions to parse %s', leftovers)

        if last_rev == 0 or leftovers < parse_limit:
            log.debug('getting code trending stats')
            stats.languages = ascii_bytes(ext_json.dumps(__get_codes_stats(repo_name)))

        try:
            stats.repository = dbrepo
            stats.stat_on_revision = last_cs.revision if last_cs else 0
            DBS.add(stats)
            DBS.commit()
        except:
            log.error(traceback.format_exc())
            DBS.rollback()
            lock.release()
            return False

        # final release
        lock.release()

        # execute another task if celery is enabled
        if len(scm_repo.revisions) > 1 and kallithea.CELERY_APP and recurse_limit > 0:
            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
        elif recurse_limit <= 0:
            log.debug('Not recursing - limit has been reached')
        else:
            log.debug('Not recursing')
    except celerylib.LockHeld:
        log.info('Task with key %s already running', lockkey)
        return 'Task with key %s already running' % lockkey

@celerylib.task
@celerylib.dbsession
def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
    """
    Sends an email with defined parameters from the .ini files.

    :param recipients: list of recipients, if this is None, the defined email
        address from field 'email_to' and all admins is used instead
    :param subject: subject of the mail
    :param body: plain text body of the mail
    :param html_body: html version of body
    :param headers: dictionary of prepopulated e-mail headers
    :param from_name: full name to be used as sender of this mail - often a
        .full_name_or_username value
    """
    assert isinstance(recipients, list), recipients
    if headers is None:
        headers = {}
    else:
        # do not modify the original headers object passed by the caller
        headers = headers.copy()

    email_config = config
    email_prefix = email_config.get('email_prefix', '')
    if email_prefix:
        subject = "%s %s" % (email_prefix, subject)

    if not recipients:
        # if recipients are not defined we send to email_config + all admins
        recipients = [u.email for u in db.User.query()
                      .filter(db.User.admin == True).all()]
        if email_config.get('email_to') is not None:
            recipients += email_config.get('email_to').split(',')

        # If there are still no recipients, there are no admins and no address
        # configured in email_to, so return.
        if not recipients:
            log.error("No recipients specified and no fallback available.")
            return False

        log.warning("No recipients specified for '%s' - sending to admins %s", subject, ' '.join(recipients))

    # SMTP sender
    app_email_from = email_config.get('app_email_from', 'Kallithea')
    # 'From' header
    if from_name is not None:
        # set From header based on from_name but with a generic e-mail address
        # In case app_email_from is in "Some Name <e-mail>" format, we first
        # extract the e-mail address.
        envelope_addr = author_email(app_email_from)
        headers['From'] = '"%s" <%s>' % (
            email.utils.quote('%s (no-reply)' % from_name),
            envelope_addr)

    smtp_server = email_config.get('smtp_server')
    smtp_port = email_config.get('smtp_port')
    smtp_use_tls = asbool(email_config.get('smtp_use_tls'))
    smtp_use_ssl = asbool(email_config.get('smtp_use_ssl'))
    smtp_auth = email_config.get('smtp_auth') # undocumented - overrule automatic choice of auth mechanism
    smtp_username = email_config.get('smtp_username')
    smtp_password = email_config.get('smtp_password')

    logmsg = ("Mail details:\n"
              "recipients: %s\n"
              "headers: %s\n"
              "subject: %s\n"
              "body:\n%s\n"
              "html:\n%s\n"
              % (' '.join(recipients), headers, subject, body, html_body))

    if smtp_server:
        log.debug("Sending e-mail. " + logmsg)
    else:
        log.error("SMTP mail server not configured - cannot send e-mail.")
        log.warning(logmsg)
        return False

    msg = email.message.EmailMessage()
    msg['Subject'] = subject
    msg['From'] = app_email_from # fallback - might be overridden by a header
    msg['To'] = ', '.join(recipients)
    msg['Date'] = email.utils.formatdate(time.time())

    for key, value in headers.items():
        del msg[key] # Delete key first to make sure add_header will replace header (if any), no matter the casing
        msg.add_header(key, value)

    msg.set_content(body)
    msg.add_alternative(html_body, subtype='html')

    try:
        if smtp_use_ssl:
            smtp_serv = smtplib.SMTP_SSL(smtp_server, smtp_port)
        else:
            smtp_serv = smtplib.SMTP(smtp_server, smtp_port)

        if smtp_use_tls:
            smtp_serv.starttls()

        if smtp_auth:
            smtp_serv.ehlo() # populate esmtp_features
            smtp_serv.esmtp_features["auth"] = smtp_auth

        if smtp_username and smtp_password is not None:
            smtp_serv.login(smtp_username, smtp_password)

        smtp_serv.sendmail(app_email_from, recipients, msg.as_string())
        smtp_serv.quit()

        log.info('Mail was sent to: %s' % recipients)
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True

@celerylib.task
@celerylib.dbsession
def create_repo(form_data, cur_user):
    DBS = celerylib.get_session()

    cur_user = db.User.guess_instance(cur_user)

    owner = cur_user
    repo_name = form_data['repo_name']
    repo_name_full = form_data['repo_name_full']
    repo_type = form_data['repo_type']
    description = form_data['repo_description']
    private = form_data['repo_private']
    clone_uri = form_data.get('clone_uri')
    repo_group = form_data['repo_group']
    landing_rev = form_data['repo_landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')
    copy_group_permissions = form_data.get('repo_copy_permissions')
    fork_of = form_data.get('fork_parent_id')
    state = form_data.get('repo_state', db.Repository.STATE_PENDING)

    # repo creation defaults, private and repo_type are filled in form
    defs = db.Setting.get_default_repo_settings(strip_prefix=True)
    enable_statistics = defs.get('repo_enable_statistics')
    enable_downloads = defs.get('repo_enable_downloads')

    try:
        db_repo = repo.RepoModel()._create_repo(
            repo_name=repo_name_full,
            repo_type=repo_type,
            description=description,
            owner=owner,
            private=private,
            clone_uri=clone_uri,
            repo_group=repo_group,
            landing_rev=landing_rev,
            fork_of=fork_of,
            copy_fork_permissions=copy_fork_permissions,
            copy_group_permissions=copy_group_permissions,
            enable_statistics=enable_statistics,
            enable_downloads=enable_downloads,
            state=state
        )

        userlog.action_logger(cur_user, 'user_created_repo',
                              form_data['repo_name_full'], '')

        DBS.commit()
        # now create this repo on Filesystem
        repo.RepoModel()._create_filesystem_repo(
            repo_name=repo_name,
            repo_type=repo_type,
            repo_group=db.RepoGroup.guess_instance(repo_group),
            clone_uri=clone_uri,
        )
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
        hooks.log_create_repository(db_repo.get_dict(), created_by=owner.username)

        # update repo changeset caches initially
        db_repo.update_changeset_cache()

        # set new created state
        db_repo.set_state(db.Repository.STATE_CREATED)
        DBS.commit()
    except Exception as e:
        log.warning('Exception %s occurred when forking repository, '
                    'doing cleanup...' % e)
        # rollback things manually !
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
        if db_repo:
            db.Repository.delete(db_repo.repo_id)
            DBS.commit()
            repo.RepoModel()._delete_filesystem_repo(db_repo)
        raise

    return True

@celerylib.task
@celerylib.dbsession
def create_repo_fork(form_data, cur_user):
    """
    Creates a fork of repository using interval VCS methods

    :param form_data:
    :param cur_user:
    """
    DBS = celerylib.get_session()

    base_path = kallithea.CONFIG['base_path']
    cur_user = db.User.guess_instance(cur_user)

    repo_name = form_data['repo_name'] # fork in this case
    repo_name_full = form_data['repo_name_full']

    repo_type = form_data['repo_type']
    owner = cur_user
    private = form_data['private']
    clone_uri = form_data.get('clone_uri')
    repo_group = form_data['repo_group']
    landing_rev = form_data['landing_rev']
    copy_fork_permissions = form_data.get('copy_permissions')

    try:
        fork_of = db.Repository.guess_instance(form_data.get('fork_parent_id'))

        repo.RepoModel()._create_repo(
            repo_name=repo_name_full,
            repo_type=repo_type,
            description=form_data['description'],
            owner=owner,
            private=private,
            clone_uri=clone_uri,
            repo_group=repo_group,
            landing_rev=landing_rev,
            fork_of=fork_of,
            copy_fork_permissions=copy_fork_permissions
        )
        userlog.action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
                              fork_of.repo_name, '')
        DBS.commit()

        source_repo_path = os.path.join(base_path, fork_of.repo_name)

        # now create this repo on Filesystem
        repo.RepoModel()._create_filesystem_repo(
            repo_name=repo_name,
            repo_type=repo_type,
            repo_group=db.RepoGroup.guess_instance(repo_group),
            clone_uri=source_repo_path,
        )
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
        hooks.log_create_repository(db_repo.get_dict(), created_by=owner.username)

        # update repo changeset caches initially
        db_repo.update_changeset_cache()

        # set new created state
        db_repo.set_state(db.Repository.STATE_CREATED)
        DBS.commit()
    except Exception as e:
        log.warning('Exception %s occurred when forking repository, '
                    'doing cleanup...' % e)
        # rollback things manually !
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
        if db_repo:
            db.Repository.delete(db_repo.repo_id)
            DBS.commit()
            repo.RepoModel()._delete_filesystem_repo(db_repo)
        raise

    return True

def __get_codes_stats(repo_name):
    scm_repo = db.Repository.get_by_repo_name(repo_name).scm_instance

    tip = scm_repo.get_changeset()
    code_stats = {}

    for _topnode, _dirnodes, filenodes in tip.walk('/'):
        for filenode in filenodes:
            ext = filenode.extension.lower()
            if ext in conf.LANGUAGES_EXTENSIONS_MAP and not filenode.is_binary:
                if ext in code_stats:
                    code_stats[ext] += 1
                else:
                    code_stats[ext] = 1

    return code_stats or {}