Show More
@@ -1,283 +1,304 b'' | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | # This program is free software: you can redistribute it and/or modify |
|
3 | 3 | # it under the terms of the GNU General Public License as published by |
|
4 | 4 | # the Free Software Foundation, either version 3 of the License, or |
|
5 | 5 | # (at your option) any later version. |
|
6 | 6 | # |
|
7 | 7 | # This program is distributed in the hope that it will be useful, |
|
8 | 8 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
9 | 9 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
10 | 10 | # GNU General Public License for more details. |
|
11 | 11 | # |
|
12 | 12 | # You should have received a copy of the GNU General Public License |
|
13 | 13 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
14 | 14 | """ |
|
15 | 15 | kallithea.model.comment |
|
16 | 16 | ~~~~~~~~~~~~~~~~~~~~~~~ |
|
17 | 17 | |
|
18 | 18 | comments model for Kallithea |
|
19 | 19 | |
|
20 | 20 | This file was forked by the Kallithea project in July 2014. |
|
21 | 21 | Original author and date, and relevant copyright and licensing information is below: |
|
22 | 22 | :created_on: Nov 11, 2011 |
|
23 | 23 | :author: marcink |
|
24 | 24 | :copyright: (c) 2013 RhodeCode GmbH, and others. |
|
25 | 25 | :license: GPLv3, see LICENSE.md for more details. |
|
26 | 26 | """ |
|
27 | 27 | |
|
28 | 28 | import logging |
|
29 | 29 | |
|
30 | 30 | from tg.i18n import ugettext as _ |
|
31 | 31 | from collections import defaultdict |
|
32 | 32 | |
|
33 | 33 | from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode |
|
34 | 34 | from kallithea.lib import helpers as h |
|
35 | 35 | from kallithea.model.db import ChangesetComment, User, \ |
|
36 | 36 | PullRequest, Repository |
|
37 | 37 | from kallithea.model.notification import NotificationModel |
|
38 | 38 | from kallithea.model.meta import Session |
|
39 | 39 | |
|
40 | 40 | log = logging.getLogger(__name__) |
|
41 | 41 | |
|
42 | 42 | |
|
43 | 43 | def _list_changeset_commenters(revision): |
|
44 | 44 | return (Session().query(User) |
|
45 | 45 | .join(ChangesetComment.author) |
|
46 | 46 | .filter(ChangesetComment.revision == revision) |
|
47 | 47 | .all()) |
|
48 | 48 | |
|
49 | 49 | def _list_pull_request_commenters(pull_request): |
|
50 | 50 | return (Session().query(User) |
|
51 | 51 | .join(ChangesetComment.author) |
|
52 | 52 | .filter(ChangesetComment.pull_request_id == pull_request.pull_request_id) |
|
53 | 53 | .all()) |
|
54 | 54 | |
|
55 | 55 | |
|
56 | 56 | class ChangesetCommentsModel(object): |
|
57 | 57 | |
|
58 | 58 | def _get_notification_data(self, repo, comment, author, comment_text, |
|
59 | 59 | line_no=None, revision=None, pull_request=None, |
|
60 | 60 | status_change=None, closing_pr=False): |
|
61 | 61 | """ |
|
62 | 62 | :returns: tuple (subj,body,recipients,notification_type,email_kwargs) |
|
63 | 63 | """ |
|
64 | 64 | # make notification |
|
65 | 65 | body = comment_text # text of the comment |
|
66 | 66 | line = '' |
|
67 | 67 | if line_no: |
|
68 | 68 | line = _('on line %s') % line_no |
|
69 | 69 | |
|
70 | 70 | # changeset |
|
71 | 71 | if revision: |
|
72 | 72 | notification_type = NotificationModel.TYPE_CHANGESET_COMMENT |
|
73 | 73 | cs = repo.scm_instance.get_changeset(revision) |
|
74 | 74 | desc = cs.short_id |
|
75 | 75 | |
|
76 | 76 | threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())] |
|
77 | 77 | if line_no: # TODO: url to file _and_ line number |
|
78 | 78 | threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no, |
|
79 | 79 | h.canonical_hostname())) |
|
80 | 80 | comment_url = h.canonical_url('changeset_home', |
|
81 | 81 | repo_name=repo.repo_name, |
|
82 | 82 | revision=revision, |
|
83 | 83 | anchor='comment-%s' % comment.comment_id) |
|
84 | 84 | subj = safe_unicode( |
|
85 | 85 | h.link_to('Re changeset: %(desc)s %(line)s' % \ |
|
86 | 86 | {'desc': desc, 'line': line}, |
|
87 | 87 | comment_url) |
|
88 | 88 | ) |
|
89 | 89 | # get the current participants of this changeset |
|
90 | 90 | recipients = _list_changeset_commenters(revision) |
|
91 | 91 | # add changeset author if it's known locally |
|
92 | 92 | cs_author = User.get_from_cs_author(cs.author) |
|
93 | 93 | if not cs_author: |
|
94 | 94 | # use repo owner if we cannot extract the author correctly |
|
95 | 95 | # FIXME: just use committer name even if not a user |
|
96 | 96 | cs_author = repo.owner |
|
97 | 97 | recipients.append(cs_author) |
|
98 | 98 | |
|
99 | 99 | email_kwargs = { |
|
100 | 100 | 'status_change': status_change, |
|
101 | 101 | 'cs_comment_user': author.full_name_and_username, |
|
102 | 102 | 'cs_target_repo': h.canonical_url('summary_home', repo_name=repo.repo_name), |
|
103 | 103 | 'cs_comment_url': comment_url, |
|
104 | 104 | 'cs_url': h.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision), |
|
105 | 105 | 'raw_id': revision, |
|
106 | 106 | 'message': cs.message, |
|
107 | 107 | 'message_short': h.shorter(cs.message, 50, firstline=True), |
|
108 | 108 | 'cs_author': cs_author, |
|
109 | 109 | 'repo_name': repo.repo_name, |
|
110 | 110 | 'short_id': h.short_id(revision), |
|
111 | 111 | 'branch': cs.branch, |
|
112 | 112 | 'comment_username': author.username, |
|
113 | 113 | 'threading': threading, |
|
114 | 114 | } |
|
115 | 115 | # pull request |
|
116 | 116 | elif pull_request: |
|
117 | 117 | notification_type = NotificationModel.TYPE_PULL_REQUEST_COMMENT |
|
118 | 118 | desc = comment.pull_request.title |
|
119 | 119 | _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':') |
|
120 | 120 | _other_ref_type, other_ref_name, _other_rev = comment.pull_request.other_ref.split(':') |
|
121 | 121 | threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name, |
|
122 | 122 | pull_request.pull_request_id, |
|
123 | 123 | h.canonical_hostname())] |
|
124 | 124 | if line_no: # TODO: url to file _and_ line number |
|
125 | 125 | threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name, |
|
126 | 126 | pull_request.pull_request_id, line_no, |
|
127 | 127 | h.canonical_hostname())) |
|
128 | 128 | comment_url = pull_request.url(canonical=True, |
|
129 | 129 | anchor='comment-%s' % comment.comment_id) |
|
130 | 130 | subj = safe_unicode( |
|
131 | 131 | h.link_to('Re pull request %(pr_nice_id)s: %(desc)s %(line)s' % |
|
132 | 132 | {'desc': desc, |
|
133 | 133 | 'pr_nice_id': comment.pull_request.nice_id(), |
|
134 | 134 | 'line': line}, |
|
135 | 135 | comment_url) |
|
136 | 136 | ) |
|
137 | 137 | # get the current participants of this pull request |
|
138 | 138 | recipients = _list_pull_request_commenters(pull_request) |
|
139 | 139 | recipients.append(pull_request.owner) |
|
140 | 140 | recipients += pull_request.get_reviewer_users() |
|
141 | 141 | |
|
142 | 142 | # set some variables for email notification |
|
143 | 143 | email_kwargs = { |
|
144 | 144 | 'pr_title': pull_request.title, |
|
145 | 145 | 'pr_title_short': h.shorter(pull_request.title, 50), |
|
146 | 146 | 'pr_nice_id': pull_request.nice_id(), |
|
147 | 147 | 'status_change': status_change, |
|
148 | 148 | 'closing_pr': closing_pr, |
|
149 | 149 | 'pr_comment_url': comment_url, |
|
150 | 150 | 'pr_url': pull_request.url(canonical=True), |
|
151 | 151 | 'pr_comment_user': author.full_name_and_username, |
|
152 | 152 | 'pr_target_repo': h.canonical_url('summary_home', |
|
153 | 153 | repo_name=pull_request.other_repo.repo_name), |
|
154 | 154 | 'pr_target_branch': other_ref_name, |
|
155 | 155 | 'pr_source_repo': h.canonical_url('summary_home', |
|
156 | 156 | repo_name=pull_request.org_repo.repo_name), |
|
157 | 157 | 'pr_source_branch': org_ref_name, |
|
158 | 158 | 'pr_owner': pull_request.owner, |
|
159 | 159 | 'pr_owner_username': pull_request.owner.username, |
|
160 | 160 | 'repo_name': pull_request.other_repo.repo_name, |
|
161 | 161 | 'comment_username': author.username, |
|
162 | 162 | 'threading': threading, |
|
163 | 163 | } |
|
164 | 164 | |
|
165 | 165 | return subj, body, recipients, notification_type, email_kwargs |
|
166 | 166 | |
|
167 | 167 | def create(self, text, repo, author, revision=None, pull_request=None, |
|
168 | 168 | f_path=None, line_no=None, status_change=None, closing_pr=False, |
|
169 | 169 | send_email=True): |
|
170 | 170 | """ |
|
171 | 171 | Creates a new comment for either a changeset or a pull request. |
|
172 | 172 | status_change and closing_pr is only for the optional email. |
|
173 | 173 | |
|
174 | 174 | Returns the created comment. |
|
175 | 175 | """ |
|
176 | 176 | if not status_change and not text: |
|
177 | 177 | log.warning('Missing text for comment, skipping...') |
|
178 | 178 | return None |
|
179 | 179 | |
|
180 | 180 | repo = Repository.guess_instance(repo) |
|
181 | 181 | author = User.guess_instance(author) |
|
182 | 182 | comment = ChangesetComment() |
|
183 | 183 | comment.repo = repo |
|
184 | 184 | comment.author = author |
|
185 | 185 | comment.text = text |
|
186 | 186 | comment.f_path = f_path |
|
187 | 187 | comment.line_no = line_no |
|
188 | 188 | |
|
189 | 189 | if revision is not None: |
|
190 | 190 | comment.revision = revision |
|
191 | 191 | elif pull_request is not None: |
|
192 | 192 | pull_request = PullRequest.guess_instance(pull_request) |
|
193 | 193 | comment.pull_request = pull_request |
|
194 | 194 | else: |
|
195 | 195 | raise Exception('Please specify revision or pull_request_id') |
|
196 | 196 | |
|
197 | 197 | Session().add(comment) |
|
198 | 198 | Session().flush() |
|
199 | 199 | |
|
200 | 200 | if send_email: |
|
201 | 201 | (subj, body, recipients, notification_type, |
|
202 | 202 | email_kwargs) = self._get_notification_data( |
|
203 | 203 | repo, comment, author, |
|
204 | 204 | comment_text=text, |
|
205 | 205 | line_no=line_no, |
|
206 | 206 | revision=revision, |
|
207 | 207 | pull_request=pull_request, |
|
208 | 208 | status_change=status_change, |
|
209 | 209 | closing_pr=closing_pr) |
|
210 | 210 | email_kwargs['is_mention'] = False |
|
211 | 211 | # create notification objects, and emails |
|
212 | 212 | NotificationModel().create( |
|
213 | 213 | created_by=author, subject=subj, body=body, |
|
214 | 214 | recipients=recipients, type_=notification_type, |
|
215 | 215 | email_kwargs=email_kwargs, |
|
216 | 216 | ) |
|
217 | 217 | |
|
218 | 218 | mention_recipients = extract_mentioned_users(body).difference(recipients) |
|
219 | 219 | if mention_recipients: |
|
220 | 220 | email_kwargs['is_mention'] = True |
|
221 | 221 | subj = _('[Mention]') + ' ' + subj |
|
222 | 222 | # FIXME: this subject is wrong and unused! |
|
223 | 223 | NotificationModel().create( |
|
224 | 224 | created_by=author, subject=subj, body=body, |
|
225 | 225 | recipients=mention_recipients, |
|
226 | 226 | type_=notification_type, |
|
227 | 227 | email_kwargs=email_kwargs |
|
228 | 228 | ) |
|
229 | 229 | |
|
230 | 230 | return comment |
|
231 | 231 | |
|
232 | 232 | def delete(self, comment): |
|
233 | 233 | comment = ChangesetComment.guess_instance(comment) |
|
234 | 234 | Session().delete(comment) |
|
235 | 235 | |
|
236 | 236 | return comment |
|
237 | 237 | |
|
238 | 238 | def get_comments(self, repo_id, revision=None, pull_request=None): |
|
239 | 239 | """ |
|
240 | 240 | Gets general comments for either revision or pull_request. |
|
241 | 241 | |
|
242 | 242 | Returns a list, ordered by creation date. |
|
243 | 243 | """ |
|
244 | 244 | return self._get_comments(repo_id, revision=revision, pull_request=pull_request, |
|
245 | 245 | inline=False) |
|
246 | 246 | |
|
247 |
def get_inline_comments(self, repo_id, revision=None, pull_request=None |
|
|
247 | def get_inline_comments(self, repo_id, revision=None, pull_request=None, | |
|
248 | f_path=None, line_no=None): | |
|
248 | 249 | """ |
|
249 | 250 | Gets inline comments for either revision or pull_request. |
|
250 | 251 | |
|
251 | 252 | Returns a list of tuples with file path and list of comments per line number. |
|
252 | 253 | """ |
|
253 | 254 | comments = self._get_comments(repo_id, revision=revision, pull_request=pull_request, |
|
254 | inline=True) | |
|
255 | inline=True, f_path=f_path, line_no=line_no) | |
|
255 | 256 | |
|
256 | 257 | paths = defaultdict(lambda: defaultdict(list)) |
|
257 | 258 | for co in comments: |
|
258 | 259 | paths[co.f_path][co.line_no].append(co) |
|
259 | 260 | return paths.items() |
|
260 | 261 | |
|
261 |
def _get_comments(self, repo_id, revision=None, pull_request=None, |
|
|
262 | def _get_comments(self, repo_id, revision=None, pull_request=None, | |
|
263 | inline=False, f_path=None, line_no=None): | |
|
262 | 264 | """ |
|
263 | 265 | Gets comments for either revision or pull_request_id, either inline or general. |
|
266 | If a file path and optionally line number are given, return only the matching inline comments. | |
|
264 | 267 | """ |
|
268 | if f_path is None and line_no is not None: | |
|
269 | raise Exception("line_no only makes sense if f_path is given.") | |
|
270 | ||
|
271 | if inline is None and f_path is not None: | |
|
272 | raise Exception("f_path only makes sense for inline comments.") | |
|
273 | ||
|
265 | 274 | q = Session().query(ChangesetComment) |
|
266 | 275 | |
|
267 | 276 | if inline: |
|
268 | q = q.filter(ChangesetComment.line_no != None) \ | |
|
269 | .filter(ChangesetComment.f_path != None) | |
|
277 | if f_path is not None: | |
|
278 | # inline comments for a given file... | |
|
279 | q = q.filter(ChangesetComment.f_path == f_path) | |
|
280 | if line_no is None: | |
|
281 | # ... on any line | |
|
282 | q = q.filter(ChangesetComment.line_no != None) | |
|
283 | else: | |
|
284 | # ... on specific line | |
|
285 | q = q.filter(ChangesetComment.line_no == line_no) | |
|
286 | else: | |
|
287 | # all inline comments | |
|
288 | q = q.filter(ChangesetComment.line_no != None) \ | |
|
289 | .filter(ChangesetComment.f_path != None) | |
|
270 | 290 | else: |
|
291 | # all general comments | |
|
271 | 292 | q = q.filter(ChangesetComment.line_no == None) \ |
|
272 | 293 | .filter(ChangesetComment.f_path == None) |
|
273 | 294 | |
|
274 | 295 | if revision is not None: |
|
275 | 296 | q = q.filter(ChangesetComment.revision == revision) \ |
|
276 | 297 | .filter(ChangesetComment.repo_id == repo_id) |
|
277 | 298 | elif pull_request is not None: |
|
278 | 299 | pull_request = PullRequest.guess_instance(pull_request) |
|
279 | 300 | q = q.filter(ChangesetComment.pull_request == pull_request) |
|
280 | 301 | else: |
|
281 | 302 | raise Exception('Please specify either revision or pull_request') |
|
282 | 303 | |
|
283 | 304 | return q.order_by(ChangesetComment.created_on).all() |
@@ -1,155 +1,230 b'' | |||
|
1 | 1 | from kallithea.tests.base import * |
|
2 | 2 | from kallithea.model.comment import ChangesetCommentsModel |
|
3 | 3 | from kallithea.model.db import Repository |
|
4 | 4 | |
|
5 | import pytest | |
|
5 | 6 | from tg.util.webtest import test_context |
|
6 | 7 | |
|
7 | 8 | class TestComments(TestController): |
|
8 | 9 | |
|
9 |
def _check_comment_count(self, repo_id, revision, |
|
|
10 | def _check_comment_count(self, repo_id, revision, | |
|
11 | expected_len_comments, expected_len_inline_comments, | |
|
12 | f_path=None, line_no=None | |
|
13 | ): | |
|
10 | 14 | comments = ChangesetCommentsModel().get_comments(repo_id, |
|
11 | 15 | revision=revision) |
|
12 | 16 | assert len(comments) == expected_len_comments |
|
13 | 17 | inline_comments = ChangesetCommentsModel().get_inline_comments(repo_id, |
|
14 | revision=revision) | |
|
18 | revision=revision, f_path=f_path, line_no=line_no) | |
|
15 | 19 | assert len(inline_comments) == expected_len_inline_comments |
|
16 | 20 | |
|
17 | 21 | return comments, inline_comments |
|
18 | 22 | |
|
19 | 23 | def test_create_delete_general_comment(self): |
|
20 | 24 | with test_context(self.app): |
|
21 | 25 | repo_id = Repository.get_by_repo_name(HG_REPO).repo_id |
|
22 | 26 | revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda' |
|
23 | 27 | |
|
24 | 28 | self._check_comment_count(repo_id, revision, |
|
25 | 29 | expected_len_comments=0, expected_len_inline_comments=0) |
|
26 | 30 | |
|
27 | 31 | text = u'a comment' |
|
28 | 32 | new_comment = ChangesetCommentsModel().create( |
|
29 | 33 | text=text, |
|
30 | 34 | repo=HG_REPO, |
|
31 | 35 | author=TEST_USER_REGULAR_LOGIN, |
|
32 | 36 | revision=revision, |
|
33 | 37 | send_email=False) |
|
34 | 38 | |
|
35 | 39 | self._check_comment_count(repo_id, revision, |
|
36 | 40 | expected_len_comments=1, expected_len_inline_comments=0) |
|
37 | 41 | |
|
38 | 42 | ChangesetCommentsModel().delete(new_comment) |
|
39 | 43 | |
|
40 | 44 | self._check_comment_count(repo_id, revision, |
|
41 | 45 | expected_len_comments=0, expected_len_inline_comments=0) |
|
42 | 46 | |
|
43 | 47 | def test_create_delete_inline_comment(self): |
|
44 | 48 | with test_context(self.app): |
|
45 | 49 | repo_id = Repository.get_by_repo_name(HG_REPO).repo_id |
|
46 | 50 | revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda' |
|
47 | 51 | |
|
48 | 52 | self._check_comment_count(repo_id, revision, |
|
49 | 53 | expected_len_comments=0, expected_len_inline_comments=0) |
|
50 | 54 | |
|
51 | 55 | text = u'an inline comment' |
|
52 | 56 | f_path = u'vcs/tests/base.py' |
|
53 | 57 | line_no = u'n50' |
|
54 | 58 | new_comment = ChangesetCommentsModel().create( |
|
55 | 59 | text=text, |
|
56 | 60 | repo=HG_REPO, |
|
57 | 61 | author=TEST_USER_REGULAR_LOGIN, |
|
58 | 62 | revision=revision, |
|
59 | 63 | f_path=f_path, |
|
60 | 64 | line_no=line_no, |
|
61 | 65 | send_email=False) |
|
62 | 66 | |
|
63 | 67 | comments, inline_comments = self._check_comment_count(repo_id, revision, |
|
64 | 68 | expected_len_comments=0, expected_len_inline_comments=1) |
|
65 | 69 | # inline_comments is a list of tuples (file_path, dict) |
|
66 | 70 | # where the dict keys are line numbers and values are lists of comments |
|
67 | 71 | assert inline_comments[0][0] == f_path |
|
68 | 72 | assert len(inline_comments[0][1]) == 1 |
|
69 | 73 | assert line_no in inline_comments[0][1] |
|
70 | 74 | assert inline_comments[0][1][line_no][0].text == text |
|
71 | 75 | |
|
72 | 76 | ChangesetCommentsModel().delete(new_comment) |
|
73 | 77 | |
|
74 | 78 | self._check_comment_count(repo_id, revision, |
|
75 | 79 | expected_len_comments=0, expected_len_inline_comments=0) |
|
76 | 80 | |
|
77 | 81 | def test_create_delete_multiple_inline_comments(self): |
|
78 | 82 | with test_context(self.app): |
|
79 | 83 | repo_id = Repository.get_by_repo_name(HG_REPO).repo_id |
|
80 | 84 | revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda' |
|
81 | 85 | |
|
82 | 86 | self._check_comment_count(repo_id, revision, |
|
83 | 87 | expected_len_comments=0, expected_len_inline_comments=0) |
|
84 | 88 | |
|
85 | 89 | text = u'an inline comment' |
|
86 | 90 | f_path = u'vcs/tests/base.py' |
|
87 | 91 | line_no = u'n50' |
|
88 | 92 | new_comment = ChangesetCommentsModel().create( |
|
89 | 93 | text=text, |
|
90 | 94 | repo=HG_REPO, |
|
91 | 95 | author=TEST_USER_REGULAR_LOGIN, |
|
92 | 96 | revision=revision, |
|
93 | 97 | f_path=f_path, |
|
94 | 98 | line_no=line_no, |
|
95 | 99 | send_email=False) |
|
96 | 100 | |
|
97 | 101 | text2 = u'another inline comment, same file' |
|
98 | 102 | line_no2 = u'o41' |
|
99 | 103 | new_comment2 = ChangesetCommentsModel().create( |
|
100 | 104 | text=text2, |
|
101 | 105 | repo=HG_REPO, |
|
102 | 106 | author=TEST_USER_REGULAR_LOGIN, |
|
103 | 107 | revision=revision, |
|
104 | 108 | f_path=f_path, |
|
105 | 109 | line_no=line_no2, |
|
106 | 110 | send_email=False) |
|
107 | 111 | |
|
108 | 112 | text3 = u'another inline comment, same file' |
|
109 | 113 | f_path3 = u'vcs/tests/test_hg.py' |
|
110 | 114 | line_no3 = u'n159' |
|
111 | 115 | new_comment3 = ChangesetCommentsModel().create( |
|
112 | 116 | text=text3, |
|
113 | 117 | repo=HG_REPO, |
|
114 | 118 | author=TEST_USER_REGULAR_LOGIN, |
|
115 | 119 | revision=revision, |
|
116 | 120 | f_path=f_path3, |
|
117 | 121 | line_no=line_no3, |
|
118 | 122 | send_email=False) |
|
119 | 123 | |
|
120 | 124 | comments, inline_comments = self._check_comment_count(repo_id, revision, |
|
121 | 125 | expected_len_comments=0, expected_len_inline_comments=2) |
|
122 | 126 | # inline_comments is a list of tuples (file_path, dict) |
|
123 | 127 | # where the dict keys are line numbers and values are lists of comments |
|
124 | 128 | assert inline_comments[1][0] == f_path |
|
125 | 129 | assert len(inline_comments[1][1]) == 2 |
|
126 | 130 | assert inline_comments[1][1][line_no][0].text == text |
|
127 | 131 | assert inline_comments[1][1][line_no2][0].text == text2 |
|
128 | 132 | |
|
129 | 133 | assert inline_comments[0][0] == f_path3 |
|
130 | 134 | assert len(inline_comments[0][1]) == 1 |
|
131 | 135 | assert line_no3 in inline_comments[0][1] |
|
132 | 136 | assert inline_comments[0][1][line_no3][0].text == text3 |
|
133 | 137 | |
|
134 | 138 | # now delete only one comment |
|
135 | 139 | ChangesetCommentsModel().delete(new_comment2) |
|
136 | 140 | |
|
137 | 141 | comments, inline_comments = self._check_comment_count(repo_id, revision, |
|
138 | 142 | expected_len_comments=0, expected_len_inline_comments=2) |
|
139 | 143 | # inline_comments is a list of tuples (file_path, dict) |
|
140 | 144 | # where the dict keys are line numbers and values are lists of comments |
|
141 | 145 | assert inline_comments[1][0] == f_path |
|
142 | 146 | assert len(inline_comments[1][1]) == 1 |
|
143 | 147 | assert inline_comments[1][1][line_no][0].text == text |
|
144 | 148 | |
|
145 | 149 | assert inline_comments[0][0] == f_path3 |
|
146 | 150 | assert len(inline_comments[0][1]) == 1 |
|
147 | 151 | assert line_no3 in inline_comments[0][1] |
|
148 | 152 | assert inline_comments[0][1][line_no3][0].text == text3 |
|
149 | 153 | |
|
150 | 154 | # now delete all others |
|
151 | 155 | ChangesetCommentsModel().delete(new_comment) |
|
152 | 156 | ChangesetCommentsModel().delete(new_comment3) |
|
153 | 157 | |
|
154 | 158 | self._check_comment_count(repo_id, revision, |
|
155 | 159 | expected_len_comments=0, expected_len_inline_comments=0) |
|
160 | ||
|
161 | def test_selective_retrieval_of_inline_comments(self): | |
|
162 | with test_context(self.app): | |
|
163 | repo_id = Repository.get_by_repo_name(HG_REPO).repo_id | |
|
164 | revision = '9a7b4ff9e8b40bbda72fc75f162325b9baa45cda' | |
|
165 | ||
|
166 | self._check_comment_count(repo_id, revision, | |
|
167 | expected_len_comments=0, expected_len_inline_comments=0) | |
|
168 | ||
|
169 | text = u'an inline comment' | |
|
170 | f_path = u'vcs/tests/base.py' | |
|
171 | line_no = u'n50' | |
|
172 | new_comment = ChangesetCommentsModel().create( | |
|
173 | text=text, | |
|
174 | repo=HG_REPO, | |
|
175 | author=TEST_USER_REGULAR_LOGIN, | |
|
176 | revision=revision, | |
|
177 | f_path=f_path, | |
|
178 | line_no=line_no, | |
|
179 | send_email=False) | |
|
180 | ||
|
181 | text2 = u'another inline comment, same file' | |
|
182 | line_no2 = u'o41' | |
|
183 | new_comment2 = ChangesetCommentsModel().create( | |
|
184 | text=text2, | |
|
185 | repo=HG_REPO, | |
|
186 | author=TEST_USER_REGULAR_LOGIN, | |
|
187 | revision=revision, | |
|
188 | f_path=f_path, | |
|
189 | line_no=line_no2, | |
|
190 | send_email=False) | |
|
191 | ||
|
192 | text3 = u'another inline comment, same file' | |
|
193 | f_path3 = u'vcs/tests/test_hg.py' | |
|
194 | line_no3 = u'n159' | |
|
195 | new_comment3 = ChangesetCommentsModel().create( | |
|
196 | text=text3, | |
|
197 | repo=HG_REPO, | |
|
198 | author=TEST_USER_REGULAR_LOGIN, | |
|
199 | revision=revision, | |
|
200 | f_path=f_path3, | |
|
201 | line_no=line_no3, | |
|
202 | send_email=False) | |
|
203 | ||
|
204 | # now selectively retrieve comments of one file | |
|
205 | comments, inline_comments = self._check_comment_count(repo_id, revision, | |
|
206 | f_path=f_path, | |
|
207 | expected_len_comments=0, expected_len_inline_comments=1) | |
|
208 | # inline_comments is a list of tuples (file_path, dict) | |
|
209 | # where the dict keys are line numbers and values are lists of comments | |
|
210 | assert inline_comments[0][0] == f_path | |
|
211 | assert len(inline_comments[0][1]) == 2 | |
|
212 | assert inline_comments[0][1][line_no][0].text == text | |
|
213 | assert inline_comments[0][1][line_no2][0].text == text2 | |
|
214 | ||
|
215 | # now selectively retrieve comments of one file, one line | |
|
216 | comments, inline_comments = self._check_comment_count(repo_id, revision, | |
|
217 | f_path=f_path, line_no=line_no2, | |
|
218 | expected_len_comments=0, expected_len_inline_comments=1) | |
|
219 | # inline_comments is a list of tuples (file_path, dict) | |
|
220 | # where the dict keys are line numbers and values are lists of comments | |
|
221 | assert inline_comments[0][0] == f_path | |
|
222 | assert len(inline_comments[0][1]) == 1 | |
|
223 | assert inline_comments[0][1][line_no2][0].text == text2 | |
|
224 | ||
|
225 | # verify that retrieval based on line_no but no f_path fails | |
|
226 | with pytest.raises(Exception) as excinfo: | |
|
227 | self._check_comment_count(repo_id, revision, | |
|
228 | f_path=None, line_no=line_no2, | |
|
229 | expected_len_comments=0, expected_len_inline_comments=0) | |
|
230 | assert 'line_no only makes sense if f_path is given' in str(excinfo.value) |
General Comments 0
You need to be logged in to leave comments.
Login now