gh_api.py
303 lines
| 10.1 KiB
| text/x-python
|
PythonLexer
/ tools / gh_api.py
MinRK
|
r11584 | """Functions for Github API requests.""" | ||
Thomas Kluyver
|
r6694 | |||
try: | ||||
input = raw_input | ||||
except NameError: | ||||
pass | ||||
MinRK
|
r7761 | import os | ||
MinRK
|
r11583 | import re | ||
import sys | ||||
MinRK
|
r7761 | |||
Thomas Kluyver
|
r6694 | import requests | ||
import getpass | ||||
import json | ||||
MinRK
|
r11584 | try: | ||
import requests_cache | ||||
except ImportError: | ||||
Matthias Bussonnier
|
r22016 | print("cache not available, install `requests_cache` for caching.", file=sys.stderr) | ||
MinRK
|
r11584 | else: | ||
MinRK
|
r16674 | requests_cache.install_cache("gh_api", expire_after=3600) | ||
MinRK
|
r11584 | |||
Thomas Kluyver
|
r6694 | # Keyring stores passwords by a 'username', but we're not storing a username and | ||
# password | ||||
Matthias Bussonnier
|
import socket

# Keyring entry name for this machine: the hostname with every '.' and '-'
# turned into '_', appended to a fixed prefix.
fake_username = 'ipython_tools_%s' % re.sub(r'[.\-]', '_', socket.gethostname())
Thomas Kluyver
|
r6694 | |||
Thomas Kluyver
|
class Obj(dict):
    """A ``dict`` whose keys can also be read and assigned as attributes."""

    def __getattr__(self, name):
        # Reached only when normal attribute lookup fails: fall back to the
        # mapping and report a missing key as a missing attribute.
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        else:
            return value

    def __setattr__(self, name, val):
        # Attribute assignment always stores into the mapping itself.
        self.__setitem__(name, val)
Thomas Kluyver
|
# Module-level cache for the OAuth token, filled in by get_auth_token().
token = None


def get_auth_token():
    """Return a GitHub OAuth token, obtaining and storing one if needed.

    The token is looked up, in order, in the module-level ``token`` cache and
    in the keyring (service ``'github'``, entry ``fake_username``).  When
    neither has one, the user is prompted on stderr for a username and
    password, a new authorization is requested from the GitHub API (with one
    retry carrying a one-time password if the server asks for it), and the
    resulting token is saved to the keyring before being returned.

    Raises whatever ``response.raise_for_status()`` raises when the final
    authorization request is unsuccessful.
    """
    global token

    if token is not None:
        return token

    import keyring

    token = keyring.get_password('github', fake_username)
    if token is not None:
        return token

    prompt = ("Please enter your github username and password. These are not "
              "stored, only used to get an oAuth token. You can revoke this at "
              "any time on Github.\n"
              "Username: ")
    print(prompt, file=sys.stderr, end='')
    user = input('')
    pw = getpass.getpass("Password: ", stream=sys.stderr)

    api_url = 'https://api.github.com/authorizations'
    request_body = json.dumps({
        "scopes": [
            "public_repo",
            "gist"
        ],
        "note": "IPython tools %s" % socket.gethostname(),
        "note_url": "https://github.com/ipython/ipython/tree/master/tools",
    })

    response = requests.post(api_url, auth=(user, pw), data=request_body)

    # A 401 whose X-GitHub-OTP header contains 'required;' means the server
    # wants a one-time password: ask for it and repeat the same request.
    otp_header = response.headers.get('X-GitHub-OTP', '')
    if response.status_code == 401 and 'required;' in otp_header:
        print("Your login API requested a one time password", file=sys.stderr)
        otp = getpass.getpass("One Time Password: ", stream=sys.stderr)
        response = requests.post(api_url,
                                 auth=(user, pw),
                                 data=request_body,
                                 headers={'X-GitHub-OTP': otp})

    response.raise_for_status()
    token = json.loads(response.text)['token']
    keyring.set_password('github', fake_username, token)
    return token
Thomas Kluyver
|
def make_auth_header():
    """Build the ``Authorization`` header dict carrying the OAuth token."""
    auth_token = get_auth_token()
    return {'Authorization': 'token ' + auth_token}
