##// END OF EJS Templates
model: comments: allow selective retrieval of inline comments...
Thomas De Schampheleire -
r7409:445d6875 default
parent child Browse files
Show More
@@ -1,283 +1,304 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.model.comment
16 16 ~~~~~~~~~~~~~~~~~~~~~~~
17 17
18 18 comments model for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Nov 11, 2011
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import logging
29 29
30 30 from tg.i18n import ugettext as _
31 31 from collections import defaultdict
32 32
33 33 from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode
34 34 from kallithea.lib import helpers as h
35 35 from kallithea.model.db import ChangesetComment, User, \
36 36 PullRequest, Repository
37 37 from kallithea.model.notification import NotificationModel
38 38 from kallithea.model.meta import Session
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 def _list_changeset_commenters(revision):
44 44 return (Session().query(User)
45 45 .join(ChangesetComment.author)
46 46 .filter(ChangesetComment.revision == revision)
47 47 .all())
48 48
49 49 def _list_pull_request_commenters(pull_request):
50 50 return (Session().query(User)
51 51 .join(ChangesetComment.author)
52 52 .filter(ChangesetComment.pull_request_id == pull_request.pull_request_id)
53 53 .all())
54 54
55 55
56 56 class ChangesetCommentsModel(object):
57 57
58 58 def _get_notification_data(self, repo, comment, author, comment_text,
59 59 line_no=None, revision=None, pull_request=None,
60 60 status_change=None, closing_pr=False):
61 61 """
62 62 :returns: tuple (subj,body,recipients,notification_type,email_kwargs)
63 63 """
64 64 # make notification
65 65 body = comment_text # text of the comment
66 66 line = ''
67 67 if line_no:
68 68 line = _('on line %s') % line_no
69 69
70 70 # changeset
71 71 if revision:
72 72 notification_type = NotificationModel.TYPE_CHANGESET_COMMENT
73 73 cs = repo.scm_instance.get_changeset(revision)
74 74 desc = cs.short_id
75 75
76 76 threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())]
77 77 if line_no: # TODO: url to file _and_ line number
78 78 threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
79 79 h.canonical_hostname()))
80 80 comment_url = h.canonical_url('changeset_home',
81 81 repo_name=repo.repo_name,
82 82 revision=revision,
83 83 anchor='comment-%s' % comment.comment_id)
84 84 subj = safe_unicode(
85 85 h.link_to('Re changeset: %(desc)s %(line)s' % \
86 86 {'desc': desc, 'line': line},
87 87 comment_url)
88 88 )
89 89 # get the current participants of this changeset
90 90 recipients = _list_changeset_commenters(revision)
91 91 # add changeset author if it's known locally
92 92 cs_author = User.get_from_cs_author(cs.author)
93 93 if not cs_author:
94 94 # use repo owner if we cannot extract the author correctly
95 95 # FIXME: just use committer name even if not a user
96 96 cs_author = repo.owner
97 97 recipients.append(cs_author)
98 98
99 99 email_kwargs = {
100 100 'status_change': status_change,
101 101 'cs_comment_user': author.full_name_and_username,
102 102 'cs_target_repo': h.canonical_url('summary_home', repo_name=repo.repo_name),
103 103 'cs_comment_url': comment_url,
104 104 'cs_url': h.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision),
105 105 'raw_id': revision,
106 106 'message': cs.message,
107 107 'message_short': h.shorter(cs.message, 50, firstline=True),
108 108 'cs_author': cs_author,
109 109 'repo_name': repo.repo_name,
110 110 'short_id': h.short_id(revision),
111 111 'branch': cs.branch,
112 112 'comment_username': author.username,
113 113 'threading': threading,
114 114 }
115 115 # pull request
116 116 elif pull_request:
117 117 notification_type = NotificationModel.TYPE_PULL_REQUEST_COMMENT
118 118 desc = comment.pull_request.title
119 119 _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':')
120 120 _other_ref_type, other_ref_name, _other_rev = comment.pull_request.other_ref.split(':')
121 121 threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name,
122 122 pull_request.pull_request_id,
123 123 h.canonical_hostname())]
124 124 if line_no: # TODO: url to file _and_ line number
125 125 threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name,
126 126 pull_request.pull_request_id, line_no,
127 127 h.canonical_hostname()))
128 128 comment_url = pull_request.url(canonical=True,
129 129 anchor='comment-%s' % comment.comment_id)
130 130 subj = safe_unicode(
131 131 h.link_to('Re pull request %(pr_nice_id)s: %(desc)s %(line)s' %
132 132 {'desc': desc,
133 133 'pr_nice_id': comment.pull_request.nice_id(),
134 134 'line': line},
135 135 comment_url)
136 136 )
137 137 # get the current participants of this pull request
138 138 recipients = _list_pull_request_commenters(pull_request)
139 139 recipients.append(pull_request.owner)
140 140 recipients += pull_request.get_reviewer_users()
141 141
142 142 # set some variables for email notification
143 143 email_kwargs = {
144 144 'pr_title': pull_request.title,
145 145 'pr_title_short': h.shorter(pull_request.title, 50),
146 146 'pr_nice_id': pull_request.nice_id(),
147 147 'status_change': status_change,
148 148 'closing_pr': closing_pr,
149 149 'pr_comment_url': comment_url,
150 150 'pr_url': pull_request.url(canonical=True),
151 151 'pr_comment_user': author.full_name_and_username,
152 152 'pr_target_repo': h.canonical_url('summary_home',
153 153 repo_name=pull_request.other_repo.repo_name),
154 154 'pr_target_branch': other_ref_name,
155 155 'pr_source_repo': h.canonical_url('summary_home',
156 156 repo_name=pull_request.org_repo.repo_name),
157 157 'pr_source_branch': org_ref_name,
158 158 'pr_owner': pull_request.owner,
159 159 'pr_owner_username': pull_request.owner.username,
160 160 'repo_name': pull_request.other_repo.repo_name,
161 161 'comment_username': author.username,
162 162 'threading': threading,
163 163 }
164 164
165 165 return subj, body, recipients, notification_type, email_kwargs
166 166
167 167 def create(self, text, repo, author, revision=None, pull_request=None,
168 168 f_path=None, line_no=None, status_change=None, closing_pr=False,
169 169 send_email=True):
170 170 """
171 171 Creates a new comment for either a changeset or a pull request.
172 172 status_change and closing_pr is only for the optional email.
173 173
174 174 Returns the created comment.
175 175 """
176 176 if not status_change and not text:
177 177 log.warning('Missing text for comment, skipping...')
178 178 return None
179 179
180 180 repo = Repository.guess_instance(repo)
181 181 author = User.guess_instance(author)
182 182 comment = ChangesetComment()
183 183 comment.repo = repo
184 184 comment.author = author
185 185 comment.text = text
186 186 comment.f_path = f_path
187 187 comment.line_no = line_no
188 188
189 189 if revision is not None:
190 190 comment.revision = revision
191 191 elif pull_request is not None:
192 192 pull_request = PullRequest.guess_instance(pull_request)
193 193 comment.pull_request = pull_request
194 194 else:
195 195 raise Exception('Please specify revision or pull_request_id')
196 196
197 197 Session().add(comment)
198 198 Session().flush()
199 199
200 200 if send_email:
201 201 (subj, body, recipients, notification_type,
202 202 email_kwargs) = self._get_notification_data(
203 203 repo, comment, author,
204 204 comment_text=text,
205 205 line_no=line_no,
206 206 revision=revision,
207 207 pull_request=pull_request,
208 208 status_change=status_change,
209 209 closing_pr=closing_pr)
210 210 email_kwargs['is_mention'] = False
211 211 # create notification objects, and emails
212 212 NotificationModel().create(
213 213 created_by=author, subject=subj, body=body,
214 214 recipients=recipients, type_=notification_type,
215 215 email_kwargs=email_kwargs,
216 216 )
217 217
218 218 mention_recipients = extract_mentioned_users(body).difference(recipients)
219 219 if mention_recipients:
220 220 email_kwargs['is_mention'] = True
221 221 subj = _('[Mention]') + ' ' + subj
222 222 # FIXME: this subject is wrong and unused!
223 223 NotificationModel().create(
224 224 created_by=author, subject=subj, body=body,
225 225 recipients=mention_recipients,
226 226 type_=notification_type,
227 227 email_kwargs=email_kwargs
228 228 )
229 229
230 230 return comment
231 231
232 232 def delete(self, comment):
233 233 comment = ChangesetComment.guess_instance(comment)
234 234 Session().delete(comment)
235 235
236 236 return comment
237 237
238 238 def get_comments(self, repo_id, revision=None, pull_request=None):
239 239 """
240 240 Gets general comments for either revision or pull_request.
241 241
242 242 Returns a list, ordered by creation date.
243 243 """
244 244 return self._get_comments(repo_id, revision=revision, pull_request=pull_request,
245 245 inline=False)
246 246
247 def get_inline_comments(self, repo_id, revision=None, pull_request=None):
247 def get_inline_comments(self, repo_id, revision=None, pull_request=None,
248 f_path=None, line_no=None):
248 249 """
249 250 Gets inline comments for either revision or pull_request.
250 251
251 252 Returns a list of tuples with file path and list of comments per line number.
252 253 """
253 254 comments = self._get_comments(repo_id, revision=revision, pull_request=pull_request,
254 inline=True)
255 inline=True, f_path=f_path, line_no=line_no)
255 256
256 257 paths = defaultdict(lambda: defaultdict(list))
257 258 for co in comments:
258 259 paths[co.f_path][co.line_no].append(co)
259 260 return paths.items()
260 261
261 def _get_comments(self, repo_id, revision=None, pull_request=None, inline=False):
262 def _get_comments(self, repo_id, revision=None, pull_request=None,
263 inline=False, f_path=None, line_no=None):
262 264 """
263 265 Gets comments for either revision or pull_request_id, either inline or general.
266 If a file path and optionally line number are given, return only the matching inline comments.
264 267 """
268 if f_path is None and line_no is not None:
269 raise Exception("line_no only makes sense if f_path is given.")
270
271 if inline is None and f_path is not None:
272 raise Exception("f_path only makes sense for inline comments.")
273
265 274 q = Session().query(ChangesetComment)
266 275
267 276 if inline:
277 if f_path is not None:
278 # inline comments for a given file...
279 q = q.filter(ChangesetComment.f_path == f_path)
280 if line_no is None:
281 # ... on any line
282 q = q.filter(ChangesetComment.line_no != None)
283 else:
284 # ... on specific line
285 q = q.filter(ChangesetComment.line_no == line_no)
286 else:
287 # all inline comments
268 288 q = q.filter(ChangesetComment.line_no != None) \
269 289 .filter(ChangesetComment.f_path != None)
270 290 else:
291 # all general comments
271 292 q = q.filter(ChangesetComment.line_no == None) \
272 293 .filter(ChangesetComment.f_path == None)
273 294
274 295 if revision is not None:
275 296 q = q.filter(ChangesetComment.revision == revision) \
276 297 .filter(ChangesetComment.repo_id == repo_id)
277 298 elif pull_request is not None:
278 299 pull_request = PullRequest.guess_instance(pull_request)
279 300 q = q.filter(ChangesetComment.pull_request == pull_request)
280 301 else:
281 302 raise Exception('Please specify either revision or pull_request')
282 303
283 304 return q.order_by(ChangesetComment.created_on).all()
@@ -1,155 +1,230 b''
1 1 from kallithea.tests.base import *
2 2 from kallithea.model.comment import ChangesetCommentsModel
3 3 from kallithea.model.db import Repository
4 4
5 import pytest
5 6 from tg.util.webtest import test_context
6 7
7 8 class TestComments(TestController):
8 9
9 def _check_comment_count(self, repo_id, revision, expected_len_comments, expected_len_inline_comments):
10 def _check_comment_count(self, repo_id, revision,
11 expected_len_comments, expected_len_inline_comments,
12 f_path=None, line_no=None
13 ):
10 14 comments = ChangesetCommentsModel().get_comments(repo_id,
11 15 revision=revision)
12 16 assert len(comments) == expected_len_comments
13 17 inline_comments = ChangesetCommentsModel().get_inline_comments(repo_id,
14 revision=revision)
18 revision=revision, f_path=f_path, line_no=line_no)
15 19 assert len(inline_comments) == expected_len_inline_comments
16 20
17 21 return comments, inline_comments
18 22
19 23 def test_create_delete_general_comment(self):
20 24 with test_context(self.app):
21 25 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
22 26 revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda'
23 27
24 28 self._check_comment_count(repo_id, revision,
25 29 expected_len_comments=0, expected_len_inline_comments=0)
26 30
27 31 text = u'a comment'
28 32 new_comment = ChangesetCommentsModel().create(
29 33 text=text,
30 34 repo=HG_REPO,
31 35 author=TEST_USER_REGULAR_LOGIN,
32 36 revision=revision,
33 37 send_email=False)
34 38
35 39 self._check_comment_count(repo_id, revision,
36 40 expected_len_comments=1, expected_len_inline_comments=0)
37 41
38 42 ChangesetCommentsModel().delete(new_comment)
39 43
40 44 self._check_comment_count(repo_id, revision,
41 45 expected_len_comments=0, expected_len_inline_comments=0)
42 46
43 47 def test_create_delete_inline_comment(self):
44 48 with test_context(self.app):
45 49 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
46 50 revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda'
47 51
48 52 self._check_comment_count(repo_id, revision,
49 53 expected_len_comments=0, expected_len_inline_comments=0)
50 54
51 55 text = u'an inline comment'
52 56 f_path = u'vcs/tests/base.py'
53 57 line_no = u'n50'
54 58 new_comment = ChangesetCommentsModel().create(
55 59 text=text,
56 60 repo=HG_REPO,
57 61 author=TEST_USER_REGULAR_LOGIN,
58 62 revision=revision,
59 63 f_path=f_path,
60 64 line_no=line_no,
61 65 send_email=False)
62 66
63 67 comments, inline_comments = self._check_comment_count(repo_id, revision,
64 68 expected_len_comments=0, expected_len_inline_comments=1)
65 69 # inline_comments is a list of tuples (file_path, dict)
66 70 # where the dict keys are line numbers and values are lists of comments
67 71 assert inline_comments[0][0] == f_path
68 72 assert len(inline_comments[0][1]) == 1
69 73 assert line_no in inline_comments[0][1]
70 74 assert inline_comments[0][1][line_no][0].text == text
71 75
72 76 ChangesetCommentsModel().delete(new_comment)
73 77
74 78 self._check_comment_count(repo_id, revision,
75 79 expected_len_comments=0, expected_len_inline_comments=0)
76 80
77 81 def test_create_delete_multiple_inline_comments(self):
78 82 with test_context(self.app):
79 83 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
80 84 revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda'
81 85
82 86 self._check_comment_count(repo_id, revision,
83 87 expected_len_comments=0, expected_len_inline_comments=0)
84 88
85 89 text = u'an inline comment'
86 90 f_path = u'vcs/tests/base.py'
87 91 line_no = u'n50'
88 92 new_comment = ChangesetCommentsModel().create(
89 93 text=text,
90 94 repo=HG_REPO,
91 95 author=TEST_USER_REGULAR_LOGIN,
92 96 revision=revision,
93 97 f_path=f_path,
94 98 line_no=line_no,
95 99 send_email=False)
96 100
97 101 text2 = u'another inline comment, same file'
98 102 line_no2 = u'o41'
99 103 new_comment2 = ChangesetCommentsModel().create(
100 104 text=text2,
101 105 repo=HG_REPO,
102 106 author=TEST_USER_REGULAR_LOGIN,
103 107 revision=revision,
104 108 f_path=f_path,
105 109 line_no=line_no2,
106 110 send_email=False)
107 111
108 112 text3 = u'another inline comment, same file'
109 113 f_path3 = u'vcs/tests/test_hg.py'
110 114 line_no3 = u'n159'
111 115 new_comment3 = ChangesetCommentsModel().create(
112 116 text=text3,
113 117 repo=HG_REPO,
114 118 author=TEST_USER_REGULAR_LOGIN,
115 119 revision=revision,
116 120 f_path=f_path3,
117 121 line_no=line_no3,
118 122 send_email=False)
119 123
120 124 comments, inline_comments = self._check_comment_count(repo_id, revision,
121 125 expected_len_comments=0, expected_len_inline_comments=2)
122 126 # inline_comments is a list of tuples (file_path, dict)
123 127 # where the dict keys are line numbers and values are lists of comments
124 128 assert inline_comments[1][0] == f_path
125 129 assert len(inline_comments[1][1]) == 2
126 130 assert inline_comments[1][1][line_no][0].text == text
127 131 assert inline_comments[1][1][line_no2][0].text == text2
128 132
129 133 assert inline_comments[0][0] == f_path3
130 134 assert len(inline_comments[0][1]) == 1
131 135 assert line_no3 in inline_comments[0][1]
132 136 assert inline_comments[0][1][line_no3][0].text == text3
133 137
134 138 # now delete only one comment
135 139 ChangesetCommentsModel().delete(new_comment2)
136 140
137 141 comments, inline_comments = self._check_comment_count(repo_id, revision,
138 142 expected_len_comments=0, expected_len_inline_comments=2)
139 143 # inline_comments is a list of tuples (file_path, dict)
140 144 # where the dict keys are line numbers and values are lists of comments
141 145 assert inline_comments[1][0] == f_path
142 146 assert len(inline_comments[1][1]) == 1
143 147 assert inline_comments[1][1][line_no][0].text == text
144 148
145 149 assert inline_comments[0][0] == f_path3
146 150 assert len(inline_comments[0][1]) == 1
147 151 assert line_no3 in inline_comments[0][1]
148 152 assert inline_comments[0][1][line_no3][0].text == text3
149 153
150 154 # now delete all others
151 155 ChangesetCommentsModel().delete(new_comment)
152 156 ChangesetCommentsModel().delete(new_comment3)
153 157
154 158 self._check_comment_count(repo_id, revision,
155 159 expected_len_comments=0, expected_len_inline_comments=0)
160
161 def test_selective_retrieval_of_inline_comments(self):
162 with test_context(self.app):
163 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
164 revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda'
165
166 self._check_comment_count(repo_id, revision,
167 expected_len_comments=0, expected_len_inline_comments=0)
168
169 text = u'an inline comment'
170 f_path = u'vcs/tests/base.py'
171 line_no = u'n50'
172 new_comment = ChangesetCommentsModel().create(
173 text=text,
174 repo=HG_REPO,
175 author=TEST_USER_REGULAR_LOGIN,
176 revision=revision,
177 f_path=f_path,
178 line_no=line_no,
179 send_email=False)
180
181 text2 = u'another inline comment, same file'
182 line_no2 = u'o41'
183 new_comment2 = ChangesetCommentsModel().create(
184 text=text2,
185 repo=HG_REPO,
186 author=TEST_USER_REGULAR_LOGIN,
187 revision=revision,
188 f_path=f_path,
189 line_no=line_no2,
190 send_email=False)
191
192 text3 = u'another inline comment, same file'
193 f_path3 = u'vcs/tests/test_hg.py'
194 line_no3 = u'n159'
195 new_comment3 = ChangesetCommentsModel().create(
196 text=text3,
197 repo=HG_REPO,
198 author=TEST_USER_REGULAR_LOGIN,
199 revision=revision,
200 f_path=f_path3,
201 line_no=line_no3,
202 send_email=False)
203
204 # now selectively retrieve comments of one file
205 comments, inline_comments = self._check_comment_count(repo_id, revision,
206 f_path=f_path,
207 expected_len_comments=0, expected_len_inline_comments=1)
208 # inline_comments is a list of tuples (file_path, dict)
209 # where the dict keys are line numbers and values are lists of comments
210 assert inline_comments[0][0] == f_path
211 assert len(inline_comments[0][1]) == 2
212 assert inline_comments[0][1][line_no][0].text == text
213 assert inline_comments[0][1][line_no2][0].text == text2
214
215 # now selectively retrieve comments of one file, one line
216 comments, inline_comments = self._check_comment_count(repo_id, revision,
217 f_path=f_path, line_no=line_no2,
218 expected_len_comments=0, expected_len_inline_comments=1)
219 # inline_comments is a list of tuples (file_path, dict)
220 # where the dict keys are line numbers and values are lists of comments
221 assert inline_comments[0][0] == f_path
222 assert len(inline_comments[0][1]) == 1
223 assert inline_comments[0][1][line_no2][0].text == text2
224
225 # verify that retrieval based on line_no but no f_path fails
226 with pytest.raises(Exception) as excinfo:
227 self._check_comment_count(repo_id, revision,
228 f_path=None, line_no=line_no2,
229 expected_len_comments=0, expected_len_inline_comments=0)
230 assert 'line_no only makes sense if f_path is given' in str(excinfo.value)
General Comments 0
You need to be logged in to leave comments. Login now