##// END OF EJS Templates
api: use consistent way to extract users, repos, repo groups and user groups by id or name....
marcink -
r1530:1efcb4ee default
parent child Browse files
Show More
@@ -1,268 +1,294 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import pytest
23 23 from mock import Mock, patch
24 24
25 25 from rhodecode.api import utils
26 26 from rhodecode.api import JSONRPCError
27 27 from rhodecode.lib.vcs.exceptions import RepositoryError
28 28
29 29
30 30 class TestGetCommitOrError(object):
31 31 def setup(self):
32 32 self.commit_hash = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa10'
33 33
34 34 @pytest.mark.parametrize("ref", ['ref', '12345', 'a:b:c:d', 'branch:name'])
35 35 def test_ref_cannot_be_parsed(self, ref):
36 36 repo = Mock()
37 37 with pytest.raises(JSONRPCError) as excinfo:
38 38 utils.get_commit_or_error(ref, repo)
39 39 expected_message = (
40 40 'Ref `{ref}` given in a wrong format. Please check the API'
41 41 ' documentation for more details'.format(ref=ref)
42 42 )
43 43 assert excinfo.value.message == expected_message
44 44
45 45 def test_success_with_hash_specified(self):
46 46 repo = Mock()
47 47 ref_type = 'branch'
48 48 ref = '{}:master:{}'.format(ref_type, self.commit_hash)
49 49
50 50 with patch('rhodecode.api.utils.get_commit_from_ref_name') as get_commit:
51 51 result = utils.get_commit_or_error(ref, repo)
52 52 get_commit.assert_called_once_with(
53 53 repo, self.commit_hash)
54 54 assert result == get_commit()
55 55
56 56 def test_raises_an_error_when_commit_not_found(self):
57 57 repo = Mock()
58 58 ref = 'branch:master:{}'.format(self.commit_hash)
59 59
60 60 with patch('rhodecode.api.utils.get_commit_from_ref_name') as get_commit:
61 61 get_commit.side_effect = RepositoryError('Commit not found')
62 62 with pytest.raises(JSONRPCError) as excinfo:
63 63 utils.get_commit_or_error(ref, repo)
64 64 expected_message = 'Ref `{}` does not exist'.format(ref)
65 65 assert excinfo.value.message == expected_message
66 66
67 67
68 68 class TestResolveRefOrError(object):
69 69 def setup(self):
70 70 self.commit_hash = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa10'
71 71
72 72 def test_success_with_no_hash_specified(self):
73 73 repo = Mock()
74 74 ref_type = 'branch'
75 75 ref_name = 'master'
76 76 ref = '{}:{}'.format(ref_type, ref_name)
77 77
78 78 with patch('rhodecode.api.utils._get_ref_hash') \
79 79 as _get_ref_hash:
80 80 _get_ref_hash.return_value = self.commit_hash
81 81 result = utils.resolve_ref_or_error(ref, repo)
82 82 _get_ref_hash.assert_called_once_with(repo, ref_type, ref_name)
83 83 assert result == '{}:{}'.format(ref, self.commit_hash)
84 84
85 85 def test_non_supported_refs(self):
86 86 repo = Mock()
87 87 ref = 'ancestor:ref'
88 88 with pytest.raises(JSONRPCError) as excinfo:
89 89 utils.resolve_ref_or_error(ref, repo)
90 90 expected_message = 'The specified ancestor `ref` does not exist'
91 91 assert excinfo.value.message == expected_message
92 92
93 93 def test_branch_is_not_found(self):
94 94 repo = Mock()
95 95 ref = 'branch:non-existing-one'
96 96 with patch('rhodecode.api.utils._get_ref_hash')\
97 97 as _get_ref_hash:
98 98 _get_ref_hash.side_effect = KeyError()
99 99 with pytest.raises(JSONRPCError) as excinfo:
100 100 utils.resolve_ref_or_error(ref, repo)
101 101 expected_message = (
102 102 'The specified branch `non-existing-one` does not exist')
103 103 assert excinfo.value.message == expected_message
104 104
105 105 def test_bookmark_is_not_found(self):
106 106 repo = Mock()
107 107 ref = 'bookmark:non-existing-one'
108 108 with patch('rhodecode.api.utils._get_ref_hash')\
109 109 as _get_ref_hash:
110 110 _get_ref_hash.side_effect = KeyError()
111 111 with pytest.raises(JSONRPCError) as excinfo:
112 112 utils.resolve_ref_or_error(ref, repo)
113 113 expected_message = (
114 114 'The specified bookmark `non-existing-one` does not exist')
115 115 assert excinfo.value.message == expected_message
116 116
117 117 @pytest.mark.parametrize("ref", ['ref', '12345', 'a:b:c:d'])
118 118 def test_ref_cannot_be_parsed(self, ref):
119 119 repo = Mock()
120 120 with pytest.raises(JSONRPCError) as excinfo:
121 121 utils.resolve_ref_or_error(ref, repo)
122 122 expected_message = (
123 123 'Ref `{ref}` given in a wrong format. Please check the API'
124 124 ' documentation for more details'.format(ref=ref)
125 125 )
126 126 assert excinfo.value.message == expected_message
127 127
128 128
129 129 class TestGetRefHash(object):
130 130 def setup(self):
131 131 self.commit_hash = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa10'
132 132 self.bookmark_name = 'test-bookmark'
133 133
134 134 @pytest.mark.parametrize("alias, branch_name", [
135 135 ("git", "master"),
136 136 ("hg", "default")
137 137 ])
138 138 def test_returns_hash_by_branch_name(self, alias, branch_name):
139 139 with patch('rhodecode.model.db.Repository') as repo:
140 140 repo.scm_instance().alias = alias
141 141 repo.scm_instance().branches = {branch_name: self.commit_hash}
142 142 result_hash = utils._get_ref_hash(repo, 'branch', branch_name)
143 143 assert result_hash == self.commit_hash
144 144
145 145 @pytest.mark.parametrize("alias, branch_name", [
146 146 ("git", "master"),
147 147 ("hg", "default")
148 148 ])
149 149 def test_raises_error_when_branch_is_not_found(self, alias, branch_name):
150 150 with patch('rhodecode.model.db.Repository') as repo:
151 151 repo.scm_instance().alias = alias
152 152 repo.scm_instance().branches = {}
153 153 with pytest.raises(KeyError):
154 154 utils._get_ref_hash(repo, 'branch', branch_name)
155 155
156 156 def test_returns_hash_when_bookmark_is_specified_for_hg(self):
157 157 with patch('rhodecode.model.db.Repository') as repo:
158 158 repo.scm_instance().alias = 'hg'
159 159 repo.scm_instance().bookmarks = {
160 160 self.bookmark_name: self.commit_hash}
161 161 result_hash = utils._get_ref_hash(
162 162 repo, 'bookmark', self.bookmark_name)
163 163 assert result_hash == self.commit_hash
164 164
165 165 def test_raises_error_when_bookmark_is_not_found_in_hg_repo(self):
166 166 with patch('rhodecode.model.db.Repository') as repo:
167 167 repo.scm_instance().alias = 'hg'
168 168 repo.scm_instance().bookmarks = {}
169 169 with pytest.raises(KeyError):
170 170 utils._get_ref_hash(repo, 'bookmark', self.bookmark_name)
171 171
172 172 def test_raises_error_when_bookmark_is_specified_for_git(self):
173 173 with patch('rhodecode.model.db.Repository') as repo:
174 174 repo.scm_instance().alias = 'git'
175 175 repo.scm_instance().bookmarks = {
176 176 self.bookmark_name: self.commit_hash}
177 177 with pytest.raises(ValueError):
178 178 utils._get_ref_hash(repo, 'bookmark', self.bookmark_name)
179 179
180 180
181 181 class TestUserByNameOrError(object):
182 182 def test_user_found_by_id(self):
183 183 fake_user = Mock(id=123)
184
185 patcher = patch('rhodecode.model.user.UserModel.get_user')
186 with patcher as get_user:
187 get_user.return_value = fake_user
188
189 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
190 with patcher as get_by_username:
191 result = utils.get_user_or_error(123)
192 assert result == fake_user
193
194 def test_user_not_found_by_id_as_str(self):
195 fake_user = Mock(id=123)
196
184 197 patcher = patch('rhodecode.model.user.UserModel.get_user')
185 198 with patcher as get_user:
186 199 get_user.return_value = fake_user
187 result = utils.get_user_or_error('123')
188 assert result == fake_user
200 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
201 with patcher as get_by_username:
202 get_by_username.return_value = None
203
204 with pytest.raises(JSONRPCError):
205 utils.get_user_or_error('123')
189 206
190 207 def test_user_found_by_name(self):
191 208 fake_user = Mock(id=123)
192 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
193 with patcher as get_by_username:
194 get_by_username.return_value = fake_user
195 result = utils.get_user_or_error('test')
196 assert result == fake_user
209
210 patcher = patch('rhodecode.model.user.UserModel.get_user')
211 with patcher as get_user:
212 get_user.return_value = None
213
214 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
215 with patcher as get_by_username:
216 get_by_username.return_value = fake_user
217
218 result = utils.get_user_or_error('test')
219 assert result == fake_user
197 220
198 221 def test_user_not_found_by_id(self):
199 222 patcher = patch('rhodecode.model.user.UserModel.get_user')
200 223 with patcher as get_user:
201 224 get_user.return_value = None
202 with pytest.raises(JSONRPCError) as excinfo:
203 utils.get_user_or_error('123')
225 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
226 with patcher as get_by_username:
227 get_by_username.return_value = None
204 228
205 expected_message = 'user `123` does not exist'
206 assert excinfo.value.message == expected_message
229 with pytest.raises(JSONRPCError) as excinfo:
230 utils.get_user_or_error(123)
231
232 expected_message = 'user `123` does not exist'
233 assert excinfo.value.message == expected_message
207 234
208 235 def test_user_not_found_by_name(self):
209 236 patcher = patch('rhodecode.model.user.UserModel.get_by_username')
210 237 with patcher as get_by_username:
211 238 get_by_username.return_value = None
212 239 with pytest.raises(JSONRPCError) as excinfo:
213 240 utils.get_user_or_error('test')
214 241
215 242 expected_message = 'user `test` does not exist'
216 243 assert excinfo.value.message == expected_message
217 244
218 245
219 class TestGetCommitDict:
220
246 class TestGetCommitDict(object):
221 247 @pytest.mark.parametrize('filename, expected', [
222 248 (b'sp\xc3\xa4cial', u'sp\xe4cial'),
223 249 (b'sp\xa4cial', u'sp\ufffdcial'),
224 250 ])
225 251 def test_decodes_filenames_to_unicode(self, filename, expected):
226 252 result = utils._get_commit_dict(filename=filename, op='A')
227 253 assert result['filename'] == expected
228 254
229 255
230 256 class TestRepoAccess(object):
231 257 def setup_method(self, method):
232 258
233 259 self.admin_perm_patch = patch(
234 260 'rhodecode.api.utils.HasPermissionAnyApi')
235 261 self.repo_perm_patch = patch(
236 262 'rhodecode.api.utils.HasRepoPermissionAnyApi')
237 263
238 264 def test_has_superadmin_permission_checks_for_admin(self):
239 265 admin_mock = Mock()
240 266 with self.admin_perm_patch as amock:
241 267 amock.return_value = admin_mock
242 268 assert utils.has_superadmin_permission('fake_user')
243 269 amock.assert_called_once_with('hg.admin')
244 270
245 271 admin_mock.assert_called_once_with(user='fake_user')
246 272
247 273 def test_has_repo_permissions_checks_for_repo_access(self):
248 274 repo_mock = Mock()
249 275 fake_repo = Mock()
250 276 with self.repo_perm_patch as rmock:
251 277 rmock.return_value = repo_mock
252 278 assert utils.validate_repo_permissions(
253 279 'fake_user', 'fake_repo_id', fake_repo,
254 280 ['perm1', 'perm2'])
255 281 rmock.assert_called_once_with(*['perm1', 'perm2'])
256 282
257 283 repo_mock.assert_called_once_with(
258 284 user='fake_user', repo_name=fake_repo.repo_name)
259 285
260 286 def test_has_repo_permissions_raises_not_found(self):
261 287 repo_mock = Mock(return_value=False)
262 288 fake_repo = Mock()
263 289 with self.repo_perm_patch as rmock:
264 290 rmock.return_value = repo_mock
265 291 with pytest.raises(JSONRPCError) as excinfo:
266 292 utils.validate_repo_permissions(
267 293 'fake_user', 'fake_repo_id', fake_repo, 'perms')
268 294 assert 'fake_repo_id' in excinfo
@@ -1,412 +1,442 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 JSON RPC utils
23 23 """
24 24
25 25 import collections
26 26 import logging
27 27
28 28 from rhodecode.api.exc import JSONRPCError
29 29 from rhodecode.lib.auth import (
30 30 HasPermissionAnyApi, HasRepoPermissionAnyApi, HasRepoGroupPermissionAnyApi)
31 31 from rhodecode.lib.utils import safe_unicode
32 32 from rhodecode.lib.vcs.exceptions import RepositoryError
33 33 from rhodecode.controllers.utils import get_commit_from_ref_name
34 34 from rhodecode.lib.utils2 import str2bool
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38
39 39 class OAttr(object):
40 40 """
41 41 Special Option that defines other attribute, and can default to them
42 42
43 43 Example::
44 44
45 45 def test(apiuser, userid=Optional(OAttr('apiuser')):
46 46 user = Optional.extract(userid, evaluate_locals=local())
47 47 #if we pass in userid, we get it, else it will default to apiuser
48 48 #attribute
49 49 """
50 50
51 51 def __init__(self, attr_name):
52 52 self.attr_name = attr_name
53 53
54 54 def __repr__(self):
55 55 return '<OptionalAttr:%s>' % self.attr_name
56 56
57 57 def __call__(self):
58 58 return self
59 59
60 60
61 61 class Optional(object):
62 62 """
63 63 Defines an optional parameter::
64 64
65 65 param = param.getval() if isinstance(param, Optional) else param
66 66 param = param() if isinstance(param, Optional) else param
67 67
68 68 is equivalent of::
69 69
70 70 param = Optional.extract(param)
71 71
72 72 """
73 73
74 74 def __init__(self, type_):
75 75 self.type_ = type_
76 76
77 77 def __repr__(self):
78 78 return '<Optional:%s>' % self.type_.__repr__()
79 79
80 80 def __call__(self):
81 81 return self.getval()
82 82
83 83 def getval(self, evaluate_locals=None):
84 84 """
85 85 returns value from this Optional instance
86 86 """
87 87 if isinstance(self.type_, OAttr):
88 88 param_name = self.type_.attr_name
89 89 if evaluate_locals:
90 90 return evaluate_locals[param_name]
91 91 # use params name
92 92 return param_name
93 93 return self.type_
94 94
95 95 @classmethod
96 96 def extract(cls, val, evaluate_locals=None, binary=None):
97 97 """
98 98 Extracts value from Optional() instance
99 99
100 100 :param val:
101 101 :return: original value if it's not Optional instance else
102 102 value of instance
103 103 """
104 104 if isinstance(val, cls):
105 105 val = val.getval(evaluate_locals)
106 106
107 107 if binary:
108 108 val = str2bool(val)
109 109
110 110 return val
111 111
112 112
113 113 def parse_args(cli_args, key_prefix=''):
114 114 from rhodecode.lib.utils2 import (escape_split)
115 115 kwargs = collections.defaultdict(dict)
116 116 for el in escape_split(cli_args, ','):
117 117 kv = escape_split(el, '=', 1)
118 118 if len(kv) == 2:
119 119 k, v = kv
120 120 kwargs[key_prefix + k] = v
121 121 return kwargs
122 122
123 123
124 124 def get_origin(obj):
125 125 """
126 126 Get origin of permission from object.
127 127
128 128 :param obj:
129 129 """
130 130 origin = 'permission'
131 131
132 132 if getattr(obj, 'owner_row', '') and getattr(obj, 'admin_row', ''):
133 133 # admin and owner case, maybe we should use dual string ?
134 134 origin = 'owner'
135 135 elif getattr(obj, 'owner_row', ''):
136 136 origin = 'owner'
137 137 elif getattr(obj, 'admin_row', ''):
138 138 origin = 'super-admin'
139 139 return origin
140 140
141 141
142 142 def store_update(updates, attr, name):
143 143 """
144 144 Stores param in updates dict if it's not instance of Optional
145 145 allows easy updates of passed in params
146 146 """
147 147 if not isinstance(attr, Optional):
148 148 updates[name] = attr
149 149
150 150
151 151 def has_superadmin_permission(apiuser):
152 152 """
153 153 Return True if apiuser is admin or return False
154 154
155 155 :param apiuser:
156 156 """
157 157 if HasPermissionAnyApi('hg.admin')(user=apiuser):
158 158 return True
159 159 return False
160 160
161 161
162 162 def validate_repo_permissions(apiuser, repoid, repo, perms):
163 163 """
164 164 Raise JsonRPCError if apiuser is not authorized or return True
165 165
166 166 :param apiuser:
167 167 :param repoid:
168 168 :param repo:
169 169 :param perms:
170 170 """
171 171 if not HasRepoPermissionAnyApi(*perms)(
172 172 user=apiuser, repo_name=repo.repo_name):
173 173 raise JSONRPCError(
174 174 'repository `%s` does not exist' % repoid)
175 175
176 176 return True
177 177
178 178
179 179 def validate_repo_group_permissions(apiuser, repogroupid, repo_group, perms):
180 180 """
181 181 Raise JsonRPCError if apiuser is not authorized or return True
182 182
183 183 :param apiuser:
184 184 :param repogroupid: just the id of repository group
185 185 :param repo_group: instance of repo_group
186 186 :param perms:
187 187 """
188 188 if not HasRepoGroupPermissionAnyApi(*perms)(
189 189 user=apiuser, group_name=repo_group.group_name):
190 190 raise JSONRPCError(
191 191 'repository group `%s` does not exist' % repogroupid)
192 192
193 193 return True
194 194
195 195
196 196 def validate_set_owner_permissions(apiuser, owner):
197 197 if isinstance(owner, Optional):
198 198 owner = get_user_or_error(apiuser.user_id)
199 199 else:
200 200 if has_superadmin_permission(apiuser):
201 201 owner = get_user_or_error(owner)
202 202 else:
203 203 # forbid setting owner for non-admins
204 204 raise JSONRPCError(
205 205 'Only RhodeCode super-admin can specify `owner` param')
206 206 return owner
207 207
208 208
209 209 def get_user_or_error(userid):
210 210 """
211 211 Get user by id or name or return JsonRPCError if not found
212 212
213 213 :param userid:
214 214 """
215 215 from rhodecode.model.user import UserModel
216 user_model = UserModel()
216 217
217 user_model = UserModel()
218 try:
219 user = user_model.get_user(int(userid))
220 except ValueError:
218 if isinstance(userid, (int, long)):
219 try:
220 user = user_model.get_user(userid)
221 except ValueError:
222 user = None
223 else:
221 224 user = user_model.get_by_username(userid)
222 225
223 226 if user is None:
224 raise JSONRPCError("user `%s` does not exist" % (userid,))
227 raise JSONRPCError(
228 'user `%s` does not exist' % (userid,))
225 229 return user
226 230
227 231
228 232 def get_repo_or_error(repoid):
229 233 """
230 234 Get repo by id or name or return JsonRPCError if not found
231 235
232 236 :param repoid:
233 237 """
234 238 from rhodecode.model.repo import RepoModel
239 repo_model = RepoModel()
235 240
236 repo = RepoModel().get_repo(repoid)
241 if isinstance(repoid, (int, long)):
242 try:
243 repo = repo_model.get_repo(repoid)
244 except ValueError:
245 repo = None
246 else:
247 repo = repo_model.get_by_repo_name(repoid)
248
237 249 if repo is None:
238 raise JSONRPCError('repository `%s` does not exist' % (repoid,))
250 raise JSONRPCError(
251 'repository `%s` does not exist' % (repoid,))
239 252 return repo
240 253
241 254
242 255 def get_repo_group_or_error(repogroupid):
243 256 """
244 257 Get repo group by id or name or return JsonRPCError if not found
245 258
246 259 :param repogroupid:
247 260 """
248 261 from rhodecode.model.repo_group import RepoGroupModel
262 repo_group_model = RepoGroupModel()
249 263
250 repo_group = RepoGroupModel()._get_repo_group(repogroupid)
264 if isinstance(repogroupid, (int, long)):
265 try:
266 repo_group = repo_group_model._get_repo_group(repogroupid)
267 except ValueError:
268 repo_group = None
269 else:
270 repo_group = repo_group_model.get_by_group_name(repogroupid)
271
251 272 if repo_group is None:
252 273 raise JSONRPCError(
253 274 'repository group `%s` does not exist' % (repogroupid,))
254 275 return repo_group
255 276
256 277
257 278 def get_user_group_or_error(usergroupid):
258 279 """
259 280 Get user group by id or name or return JsonRPCError if not found
260 281
261 282 :param usergroupid:
262 283 """
263 284 from rhodecode.model.user_group import UserGroupModel
285 user_group_model = UserGroupModel()
264 286
265 user_group = UserGroupModel().get_group(usergroupid)
287 if isinstance(usergroupid, (int, long)):
288 try:
289 user_group = user_group_model.get_group(usergroupid)
290 except ValueError:
291 user_group = None
292 else:
293 user_group = user_group_model.get_by_name(usergroupid)
294
266 295 if user_group is None:
267 raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
296 raise JSONRPCError(
297 'user group `%s` does not exist' % (usergroupid,))
268 298 return user_group
269 299
270 300
271 301 def get_perm_or_error(permid, prefix=None):
272 302 """
273 303 Get permission by id or name or return JsonRPCError if not found
274 304
275 305 :param permid:
276 306 """
277 307 from rhodecode.model.permission import PermissionModel
278 308
279 309 perm = PermissionModel.cls.get_by_key(permid)
280 310 if perm is None:
281 311 raise JSONRPCError('permission `%s` does not exist' % (permid,))
282 312 if prefix:
283 313 if not perm.permission_name.startswith(prefix):
284 314 raise JSONRPCError('permission `%s` is invalid, '
285 315 'should start with %s' % (permid, prefix))
286 316 return perm
287 317
288 318
289 319 def get_gist_or_error(gistid):
290 320 """
291 321 Get gist by id or gist_access_id or return JsonRPCError if not found
292 322
293 323 :param gistid:
294 324 """
295 325 from rhodecode.model.gist import GistModel
296 326
297 327 gist = GistModel.cls.get_by_access_id(gistid)
298 328 if gist is None:
299 329 raise JSONRPCError('gist `%s` does not exist' % (gistid,))
300 330 return gist
301 331
302 332
303 333 def get_pull_request_or_error(pullrequestid):
304 334 """
305 335 Get pull request by id or return JsonRPCError if not found
306 336
307 337 :param pullrequestid:
308 338 """
309 339 from rhodecode.model.pull_request import PullRequestModel
310 340
311 341 try:
312 342 pull_request = PullRequestModel().get(int(pullrequestid))
313 343 except ValueError:
314 344 raise JSONRPCError('pullrequestid must be an integer')
315 345 if not pull_request:
316 346 raise JSONRPCError('pull request `%s` does not exist' % (
317 347 pullrequestid,))
318 348 return pull_request
319 349
320 350
321 351 def build_commit_data(commit, detail_level):
322 352 parsed_diff = []
323 353 if detail_level == 'extended':
324 354 for f in commit.added:
325 355 parsed_diff.append(_get_commit_dict(filename=f.path, op='A'))
326 356 for f in commit.changed:
327 357 parsed_diff.append(_get_commit_dict(filename=f.path, op='M'))
328 358 for f in commit.removed:
329 359 parsed_diff.append(_get_commit_dict(filename=f.path, op='D'))
330 360
331 361 elif detail_level == 'full':
332 362 from rhodecode.lib.diffs import DiffProcessor
333 363 diff_processor = DiffProcessor(commit.diff())
334 364 for dp in diff_processor.prepare():
335 365 del dp['stats']['ops']
336 366 _stats = dp['stats']
337 367 parsed_diff.append(_get_commit_dict(
338 368 filename=dp['filename'], op=dp['operation'],
339 369 new_revision=dp['new_revision'],
340 370 old_revision=dp['old_revision'],
341 371 raw_diff=dp['raw_diff'], stats=_stats))
342 372
343 373 return parsed_diff
344 374
345 375
346 376 def get_commit_or_error(ref, repo):
347 377 try:
348 378 ref_type, _, ref_hash = ref.split(':')
349 379 except ValueError:
350 380 raise JSONRPCError(
351 381 'Ref `{ref}` given in a wrong format. Please check the API'
352 382 ' documentation for more details'.format(ref=ref))
353 383 try:
354 384 # TODO: dan: refactor this to use repo.scm_instance().get_commit()
355 385 # once get_commit supports ref_types
356 386 return get_commit_from_ref_name(repo, ref_hash)
357 387 except RepositoryError:
358 388 raise JSONRPCError('Ref `{ref}` does not exist'.format(ref=ref))
359 389
360 390
361 391 def resolve_ref_or_error(ref, repo):
362 392 def _parse_ref(type_, name, hash_=None):
363 393 return type_, name, hash_
364 394
365 395 try:
366 396 ref_type, ref_name, ref_hash = _parse_ref(*ref.split(':'))
367 397 except TypeError:
368 398 raise JSONRPCError(
369 399 'Ref `{ref}` given in a wrong format. Please check the API'
370 400 ' documentation for more details'.format(ref=ref))
371 401
372 402 try:
373 403 ref_hash = ref_hash or _get_ref_hash(repo, ref_type, ref_name)
374 404 except (KeyError, ValueError):
375 405 raise JSONRPCError(
376 406 'The specified {type} `{name}` does not exist'.format(
377 407 type=ref_type, name=ref_name))
378 408
379 409 return ':'.join([ref_type, ref_name, ref_hash])
380 410
381 411
382 412 def _get_commit_dict(
383 413 filename, op, new_revision=None, old_revision=None,
384 414 raw_diff=None, stats=None):
385 415 if stats is None:
386 416 stats = {
387 417 "added": None,
388 418 "binary": None,
389 419 "deleted": None
390 420 }
391 421 return {
392 422 "filename": safe_unicode(filename),
393 423 "op": op,
394 424
395 425 # extra details
396 426 "new_revision": new_revision,
397 427 "old_revision": old_revision,
398 428
399 429 "raw_diff": raw_diff,
400 430 "stats": stats
401 431 }
402 432
403 433
404 434 # TODO: mikhail: Think about moving this function to some library
405 435 def _get_ref_hash(repo, type_, name):
406 436 vcs_repo = repo.scm_instance()
407 437 if type_ == 'branch' and vcs_repo.alias in ('hg', 'git'):
408 438 return vcs_repo.branches[name]
409 439 elif type_ == 'bookmark' and vcs_repo.alias == 'hg':
410 440 return vcs_repo.bookmarks[name]
411 441 else:
412 442 raise ValueError()
General Comments 0
You need to be logged in to leave comments. Login now