def post_issue_comment(project, num, body):
    """Post ``body`` as a comment on issue ``num`` of ``project``.

    The request is authenticated with the header from ``make_auth_header()``.

    Returns the ``requests`` response object so callers can inspect the
    status (for example via ``raise_for_status()``).  No exception is raised
    here for an unsuccessful HTTP status, so callers that ignore the return
    value behave exactly as before.
    """
    url = 'https://api.github.com/repos/{project}/issues/{num}/comments'.format(project=project, num=num)
    payload = json.dumps({'body': body})
    # Hand the response back instead of dropping it, so a rejected comment
    # is distinguishable from a successful one.
    return requests.post(url, data=payload, headers=make_auth_header())
Thomas Kluyver
|
r6711 | |||
def post_gist(content, description='', filename='file', auth=False):
    """Create a public Gist holding ``content`` and return its HTML URL.

    ``filename`` is the name of the single file inside the Gist.  When
    ``auth`` is true the request carries the OAuth header; otherwise it is
    sent without extra headers.  Raises via ``raise_for_status()`` on an
    unsuccessful response.
    """
    gist = {
        "description": description,
        "public": True,
        "files": {
            filename: {
                "content": content
            }
        }
    }
    post_data = json.dumps(gist).encode('utf-8')

    if auth:
        headers = make_auth_header()
    else:
        headers = {}

    response = requests.post("https://api.github.com/gists", data=post_data, headers=headers)
    response.raise_for_status()
    return json.loads(response.text)['html_url']
Skipper Seabold
|
r13169 | |||
MinRK
|
def get_pull_request(project, num, auth=False):
    """Fetch pull request ``num`` of ``project`` and return it as an ``Obj``.

    The JSON reply is decoded with ``Obj`` as the object hook, so nested
    objects support attribute access.  The OAuth header is sent only when
    ``auth`` is true.  Raises via ``raise_for_status()`` on an unsuccessful
    response.
    """
    pr_url = "https://api.github.com/repos/{project}/pulls/{num}".format(project=project, num=num)
    auth_header = make_auth_header() if auth else None
    print("fetching %s" % pr_url, file=sys.stderr)
    reply = requests.get(pr_url, headers=auth_header)
    reply.raise_for_status()
    return json.loads(reply.text, object_hook=Obj)
Matthias BUSSONNIER
|
r7254 | |||
MinRK
|
def get_pull_request_files(project, num, auth=False):
    """Return the list of files touched by pull request ``num`` of ``project``.

    All pages are collected through ``get_paged_request``; the OAuth header
    is sent only when ``auth`` is true.
    """
    files_url = "https://api.github.com/repos/{project}/pulls/{num}/files".format(project=project, num=num)
    auth_header = make_auth_header() if auth else None
    return get_paged_request(files_url, headers=auth_header)
MinRK
|
# element_pat captures the text between a '<' and the next '>';
# rel_pat captures the word quoted after ``rel=`` (single or double quotes).
# NOTE(review): presumably left over from hand-parsing HTTP Link headers --
# neither pattern is referenced in the code visible here (get_paged_request
# reads ``response.links`` instead); confirm before removing.
element_pat = re.compile(r'<(.+?)>')
rel_pat = re.compile(r'rel=[\'"](\w+)[\'"]')
MinRK
|
def get_paged_request(url, headers=None, **params):
    """Fetch every page of a paginated API v3 listing and return one list.

    ``params`` are sent as query parameters, with ``per_page`` defaulting to
    100.  Pages are followed through the ``next`` entry of
    ``response.links`` until there is none.  Each request is announced on
    stderr.  Raises via ``raise_for_status()`` on an unsuccessful response.
    """
    params.setdefault("per_page", 100)
    collected = []
    while True:
        if '?' not in url:
            print("fetching %s with %s" % (url, params), file=sys.stderr)
        else:
            # The URL already carries a query string (as the "next" links
            # do), so the separate params are dropped from here on.
            params = None
            print("fetching %s" % url, file=sys.stderr)

        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        collected.extend(response.json())

        next_link = response.links.get('next')
        if next_link is None:
            return collected
        url = next_link['url']
