##// END OF EJS Templates
api: get_repos allows now to filter by root locations, and optionally traverse the returned data....
marcink -
r1267:b965be9c default
parent child Browse files
Show More
@@ -22,7 +22,8 b''
22 22 import pytest
23 23
24 24 from rhodecode.model.repo import RepoModel
25 from rhodecode.api.tests.utils import build_data, api_call, assert_ok, jsonify
25 from rhodecode.api.tests.utils import (
26 build_data, api_call, assert_ok, assert_error, jsonify)
26 27 from rhodecode.model.db import User
27 28
28 29
@@ -40,6 +41,76 b' class TestGetRepos(object):'
40 41 expected = ret
41 42 assert_ok(id_, expected, given=response.body)
42 43
44 def test_api_get_repos_only_toplevel(self, user_util):
45 repo_group = user_util.create_repo_group(auto_cleanup=True)
46 user_util.create_repo(parent=repo_group)
47
48 id_, params = build_data(self.apikey, 'get_repos', traverse=0)
49 response = api_call(self.app, params)
50
51 result = []
52 for repo in RepoModel().get_repos_for_root(root=None):
53 result.append(repo.get_api_data(include_secrets=True))
54 expected = jsonify(result)
55
56 assert_ok(id_, expected, given=response.body)
57
58 def test_api_get_repos_with_wrong_root(self):
59 id_, params = build_data(self.apikey, 'get_repos', root='abracadabra')
60 response = api_call(self.app, params)
61
62 expected = 'Root repository group `abracadabra` does not exist'
63 assert_error(id_, expected, given=response.body)
64
65 def test_api_get_repos_with_root(self, user_util):
66 repo_group = user_util.create_repo_group(auto_cleanup=True)
67 repo_group_name = repo_group.group_name
68
69 user_util.create_repo(parent=repo_group)
70 user_util.create_repo(parent=repo_group)
71
72 # nested, should not show up
73 user_util._test_name = '{}/'.format(repo_group_name)
74 sub_repo_group = user_util.create_repo_group(auto_cleanup=True)
75 user_util.create_repo(parent=sub_repo_group)
76
77 id_, params = build_data(self.apikey, 'get_repos',
78 root=repo_group_name, traverse=0)
79 response = api_call(self.app, params)
80
81 result = []
82 for repo in RepoModel().get_repos_for_root(repo_group):
83 result.append(repo.get_api_data(include_secrets=True))
84
85 assert len(result) == 2
86 expected = jsonify(result)
87 assert_ok(id_, expected, given=response.body)
88
89 def test_api_get_repos_with_root_and_traverse(self, user_util):
90 repo_group = user_util.create_repo_group(auto_cleanup=True)
91 repo_group_name = repo_group.group_name
92
93 user_util.create_repo(parent=repo_group)
94 user_util.create_repo(parent=repo_group)
95
96 # nested, should not show up
97 user_util._test_name = '{}/'.format(repo_group_name)
98 sub_repo_group = user_util.create_repo_group(auto_cleanup=True)
99 user_util.create_repo(parent=sub_repo_group)
100
101 id_, params = build_data(self.apikey, 'get_repos',
102 root=repo_group_name, traverse=1)
103 response = api_call(self.app, params)
104
105 result = []
106 for repo in RepoModel().get_repos_for_root(
107 repo_group_name, traverse=True):
108 result.append(repo.get_api_data(include_secrets=True))
109
110 assert len(result) == 3
111 expected = jsonify(result)
112 assert_ok(id_, expected, given=response.body)
113
43 114 def test_api_get_repos_non_admin(self):
44 115 id_, params = build_data(self.apikey_regular, 'get_repos')
45 116 response = api_call(self.app, params)
@@ -36,7 +36,7 b' from rhodecode.lib.ext_json import json'
36 36 from rhodecode.model.changeset_status import ChangesetStatusModel
37 37 from rhodecode.model.comment import ChangesetCommentsModel
38 38 from rhodecode.model.db import (
39 Session, ChangesetStatus, RepositoryField, Repository)
39 Session, ChangesetStatus, RepositoryField, Repository, RepoGroup)
40 40 from rhodecode.model.repo import RepoModel
41 41 from rhodecode.model.scm import ScmModel, RepoList
42 42 from rhodecode.model.settings import SettingsModel, VcsSettingsModel
@@ -217,7 +217,7 b' def get_repo(request, apiuser, repoid, c'
217 217
218 218
219 219 @jsonrpc_method()
220 def get_repos(request, apiuser):
220 def get_repos(request, apiuser, root=Optional(None), traverse=Optional(True)):
221 221 """
222 222 Lists all existing repositories.
223 223
@@ -226,6 +226,14 b' def get_repos(request, apiuser):'
226 226
227 227 :param apiuser: This is filled automatically from the |authtoken|.
228 228 :type apiuser: AuthUser
229 :param root: specify root repository group to fetch repositories.
230 filters the returned repositories to be members of given root group.
231 :type root: Optional(None)
232 :param traverse: traverse given root into subrepositories. With this flag
233 set to False, it will only return top-level repositories from `root`.
234 if root is empty it will return just top-level repositories.
235 :type traverse: Optional(True)
236
229 237
230 238 Example output:
231 239
@@ -257,8 +265,28 b' def get_repos(request, apiuser):'
257 265 _perms = ('repository.read', 'repository.write', 'repository.admin',)
258 266 extras = {'user': apiuser}
259 267
260 repo_list = RepoList(
261 RepoModel().get_all(), perm_set=_perms, extra_kwargs=extras)
268 root = Optional.extract(root)
269 traverse = Optional.extract(traverse, binary=True)
270
271 if root:
272 # verify parent existance, if it's empty return an error
273 parent = RepoGroup.get_by_group_name(root)
274 if not parent:
275 raise JSONRPCError(
276 'Root repository group `{}` does not exist'.format(root))
277
278 if traverse:
279 repos = RepoModel().get_repos_for_root(root=root, traverse=traverse)
280 else:
281 repos = RepoModel().get_repos_for_root(root=parent)
282 else:
283 if traverse:
284 repos = RepoModel().get_all()
285 else:
286 # return just top-level
287 repos = RepoModel().get_repos_for_root(root=None)
288
289 repo_list = RepoList(repos, perm_set=_perms, extra_kwargs=extras)
262 290 return [repo.get_api_data(include_secrets=include_secrets)
263 291 for repo in repo_list]
264 292
@@ -143,6 +143,19 b' class RepoModel(BaseModel):'
143 143
144 144 return None
145 145
146 def get_repos_for_root(self, root, traverse=False):
147 if traverse:
148 like_expression = u'{}%'.format(safe_unicode(root))
149 repos = Repository.query().filter(
150 Repository.repo_name.like(like_expression)).all()
151 else:
152 if root and not isinstance(root, RepoGroup):
153 raise ValueError(
154 'Root must be an instance '
155 'of RepoGroup, got:{} instead'.format(type(root)))
156 repos = Repository.query().filter(Repository.group == root).all()
157 return repos
158
146 159 def get_url(self, repo):
147 160 return h.url('summary_home', repo_name=safe_str(repo.repo_name),
148 161 qualified=True)
General Comments 0
You need to be logged in to leave comments. Login now