# HG changeset patch # User Marcin Kuzminski # Date 2020-10-06 09:54:29 # Node ID 7d9c5b92bebead1886b81a45b11449f01b12a945 # Parent eb97c0f27e5c2ad17a6ff2bbb2588b2dce441d93 channelstream: cleanup, and re-organize code for posting comments/pr updated messages. - now shares new common code - fixed cases for both views and API in consistent way diff --git a/rhodecode/api/views/pull_request_api.py b/rhodecode/api/views/pull_request_api.py --- a/rhodecode/api/views/pull_request_api.py +++ b/rhodecode/api/views/pull_request_api.py @@ -26,6 +26,7 @@ from rhodecode.api.utils import ( has_superadmin_permission, Optional, OAttr, get_repo_or_error, get_pull_request_or_error, get_commit_or_error, get_user_or_error, validate_repo_permissions, resolve_ref_or_error, validate_set_owner_permissions) +from rhodecode.lib import channelstream from rhodecode.lib.auth import (HasRepoPermissionAnyApi) from rhodecode.lib.base import vcs_operation_context from rhodecode.lib.utils2 import str2bool @@ -502,16 +503,19 @@ def comment_pull_request( }, error : null """ + _ = request.translate + pull_request = get_pull_request_or_error(pullrequestid) if Optional.extract(repoid): repo = get_repo_or_error(repoid) else: repo = pull_request.target_repo + db_repo_name = repo.repo_name auth_user = apiuser if not isinstance(userid, Optional): is_repo_admin = HasRepoPermissionAnyApi('repository.admin')( - user=apiuser, repo_name=repo.repo_name) + user=apiuser, repo_name=db_repo_name) if has_superadmin_permission(apiuser) or is_repo_admin: apiuser = get_user_or_error(userid) auth_user = apiuser.AuthUser() @@ -596,6 +600,7 @@ def comment_pull_request( extra_recipients=extra_recipients, send_email=send_email ) + is_inline = bool(comment.f_path and comment.line_no) if allowed_to_change_status and status: old_calculated_status = pull_request.calculated_review_status() @@ -628,6 +633,17 @@ def comment_pull_request( 'comment_id': comment.comment_id if comment else None, 'status': {'given': status, 'was_changed': status_change}, } + + comment_broadcast_channel = channelstream.comment_channel( + db_repo_name, pull_request_obj=pull_request) + + comment_data = data + comment_type = 'inline' if is_inline else 'general' + channelstream.comment_channelstream_push( + request, comment_broadcast_channel, apiuser, + _('posted a new {} comment').format(comment_type), + comment_data=comment_data) + return data @@ -881,6 +897,8 @@ def update_pull_request( description = Optional.extract(description) description_renderer = Optional.extract(description_renderer) + # Update title/description + title_changed = False if title or description: PullRequestModel().edit( pull_request, @@ -889,8 +907,12 @@ def update_pull_request( description_renderer or pull_request.description_renderer, apiuser) Session().commit() + title_changed = True commit_changes = {"added": [], "common": [], "removed": []} + + # Update commits + commits_changed = False if str2bool(Optional.extract(update_commits)): if pull_request.pull_request_state != PullRequest.STATE_CREATED: @@ -906,7 +928,10 @@ def update_pull_request( pull_request, db_user) commit_changes = update_response.changes or commit_changes Session().commit() + commits_changed = True + # Update reviewers + reviewers_changed = False reviewers_changes = {"added": [], "removed": []} if reviewers: old_calculated_status = pull_request.calculated_review_status() @@ -925,6 +950,16 @@ def update_pull_request( PullRequestModel().trigger_pull_request_hook( pull_request, apiuser, 'review_status_change', data={'status': calculated_status}) + reviewers_changed = True + + observers_changed = False + + # push changed to channelstream + if commits_changed or reviewers_changed or observers_changed: + pr_broadcast_channel = channelstream.pr_channel(pull_request) + msg = 'Pull request was updated.' + channelstream.pr_update_channelstream_push( + request, pr_broadcast_channel, apiuser, msg) data = { 'msg': 'Updated pull request `{}`'.format( diff --git a/rhodecode/api/views/repo_api.py b/rhodecode/api/views/repo_api.py --- a/rhodecode/api/views/repo_api.py +++ b/rhodecode/api/views/repo_api.py @@ -29,7 +29,7 @@ from rhodecode.api.utils import ( get_user_group_or_error, get_user_or_error, validate_repo_permissions, get_perm_or_error, parse_args, get_origin, build_commit_data, validate_set_owner_permissions) -from rhodecode.lib import audit_logger, rc_cache +from rhodecode.lib import audit_logger, rc_cache, channelstream from rhodecode.lib import repo_maintenance from rhodecode.lib.auth import ( HasPermissionAnyApi, HasUserGroupPermissionAnyApi, @@ -1597,10 +1597,13 @@ def comment_commit( } """ + _ = request.translate + repo = get_repo_or_error(repoid) if not has_superadmin_permission(apiuser): _perms = ('repository.read', 'repository.write', 'repository.admin') validate_repo_permissions(apiuser, repoid, repo, _perms) + db_repo_name = repo.repo_name try: commit = repo.scm_instance().get_commit(commit_id=commit_id) @@ -1650,6 +1653,8 @@ def comment_commit( extra_recipients=extra_recipients, send_email=send_email ) + is_inline = bool(comment.f_path and comment.line_no) + if status: # also do a status change try: @@ -1669,6 +1674,17 @@ def comment_commit( data={'comment': comment, 'commit': commit}) Session().commit() + + comment_broadcast_channel = channelstream.comment_channel( + db_repo_name, commit_obj=commit) + + comment_data = {'comment': comment, 'comment_id': comment.comment_id} + comment_type = 'inline' if is_inline else 'general' + channelstream.comment_channelstream_push( + request, comment_broadcast_channel, apiuser, + _('posted a new {} comment').format(comment_type), + comment_data=comment_data) + return { 'msg': ( 'Commented on commit `%s` for repository `%s`' % ( diff --git a/rhodecode/apps/repository/views/repo_commits.py b/rhodecode/apps/repository/views/repo_commits.py --- a/rhodecode/apps/repository/views/repo_commits.py +++ b/rhodecode/apps/repository/views/repo_commits.py @@ -31,7 +31,7 @@ from rhodecode.apps._base import RepoApp from rhodecode.apps.file_store import utils as store_utils from rhodecode.apps.file_store.exceptions import FileNotAllowedException, FileOverSizeException -from rhodecode.lib import diffs, codeblocks +from rhodecode.lib import diffs, codeblocks, channelstream from rhodecode.lib.auth import ( LoginRequired, HasRepoPermissionAnyDecorator, NotAnonymous, CSRFRequired) from rhodecode.lib.ext_json import json @@ -210,10 +210,7 @@ class RepoCommitsView(RepoAppView): # NOTE(marcink): this uses the same voting logic as in pull-requests c.commit_review_status = ChangesetStatusModel().calculate_status(review_statuses) - c.commit_broadcast_channel = u'/repo${}$/commit/{}'.format( - c.repo_name, - commit.raw_id - ) + c.commit_broadcast_channel = channelstream.comment_channel(c.repo_name, commit_obj=commit) diff = None # Iterate over ranges (default commit view is always one commit) @@ -417,6 +414,7 @@ class RepoCommitsView(RepoAppView): resolves_comment_id=resolves_comment_id, auth_user=self._rhodecode_user ) + is_inline = bool(comment.f_path and comment.line_no) # get status if set ! if status: @@ -464,6 +462,16 @@ class RepoCommitsView(RepoAppView): data.update(comment.get_dict()) data.update({'rendered_text': rendered_comment}) + comment_broadcast_channel = channelstream.comment_channel( + self.db_repo_name, commit_obj=commit) + + comment_data = data + comment_type = 'inline' if is_inline else 'general' + channelstream.comment_channelstream_push( + self.request, comment_broadcast_channel, self._rhodecode_user, + _('posted a new {} comment').format(comment_type), + comment_data=comment_data) + return data @LoginRequired() diff --git a/rhodecode/apps/repository/views/repo_pull_requests.py b/rhodecode/apps/repository/views/repo_pull_requests.py --- a/rhodecode/apps/repository/views/repo_pull_requests.py +++ b/rhodecode/apps/repository/views/repo_pull_requests.py @@ -311,8 +311,7 @@ class RepoPullRequestsView(RepoAppView, pull_request_id = pull_request.pull_request_id c.state_progressing = pull_request.is_state_changing() - c.pr_broadcast_channel = '/repo${}$/pr/{}'.format( - pull_request.target_repo.repo_name, pull_request.pull_request_id) + c.pr_broadcast_channel = channelstream.pr_channel(pull_request) _new_state = { 'created': PullRequest.STATE_CREATED, @@ -1238,8 +1237,7 @@ class RepoPullRequestsView(RepoAppView, 'redirect_url': redirect_url} is_state_changing = pull_request.is_state_changing() - c.pr_broadcast_channel = '/repo${}$/pr/{}'.format( - pull_request.target_repo.repo_name, pull_request.pull_request_id) + c.pr_broadcast_channel = channelstream.pr_channel(pull_request) # only owner or admin can update it allowed_to_update = PullRequestModel().check_user_update( @@ -1339,7 +1337,8 @@ class RepoPullRequestsView(RepoAppView, count_removed=len(resp.changes.removed), change_source=changed) h.flash(msg, category='success') - self._pr_update_channelstream_push(c.pr_broadcast_channel, msg) + channelstream.pr_update_channelstream_push( + self.request, c.pr_broadcast_channel, self._rhodecode_user, msg) else: msg = PullRequestModel.UPDATE_STATUS_MESSAGES[resp.reason] warning_reasons = [ @@ -1371,7 +1370,8 @@ class RepoPullRequestsView(RepoAppView, msg = _('Pull request reviewers updated.') h.flash(msg, category='success') - self._pr_update_channelstream_push(c.pr_broadcast_channel, msg) + channelstream.pr_update_channelstream_push( + self.request, c.pr_broadcast_channel, self._rhodecode_user, msg) # trigger status changed if change in reviewers changes the status calculated_status = pull_request.calculated_review_status() @@ -1394,7 +1394,8 @@ class RepoPullRequestsView(RepoAppView, Session().commit() msg = _('Pull request observers updated.') h.flash(msg, category='success') - self._pr_update_channelstream_push(c.pr_broadcast_channel, msg) + channelstream.pr_update_channelstream_push( + self.request, c.pr_broadcast_channel, self._rhodecode_user, msg) @LoginRequired() @NotAnonymous() @@ -1588,6 +1589,7 @@ class RepoPullRequestsView(RepoAppView, resolves_comment_id=resolves_comment_id, auth_user=self._rhodecode_user ) + is_inline = bool(comment.f_path and comment.line_no) if allowed_to_change_status: # calculate old status before we change it @@ -1636,6 +1638,16 @@ class RepoPullRequestsView(RepoAppView, data.update(comment.get_dict()) data.update({'rendered_text': rendered_comment}) + comment_broadcast_channel = channelstream.comment_channel( + self.db_repo_name, pull_request_obj=pull_request) + + comment_data = data + comment_type = 'inline' if is_inline else 'general' + channelstream.comment_channelstream_push( + self.request, comment_broadcast_channel, self._rhodecode_user, + _('posted a new {} comment').format(comment_type), + comment_data=comment_data) + return data @LoginRequired() diff --git a/rhodecode/lib/channelstream.py b/rhodecode/lib/channelstream.py --- a/rhodecode/lib/channelstream.py +++ b/rhodecode/lib/channelstream.py @@ -225,14 +225,26 @@ def write_history(config, message): def get_connection_validators(registry): validators = [] - for k, config in registry.rhodecode_plugins.iteritems(): + for k, config in registry.rhodecode_plugins.items(): validator = config.get('channelstream', {}).get('connect_validator') if validator: validators.append(validator) return validators +def get_channelstream_config(registry=None): + if not registry: + registry = get_current_registry() + + rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {}) + channelstream_config = rhodecode_plugins.get('channelstream', {}) + return channelstream_config + + def post_message(channel, message, username, registry=None): + channelstream_config = get_channelstream_config(registry) + if not channelstream_config.get('enabled'): + return message_obj = message if isinstance(message, basestring): @@ -242,26 +254,118 @@ def post_message(channel, message, usern 'topic': '/notifications' } - if not registry: - registry = get_current_registry() + log.debug('Channelstream: sending notification to channel %s', channel) + payload = { + 'type': 'message', + 'timestamp': datetime.datetime.utcnow(), + 'user': 'system', + 'exclude_users': [username], + 'channel': channel, + 'message': message_obj + } + + try: + return channelstream_request( + channelstream_config, [payload], '/message', + raise_exc=False) + except ChannelstreamException: + log.exception('Failed to send channelstream data') + raise + + +def _reload_link(label): + return ( + '' + '{}' + ''.format(label) + ) + + +def pr_channel(pull_request): + repo_name = pull_request.target_repo.repo_name + pull_request_id = pull_request.pull_request_id + channel = '/repo${}$/pr/{}'.format(repo_name, pull_request_id) + log.debug('Getting pull-request channelstream broadcast channel: %s', channel) + return channel + + +def comment_channel(repo_name, commit_obj=None, pull_request_obj=None): + channel = None + if commit_obj: + channel = u'/repo${}$/commit/{}'.format( + repo_name, commit_obj.raw_id + ) + elif pull_request_obj: + channel = u'/repo${}$/pr/{}'.format( + repo_name, pull_request_obj.pull_request_id + ) + log.debug('Getting comment channelstream broadcast channel: %s', channel) + + return channel + + +def pr_update_channelstream_push(request, pr_broadcast_channel, user, msg, **kwargs): + """ + Channel push on pull request update + """ + if not pr_broadcast_channel: + return - log.debug('Channelstream: sending notification to channel %s', channel) - rhodecode_plugins = getattr(registry, 'rhodecode_plugins', {}) - channelstream_config = rhodecode_plugins.get('channelstream', {}) - if channelstream_config.get('enabled'): - payload = { - 'type': 'message', - 'timestamp': datetime.datetime.utcnow(), - 'user': 'system', - 'exclude_users': [username], - 'channel': channel, - 'message': message_obj - } + _ = request.translate + + message = '{} {}'.format( + msg, + _reload_link(_(' Reload page to load changes'))) + + message_obj = { + 'message': message, + 'level': 'success', + 'topic': '/notifications' + } + + post_message( + pr_broadcast_channel, message_obj, user.username, + registry=request.registry) + + +def comment_channelstream_push(request, comment_broadcast_channel, user, msg, **kwargs): + """ + Channelstream push on comment action, on commit, or pull-request + """ + if not comment_broadcast_channel: + return + + _ = request.translate - try: - return channelstream_request( - channelstream_config, [payload], '/message', - raise_exc=False) - except ChannelstreamException: - log.exception('Failed to send channelstream data') - raise + comment_data = kwargs.pop('comment_data', {}) + user_data = kwargs.pop('user_data', {}) + comment_id = comment_data.get('comment_id') + + message = '{} {} #{}, {}'.format( + user.username, + msg, + comment_id, + _reload_link(_('Reload page to see new comments')), + ) + + message_obj = { + 'message': message, + 'level': 'success', + 'topic': '/notifications' + } + + post_message( + comment_broadcast_channel, message_obj, user.username, + registry=request.registry) + + message_obj = { + 'message': None, + 'user': user.username, + 'comment_id': comment_id, + 'comment_data': comment_data, + 'user_data': user_data, + 'topic': '/comment' + } + post_message( + comment_broadcast_channel, message_obj, user.username, + registry=request.registry) diff --git a/rhodecode/model/comment.py b/rhodecode/model/comment.py --- a/rhodecode/model/comment.py +++ b/rhodecode/model/comment.py @@ -462,55 +462,11 @@ class CommentsModel(BaseModel): else: action = 'repo.commit.comment.create' - comment_id = comment.comment_id comment_data = comment.get_api_data() self._log_audit_action( action, {'data': comment_data}, auth_user, comment) - channel = None - if commit_obj: - repo_name = repo.repo_name - channel = u'/repo${}$/commit/{}'.format( - repo_name, - commit_obj.raw_id - ) - elif pull_request_obj: - repo_name = pr_target_repo.repo_name - channel = u'/repo${}$/pr/{}'.format( - repo_name, - pull_request_obj.pull_request_id - ) - - if channel: - username = user.username - message = '{} {} #{}, {}' - message = message.format( - username, - _('posted a new comment'), - comment_id, - _('Refresh the page to see new comments.')) - - message_obj = { - 'message': message, - 'level': 'success', - 'topic': '/notifications' - } - - channelstream.post_message( - channel, message_obj, user.username, - registry=get_current_registry()) - - message_obj = { - 'message': None, - 'user': username, - 'comment_id': comment_id, - 'topic': '/comment' - } - channelstream.post_message( - channel, message_obj, user.username, - registry=get_current_registry()) - return comment def edit(self, comment_id, text, auth_user, version):