MinRK
|
def get_pulls_list(project, auth=False, **params):
    """Return all pull requests of ``project`` matching ``params``.

    ``state`` defaults to ``"closed"``; any other keyword is passed on as a
    query parameter.  The OAuth header is sent only when ``auth`` is true.
    """
    params.setdefault("state", "closed")
    pulls_url = "https://api.github.com/repos/{project}/pulls".format(project=project)
    auth_headers = make_auth_header() if auth else None
    return get_paged_request(pulls_url, headers=auth_headers, **params)
MinRK
|
def get_issues_list(project, auth=False, **params):
    """Return all issues of ``project`` matching ``params``.

    ``state`` defaults to ``"closed"``; any other keyword is passed on as a
    query parameter.  The OAuth header is sent only when ``auth`` is true.
    """
    params.setdefault("state", "closed")
    issues_url = "https://api.github.com/repos/{project}/issues".format(project=project)
    auth_headers = make_auth_header() if auth else None
    return get_paged_request(issues_url, headers=auth_headers, **params)
MinRK
|
r7761 | |||
Skipper Seabold
|
def get_milestones(project, auth=False, **params):
    """Return all milestones of ``project`` matching ``params``.

    ``state`` defaults to ``'all'``; any other keyword is passed on as a
    query parameter.  The OAuth header is sent only when ``auth`` is true.
    """
    params.setdefault('state', 'all')
    milestones_url = "https://api.github.com/repos/{project}/milestones".format(project=project)
    auth_headers = make_auth_header() if auth else None
    return get_paged_request(milestones_url, headers=auth_headers, **params)
Skipper Seabold
|
r13169 | |||
def get_milestone_id(project, milestone, auth=False, **params):
    """Return the number of the milestone of ``project`` titled ``milestone``.

    Milestones are fetched with ``get_milestones`` (``auth`` and ``params``
    are passed through) and the first one whose title matches wins.

    Raises ``ValueError`` when no milestone has that title.
    """
    for entry in get_milestones(project, auth=auth, **params):
        if entry['title'] != milestone:
            continue
        return entry['number']
    raise ValueError("milestone %s not found" % milestone)
MinRK
|
def is_pull_request(issue):
    """Return True if the given issue is a pull request.

    An issue counts as a pull request when its ``pull_request`` entry has a
    truthy ``html_url``.  A missing ``pull_request`` key, one set to
    ``None``, or one without ``html_url`` all yield False.
    """
    # ``or {}`` also covers an explicit None value, which the dict.get
    # default alone would not (it only applies when the key is absent).
    pr_info = issue.get('pull_request') or {}
    return bool(pr_info.get('html_url'))
MinRK
|
def get_authors(pr):
    """Return ``"Name <email>"`` strings for the commits of a pull request.

    ``pr`` must provide ``'number'`` (used for the progress message on
    stderr) and ``'commits_url'``.  The commit list is fetched with the
    OAuth header through ``get_paged_request``, so commits beyond the first
    page of results are included as well.  One entry is returned per commit,
    in the order the API lists them; repeated authors are not merged.

    Raises via ``raise_for_status()`` on an unsuccessful response.
    """
    print("getting authors for #%i" % pr['number'], file=sys.stderr)
    # A single GET only returns the first page of commits; follow the
    # paging links so long pull requests do not lose authors.
    commits = get_paged_request(pr['commits_url'], headers=make_auth_header())
    authors = []
    for commit in commits:
        author = commit['commit']['author']
        authors.append("%s <%s>" % (author['name'], author['email']))
    return authors
MinRK
|
# encode_multipart_formdata is adapted from urllib3.filepost.
# iter_fields is changed to enforce S3's required key ordering, and the
# small helpers the encoder needs come from the standard library rather
# than from urllib3's private modules.
def iter_fields(fields):
    """Yield ``(key, value)`` pairs of ``fields`` in the order S3 requires.

    The S3 form keys come first, in a fixed order, followed by any other
    entries.  ``fields`` itself is not modified (a copy is consumed).
    Raises ``KeyError`` when one of the required keys is missing.
    """
    fields = fields.copy()
    for key in ('key', 'acl', 'Filename', 'success_action_status', 'AWSAccessKeyId',
                'Policy', 'Signature', 'Content-Type', 'file'):
        yield (key, fields.pop(key))
    for (k, v) in fields.items():
        yield k, v


