##// END OF EJS Templates
channelstream: cleanup, and re-organize code for posting comments/pr updated messages....
marcink -
r4505:7d9c5b92 stable
parent child Browse files
Show More
@@ -26,6 +26,7 b' from rhodecode.api.utils import ('
26 has_superadmin_permission, Optional, OAttr, get_repo_or_error,
26 has_superadmin_permission, Optional, OAttr, get_repo_or_error,
27 get_pull_request_or_error, get_commit_or_error, get_user_or_error,
27 get_pull_request_or_error, get_commit_or_error, get_user_or_error,
28 validate_repo_permissions, resolve_ref_or_error, validate_set_owner_permissions)
28 validate_repo_permissions, resolve_ref_or_error, validate_set_owner_permissions)
29 from rhodecode.lib import channelstream
29 from rhodecode.lib.auth import (HasRepoPermissionAnyApi)
30 from rhodecode.lib.auth import (HasRepoPermissionAnyApi)
30 from rhodecode.lib.base import vcs_operation_context
31 from rhodecode.lib.base import vcs_operation_context
31 from rhodecode.lib.utils2 import str2bool
32 from rhodecode.lib.utils2 import str2bool
@@ -502,16 +503,19 b' def comment_pull_request('
502 },
503 },
503 error : null
504 error : null
504 """
505 """
506 _ = request.translate
507
505 pull_request = get_pull_request_or_error(pullrequestid)
508 pull_request = get_pull_request_or_error(pullrequestid)
506 if Optional.extract(repoid):
509 if Optional.extract(repoid):
507 repo = get_repo_or_error(repoid)
510 repo = get_repo_or_error(repoid)
508 else:
511 else:
509 repo = pull_request.target_repo
512 repo = pull_request.target_repo
510
513
514 db_repo_name = repo.repo_name
511 auth_user = apiuser
515 auth_user = apiuser
512 if not isinstance(userid, Optional):
516 if not isinstance(userid, Optional):
513 is_repo_admin = HasRepoPermissionAnyApi('repository.admin')(
517 is_repo_admin = HasRepoPermissionAnyApi('repository.admin')(
514 user=apiuser, repo_name=repo.repo_name)
518 user=apiuser, repo_name=db_repo_name)
515 if has_superadmin_permission(apiuser) or is_repo_admin:
519 if has_superadmin_permission(apiuser) or is_repo_admin:
516 apiuser = get_user_or_error(userid)
520 apiuser = get_user_or_error(userid)
517 auth_user = apiuser.AuthUser()
521 auth_user = apiuser.AuthUser()
@@ -596,6 +600,7 b' def comment_pull_request('
596 extra_recipients=extra_recipients,
600 extra_recipients=extra_recipients,
597 send_email=send_email
601 send_email=send_email
598 )
602 )
603 is_inline = bool(comment.f_path and comment.line_no)
599
604
600 if allowed_to_change_status and status:
605 if allowed_to_change_status and status:
601 old_calculated_status = pull_request.calculated_review_status()
606 old_calculated_status = pull_request.calculated_review_status()
@@ -628,6 +633,17 b' def comment_pull_request('
628 'comment_id': comment.comment_id if comment else None,
633 'comment_id': comment.comment_id if comment else None,
629 'status': {'given': status, 'was_changed': status_change},
634 'status': {'given': status, 'was_changed': status_change},
630 }
635 }
636
637 comment_broadcast_channel = channelstream.comment_channel(
638 db_repo_name, pull_request_obj=pull_request)
639
640 comment_data = data
641 comment_type = 'inline' if is_inline else 'general'
642 channelstream.comment_channelstream_push(
643 request, comment_broadcast_channel, apiuser,
644 _('posted a new {} comment').format(comment_type),
645 comment_data=comment_data)
646
631 return data
647 return data
632
648
633
649
@@ -881,6 +897,8 b' def update_pull_request('
881 description = Optional.extract(description)
897 description = Optional.extract(description)
882 description_renderer = Optional.extract(description_renderer)
898 description_renderer = Optional.extract(description_renderer)
883
899
900 # Update title/description
901 title_changed = False
884 if title or description:
902 if title or description:
885 PullRequestModel().edit(
903 PullRequestModel().edit(
886 pull_request,
904 pull_request,
@@ -889,8 +907,12 b' def update_pull_request('
889 description_renderer or pull_request.description_renderer,
907 description_renderer or pull_request.description_renderer,
890 apiuser)
908 apiuser)
891 Session().commit()
909 Session().commit()
910 title_changed = True
892
911
893 commit_changes = {"added": [], "common": [], "removed": []}
912 commit_changes = {"added": [], "common": [], "removed": []}
913
914 # Update commits
915 commits_changed = False
894 if str2bool(Optional.extract(update_commits)):
916 if str2bool(Optional.extract(update_commits)):
895
917
896 if pull_request.pull_request_state != PullRequest.STATE_CREATED:
918 if pull_request.pull_request_state != PullRequest.STATE_CREATED:
@@ -906,7 +928,10 b' def update_pull_request('
906 pull_request, db_user)
928 pull_request, db_user)
907 commit_changes = update_response.changes or commit_changes
929 commit_changes = update_response.changes or commit_changes
908 Session().commit()
930 Session().commit()
931 commits_changed = True
909
932
933 # Update reviewers
934 reviewers_changed = False
910 reviewers_changes = {"added": [], "removed": []}
935 reviewers_changes = {"added": [], "removed": []}
911 if reviewers:
936 if reviewers:
912 old_calculated_status = pull_request.calculated_review_status()
937 old_calculated_status = pull_request.calculated_review_status()
@@ -925,6 +950,16 b' def update_pull_request('
925 PullRequestModel().trigger_pull_request_hook(
950 PullRequestModel().trigger_pull_request_hook(
926 pull_request, apiuser, 'review_status_change',
951 pull_request, apiuser, 'review_status_change',
927 data={'status': calculated_status})
952 data={'status': calculated_status})
953 reviewers_changed = True
954
955 observers_changed = False
956
957 # push changed to channelstream
958 if commits_changed or reviewers_changed or observers_changed:
959 pr_broadcast_channel = channelstream.pr_channel(pull_request)
960 msg = 'Pull request was updated.'
961 channelstream.pr_update_channelstream_push(
962 request, pr_broadcast_channel, apiuser, msg)
928
963
929 data = {
964 data = {
930 'msg': 'Updated pull request `{}`'.format(
965 'msg': 'Updated pull request `{}`'.format(
@@ -29,7 +29,7 b' from rhodecode.api.utils import ('
29 get_user_group_or_error, get_user_or_error, validate_repo_permissions,
29 get_user_group_or_error, get_user_or_error, validate_repo_permissions,
30 get_perm_or_error, parse_args, get_origin, build_commit_data,
30 get_perm_or_error, parse_args, get_origin, build_commit_data,
31 validate_set_owner_permissions)
31 validate_set_owner_permissions)
32 from rhodecode.lib import audit_logger, rc_cache
32 from rhodecode.lib import audit_logger, rc_cache, channelstream
33 from rhodecode.lib import repo_maintenance
33 from rhodecode.lib import repo_maintenance
34 from rhodecode.lib.auth import (
34 from rhodecode.lib.auth import (
35 HasPermissionAnyApi, HasUserGroupPermissionAnyApi,
35 HasPermissionAnyApi, HasUserGroupPermissionAnyApi,
@@ -1597,10 +1597,13 b' def comment_commit('
1597 }
1597 }
1598
1598
1599 """
1599 """
1600 _ = request.translate
1601
1600 repo = get_repo_or_error(repoid)
1602 repo = get_repo_or_error(repoid)
1601 if not has_superadmin_permission(apiuser):
1603 if not has_superadmin_permission(apiuser):
1602 _perms = ('repository.read', 'repository.write', 'repository.admin')
1604 _perms = ('repository.read', 'repository.write', 'repository.admin')
1603 validate_repo_permissions(apiuser, repoid, repo, _perms)
1605 validate_repo_permissions(apiuser, repoid, repo, _perms)
1606 db_repo_name = repo.repo_name
1604
1607
1605 try:
1608 try:
1606 commit = repo.scm_instance().get_commit(commit_id=commit_id)
1609 commit = repo.scm_instance().get_commit(commit_id=commit_id)
@@ -1650,6 +1653,8 b' def comment_commit('
1650 extra_recipients=extra_recipients,
1653 extra_recipients=extra_recipients,
1651 send_email=send_email
1654 send_email=send_email
1652 )
1655 )
1656 is_inline = bool(comment.f_path and comment.line_no)
1657
1653 if status:
1658 if status:
1654 # also do a status change
1659 # also do a status change
1655 try:
1660 try:
@@ -1669,6 +1674,17 b' def comment_commit('
1669 data={'comment': comment, 'commit': commit})
1674 data={'comment': comment, 'commit': commit})
1670
1675
1671 Session().commit()
1676 Session().commit()
1677
1678 comment_broadcast_channel = channelstream.comment_channel(
1679 db_repo_name, commit_obj=commit)
1680
1681 comment_data = {'comment': comment, 'comment_id': comment.comment_id}
1682 comment_type = 'inline' if is_inline else 'general'
1683 channelstream.comment_channelstream_push(
1684 request, comment_broadcast_channel, apiuser,
1685 _('posted a new {} comment').format(comment_type),
1686 comment_data=comment_data)
1687
1672 return {
1688 return {
1673 'msg': (
1689 'msg': (
1674 'Commented on commit `%s` for repository `%s`' % (
1690 'Commented on commit `%s` for repository `%s`' % (
@@ -31,7 +31,7 b' from rhodecode.apps._base import RepoApp'
31 from rhodecode.apps.file_store import utils as store_utils
31 from rhodecode.apps.file_store import utils as store_utils
32 from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException
32 from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException
33
33
34 from rhodecode.lib import diffs, codeblocks
34 from rhodecode.lib import diffs, codeblocks, channelstream
35 from rhodecode.lib.auth import (
35 from rhodecode.lib.auth import (
36 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, CSRFRequired)
36 LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, CSRFRequired)
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
@@ -210,10 +210,7 b' class RepoCommitsView(RepoAppView):'
210
210
211 # NOTE(marcink): this uses the same voting logic as in pull-requests
211 # NOTE(marcink): this uses the same voting logic as in pull-requests
212 c.commit_review_status = ChangesetStatusModel().calculate_status(review_statuses)
212 c.commit_review_status = ChangesetStatusModel().calculate_status(review_statuses)
213 c.commit_broadcast_channel = u'/repo${}$/commit/{}'.format(
213 c.commit_broadcast_channel = channelstream.comment_channel(c.repo_name, commit_obj=commit)
214 c.repo_name,
215 commit.raw_id
216 )
217
214
218 diff = None
215 diff = None
219 # Iterate over ranges (default commit view is always one commit)
216 # Iterate over ranges (default commit view is always one commit)
@@ -417,6 +414,7 b' class RepoCommitsView(RepoAppView):'
417 resolves_comment_id=resolves_comment_id,
414 resolves_comment_id=resolves_comment_id,
418 auth_user=self._rhodecode_user
415 auth_user=self._rhodecode_user
419 )
416 )
417 is_inline = bool(comment.f_path and comment.line_no)
420
418
421 # get status if set !
419 # get status if set !
422 if status:
420 if status:
@@ -464,6 +462,16 b' class RepoCommitsView(RepoAppView):'
464 data.update(comment.get_dict())
462 data.update(comment.get_dict())
465 data.update({'rendered_text': rendered_comment})
463 data.update({'rendered_text': rendered_comment})
466
464
465 comment_broadcast_channel = channelstream.comment_channel(
466 self.db_repo_name, commit_obj=commit)
467
468 comment_data = data
469 comment_type = 'inline' if is_inline else 'general'
470 channelstream.comment_channelstream_push(
471 self.request, comment_broadcast_channel, self._rhodecode_user,
472 _('posted a new {} comment').format(comment_type),
473 comment_data=comment_data)
474
467 return data
475 return data
468
476
469 @LoginRequired()
477 @LoginRequired()
@@ -311,8 +311,7 b' class RepoPullRequestsView(RepoAppView, '
311 pull_request_id = pull_request.pull_request_id
311 pull_request_id = pull_request.pull_request_id
312
312
313 c.state_progressing = pull_request.is_state_changing()
313 c.state_progressing = pull_request.is_state_changing()
314 c.pr_broadcast_channel = '/repo${}$/pr/{}'.format(
314 c.pr_broadcast_channel = channelstream.pr_channel(pull_request)
315 pull_request.target_repo.repo_name, pull_request.pull_request_id)
316
315
317 _new_state = {
316 _new_state = {
318 'created': PullRequest.STATE_CREATED,
317 'created': PullRequest.STATE_CREATED,
@@ -1238,8 +1237,7 b' class RepoPullRequestsView(RepoAppView, '
1238 'redirect_url': redirect_url}
1237 'redirect_url': redirect_url}
1239
1238
1240 is_state_changing = pull_request.is_state_changing()
1239 is_state_changing = pull_request.is_state_changing()
1241 c.pr_broadcast_channel = '/repo${}$/pr/{}'.format(
1240 c.pr_broadcast_channel = channelstream.pr_channel(pull_request)
1242 pull_request.target_repo.repo_name, pull_request.pull_request_id)
1243
1241
1244 # only owner or admin can update it
1242 # only owner or admin can update it
1245 allowed_to_update = PullRequestModel().check_user_update(
1243 allowed_to_update = PullRequestModel().check_user_update(
@@ -1339,7 +1337,8 b' class RepoPullRequestsView(RepoAppView, '
1339 count_removed=len(resp.changes.removed),
1337 count_removed=len(resp.changes.removed),
1340 change_source=changed)
1338 change_source=changed)
1341 h.flash(msg, category='success')
1339 h.flash(msg, category='success')
1342 self._pr_update_channelstream_push(c.pr_broadcast_channel, msg)
1340 channelstream.pr_update_channelstream_push(
1341 self.request, c.pr_broadcast_channel, self._rhodecode_user, msg)
1343 else:
1342 else:
1344 msg = PullRequestModel.UPDATE_STATUS_MESSAGES[resp.reason]
1343 msg = PullRequestModel.UPDATE_STATUS_MESSAGES[resp.reason]
1345 warning_reasons = [
1344 warning_reasons = [
@@ -1371,7 +1370,8 b' class RepoPullRequestsView(RepoAppView, '
1371
1370
1372 msg = _('Pull request reviewers updated.')
1371 msg = _('Pull request reviewers updated.')
1373 h.flash(msg, category='success')
1372 h.flash(msg, category='success')
1374 self._pr_update_channelstream_push(c.pr_broadcast_channel, msg)
1373 channelstream.pr_update_channelstream_push(
1374 self.request, c.pr_broadcast_channel, self._rhodecode_user, msg)
1375
1375
1376 # trigger status changed if change in reviewers changes the status
1376 # trigger status changed if change in reviewers changes the status
1377 calculated_status = pull_request.calculated_review_status()
1377 calculated_status = pull_request.calculated_review_status()
@@ -1394,7 +1394,8 b' class RepoPullRequestsView(RepoAppView, '
1394 Session().commit()
1394 Session().commit()
1395 msg = _('Pull request observers updated.')
1395 msg = _('Pull request observers updated.')
1396 h.flash(msg, category='success')
1396 h.flash(msg, category='success')
1397 self._pr_update_channelstream_push(c.pr_broadcast_channel, msg)
1397 channelstream.pr_update_channelstream_push(
1398 self.request, c.pr_broadcast_channel, self._rhodecode_user, msg)
1398
1399
1399 @LoginRequired()
1400 @LoginRequired()
1400 @NotAnonymous()
1401 @NotAnonymous()
@@ -1588,6 +1589,7 b' class RepoPullRequestsView(RepoAppView, '
1588 resolves_comment_id=resolves_comment_id,
1589 resolves_comment_id=resolves_comment_id,
1589 auth_user=self._rhodecode_user
1590 auth_user=self._rhodecode_user
1590 )
1591 )
1592 is_inline = bool(comment.f_path and comment.line_no)
1591
1593
1592 if allowed_to_change_status:
1594 if allowed_to_change_status:
1593 # calculate old status before we change it
1595 # calculate old status before we change it
@@ -1636,6 +1638,16 b' class RepoPullRequestsView(RepoAppView, '
1636 data.update(comment.get_dict())
1638 data.update(comment.get_dict())
1637 data.update({'rendered_text': rendered_comment})
1639 data.update({'rendered_text': rendered_comment})
1638
1640
1641 comment_broadcast_channel = channelstream.comment_channel(
1642 self.db_repo_name, pull_request_obj=pull_request)
1643
1644 comment_data = data
1645 comment_type = 'inline' if is_inline else 'general'
1646 channelstream.comment_channelstream_push(
1647 self.request, comment_broadcast_channel, self._rhodecode_user,
1648 _('posted a new {} comment').format(comment_type),
1649 comment_data=comment_data)
1650
1639 return data
1651 return data
1640
1652
1641 @LoginRequired()
1653 @LoginRequired()
@@ -225,14 +225,26 b' def write_history(config, message):'
225
225
226 def get_connection_validators(registry):
226 def get_connection_validators(registry):
227 validators = []
227 validators = []
228 for k, config in registry.rhodecode_plugins.iteritems():
228 for k, config in registry.rhodecode_plugins.items():
229 validator = config.get('channelstream', {}).get('connect_validator')
229 validator = config.get('channelstream', {}).get('connect_validator')
230 if validator:
230 if validator:
231 validators.append(validator)
231 validators.append(validator)
232 return validators
232 return validators
233
233
234
234
235 def get_channelstream_config(registry=None):
236 if not registry:
237 registry = get_current_registry()
238
239 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
240 channelstream_config = rhodecode_plugins.get('channelstream', {})
241 return channelstream_config
242
243
235 def post_message(channel, message, username, registry=None):
244 def post_message(channel, message, username, registry=None):
245 channelstream_config = get_channelstream_config(registry)
246 if not channelstream_config.get('enabled'):
247 return
236
248
237 message_obj = message
249 message_obj = message
238 if isinstance(message, basestring):
250 if isinstance(message, basestring):
@@ -242,26 +254,118 b' def post_message(channel, message, usern'
242 'topic': '/notifications'
254 'topic': '/notifications'
243 }
255 }
244
256
245 if not registry:
257 log.debug('Channelstream: sending notification to channel %s', channel)
246 registry = get_current_registry()
258 payload = {
259 'type': 'message',
260 'timestamp': datetime.datetime.utcnow(),
261 'user': 'system',
262 'exclude_users': [username],
263 'channel': channel,
264 'message': message_obj
265 }
266
267 try:
268 return channelstream_request(
269 channelstream_config, [payload], '/message',
270 raise_exc=False)
271 except ChannelstreamException:
272 log.exception('Failed to send channelstream data')
273 raise
274
275
276 def _reload_link(label):
277 return (
278 '<a onclick="window.location.reload()">'
279 '<strong>{}</strong>'
280 '</a>'.format(label)
281 )
282
283
284 def pr_channel(pull_request):
285 repo_name = pull_request.target_repo.repo_name
286 pull_request_id = pull_request.pull_request_id
287 channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id)
288 log.debug('Getting pull-request channelstream broadcast channel: %s', channel)
289 return channel
290
291
292 def comment_channel(repo_name, commit_obj=None, pull_request_obj=None):
293 channel = None
294 if commit_obj:
295 channel = u'/repo${}$/commit/{}'.format(
296 repo_name, commit_obj.raw_id
297 )
298 elif pull_request_obj:
299 channel = u'/repo${}$/pr/{}'.format(
300 repo_name, pull_request_obj.pull_request_id
301 )
302 log.debug('Getting comment channelstream broadcast channel: %s', channel)
303
304 return channel
305
306
307 def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs):
308 """
309 Channel push on pull request update
310 """
311 if not pr_broadcast_channel:
312 return
247
313
248 log.debug('Channelstream: sending notification to channel %s', channel)
314 _ = request.translate
249 rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {})
315
250 channelstream_config = rhodecode_plugins.get('channelstream', {})
316 message = '{} {}'.format(
251 if channelstream_config.get('enabled'):
317 msg,
252 payload = {
318 _reload_link(_(' Reload page to load changes')))
253 'type': 'message',
319
254 'timestamp': datetime.datetime.utcnow(),
320 message_obj = {
255 'user': 'system',
321 'message': message,
256 'exclude_users': [username],
322 'level': 'success',
257 'channel': channel,
323 'topic': '/notifications'
258 'message': message_obj
324 }
259 }
325
326 post_message(
327 pr_broadcast_channel, message_obj, user.username,
328 registry=request.registry)
329
330
331 def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs):
332 """
333 Channelstream push on comment action, on commit, or pull-request
334 """
335 if not comment_broadcast_channel:
336 return
337
338 _ = request.translate
260
339
261 try:
340 comment_data = kwargs.pop('comment_data', {})
262 return channelstream_request(
341 user_data = kwargs.pop('user_data', {})
263 channelstream_config, [payload], '/message',
342 comment_id = comment_data.get('comment_id')
264 raise_exc=False)
343
265 except ChannelstreamException:
344 message = '<strong>{}</strong> {} #{}, {}'.format(
266 log.exception('Failed to send channelstream data')
345 user.username,
267 raise
346 msg,
347 comment_id,
348 _reload_link(_('Reload page to see new comments')),
349 )
350
351 message_obj = {
352 'message': message,
353 'level': 'success',
354 'topic': '/notifications'
355 }
356
357 post_message(
358 comment_broadcast_channel, message_obj, user.username,
359 registry=request.registry)
360
361 message_obj = {
362 'message': None,
363 'user': user.username,
364 'comment_id': comment_id,
365 'comment_data': comment_data,
366 'user_data': user_data,
367 'topic': '/comment'
368 }
369 post_message(
370 comment_broadcast_channel, message_obj, user.username,
371 registry=request.registry)
@@ -462,55 +462,11 b' class CommentsModel(BaseModel):'
462 else:
462 else:
463 action = 'repo.commit.comment.create'
463 action = 'repo.commit.comment.create'
464
464
465 comment_id = comment.comment_id
466 comment_data = comment.get_api_data()
465 comment_data = comment.get_api_data()
467
466
468 self._log_audit_action(
467 self._log_audit_action(
469 action, {'data': comment_data}, auth_user, comment)
468 action, {'data': comment_data}, auth_user, comment)
470
469
471 channel = None
472 if commit_obj:
473 repo_name = repo.repo_name
474 channel = u'/repo${}$/commit/{}'.format(
475 repo_name,
476 commit_obj.raw_id
477 )
478 elif pull_request_obj:
479 repo_name = pr_target_repo.repo_name
480 channel = u'/repo${}$/pr/{}'.format(
481 repo_name,
482 pull_request_obj.pull_request_id
483 )
484
485 if channel:
486 username = user.username
487 message = '<strong>{}</strong> {} #{}, {}'
488 message = message.format(
489 username,
490 _('posted a new comment'),
491 comment_id,
492 _('Refresh the page to see new comments.'))
493
494 message_obj = {
495 'message': message,
496 'level': 'success',
497 'topic': '/notifications'
498 }
499
500 channelstream.post_message(
501 channel, message_obj, user.username,
502 registry=get_current_registry())
503
504 message_obj = {
505 'message': None,
506 'user': username,
507 'comment_id': comment_id,
508 'topic': '/comment'
509 }
510 channelstream.post_message(
511 channel, message_obj, user.username,
512 registry=get_current_registry())
513
514 return comment
470 return comment
515
471
516 def edit(self, comment_id, text, auth_user, version):
472 def edit(self, comment_id, text, auth_user, version):
General Comments 0
You need to be logged in to leave comments. Login now