##// END OF EJS Templates
fix(svn): fixed txn_id extraction from the data buffer....
fix(svn): fixed txn_id extraction from the data buffer. - This buffer is always BYTES - use new python3 getvalue that can get the data and not exhaust the iterator - fixes # 5707

File last commit:

r5088:8f6d1ed6 default
r5199:2548bbeb default
Show More
api.py
81 lines | 2.5 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1
copyrights: updated for 2023
r5088 # Copyright (C) 2010-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
tests: fixed all tests for python3 BIG changes
r5087
project: added all source files and assets
r1
import requests
tests: fixed all tests for python3 BIG changes
r5087 from rhodecode.lib.ext_json import sjson as json
project: added all source files and assets
r1
class ApiError(Exception):
"""Error when accessing the API."""
def __init__(self, response):
super(ApiError, self).__init__(response.get('error'))
self.response = response
class RCApi(object):
"""Helper class for accessing the RhodeCode API."""
def __init__(self, api_key,
rc_endpoint='http://admin:qweqwe@localhost:5000'):
self.api_key = api_key
self.rc_endpoint = rc_endpoint.rstrip('/')
self.api_endpoint = '%s/_admin/api' % self.rc_endpoint
self._id = 1
def _execute(self, data):
data.update({
'id': self._id,
'api_key': self.api_key,
})
self._id += 1
response = requests.post(self.api_endpoint, data=json.dumps(data))
try:
json_response = response.json()
except Exception:
json_response = {}
if response.status_code != 200 or json_response.get('error'):
raise ApiError(json_response)
return json_response
def create_repo(self, name, repo_type, description):
data = {
'method': 'create_repo',
'args': {
'repo_name': name,
'repo_type': repo_type,
'description': description,
}
}
self._execute(data)
return '%s/%s' % (self.rc_endpoint, name)
def delete_repo(self, name):
data = {
'method': 'delete_repo',
'args': {
'repoid': name,
}
}
return self._execute(data)