def encode_multipart_formdata(fields, boundary=None):
    """
    Encode a dictionary of ``fields`` using the multipart/form-data mime format.

    :param fields:
        Dictionary of fields; it must contain the S3 keys that
        :func:`iter_fields` emits first.  The key is treated as the field
        name, and the value as the body of the form-data bytes. If the value
        is a tuple of two elements, then the first element is treated as the
        filename of the form-data section.

        Field names and filenames must be text; text is written as UTF-8.

    :param boundary:
        If not specified, then a random hexadecimal boundary is generated.

    :returns:
        ``(body, content_type)``: the encoded body as bytes and the matching
        ``multipart/form-data`` content type, also as bytes.
    """
    # Standard-library imports only, kept local to the function: the names
    # previously pulled from requests.packages.urllib3.filepost (six, b,
    # get_content_type) are private and not available in current releases.
    import codecs
    import mimetypes
    import uuid
    from io import BytesIO

    body = BytesIO()
    write_text = codecs.getwriter('utf-8')(body).write
    if boundary is None:
        boundary = uuid.uuid4().hex

    for fieldname, value in iter_fields(fields):
        body.write(('--%s\r\n' % (boundary)).encode('latin-1'))

        if isinstance(value, tuple):
            filename, data = value
            write_text('Content-Disposition: form-data; name="%s"; '
                       'filename="%s"\r\n' % (fieldname, filename))
            # Guess the part's type from the filename, falling back to a
            # generic binary type when the extension is unknown.
            guessed = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
            body.write(('Content-Type: %s\r\n\r\n' % (guessed)).encode('latin-1'))
        else:
            data = value
            write_text('Content-Disposition: form-data; name="%s"\r\n'
                       % (fieldname))
            body.write(b'Content-Type: text/plain\r\n\r\n')

        if isinstance(data, int):
            data = str(data)  # Backwards compatibility

        if isinstance(data, str):
            write_text(data)
        else:
            body.write(data)

        body.write(b'\r\n')

    body.write(('--%s--\r\n' % (boundary)).encode('latin-1'))

    content_type = ('multipart/form-data; boundary=%s' % boundary).encode('latin-1')

    return body.getvalue(), content_type
def post_download(project, filename, name=None, description=""):
    """Upload a file to the GitHub downloads area

    The file at ``filename`` is read in binary mode and registered with the
    downloads endpoint of ``project`` under ``name`` (by default the file's
    base name).  The values from that reply are then used to build a
    multipart form that is posted to the returned ``s3_url``.

    Returns the response of the upload POST.  The registration request
    raises via ``raise_for_status()`` on an unsuccessful response; the
    upload response itself is returned unchecked.
    """
    if name is None:
        name = os.path.basename(filename)
    with open(filename, 'rb') as fh:
        filedata = fh.read()

    url = "https://api.github.com/repos/{project}/downloads".format(project=project)

    registration = {
        'name': name,
        'size': len(filedata),
        'description': description,
    }
    response = requests.post(url, data=json.dumps(registration),
                             headers=make_auth_header())
    response.raise_for_status()
    reply = json.loads(response.content)
    s3_url = reply['s3_url']

    # Form fields for the upload; encode_multipart_formdata emits them in
    # the order required by S3 regardless of the order used here.
    fields = {
        'key': reply['path'],
        'acl': reply['acl'],
        'success_action_status': 201,
        'Filename': reply['name'],
        'AWSAccessKeyId': reply['accesskeyid'],
        'Policy': reply['policy'],
        'Signature': reply['signature'],
        'file': (reply['name'], filedata),
        'Content-Type': reply['mime_type'],
    }
    data, content_type = encode_multipart_formdata(fields)
    return requests.post(s3_url, data=data, headers={'Content-Type': content_type})