|
|
"""Functions for Github API requests."""
|
|
|
|
|
|
# Python 2 compatibility shim: where ``raw_input`` exists, make ``input`` refer
# to it so the prompt below reads a plain string.  Where the name is missing
# (Python 3) the builtin ``input`` is left untouched.
try:
    raw_input
except NameError:
    pass
else:
    input = raw_input
|
|
|
|
|
|
import re
|
|
|
import sys
|
|
|
|
|
|
import requests
|
|
|
import getpass
|
|
|
import json
|
|
|
|
|
|
# Caching is optional: when `requests_cache` is importable, install a cache
# named "gh_api" whose entries expire after 3600 seconds; otherwise just warn
# on stderr and carry on uncached.
try:
    import requests_cache
except ImportError:
    print(
        "cache not available, install `requests_cache` for caching.",
        file=sys.stderr,
    )
else:
    # Kept outside the ``try`` body so that only a failed import is treated
    # as "cache not available".
    requests_cache.install_cache("gh_api", expire_after=3600)
|
|
|
|
|
|
# Keyring files each secret under a 'username', but we are not storing a
# username/password pair, so build a stand-in username from this machine's
# hostname (with '.' and '-' turned into '_').
import socket
fake_username = 'ipython_tools_%s' % socket.gethostname().translate(str.maketrans('.-', '__'))
|
|
|
|
|
|
class Obj(dict):
    """Dictionary with attribute access to names.

    Keys can be read, assigned and deleted as attributes
    (``obj.name`` is ``obj['name']``).  A missing key is reported as
    ``AttributeError`` so that ``hasattr`` and ``getattr`` with a default
    behave as usual.
    """

    def __getattr__(self, name):
        """Return ``self[name]``; raise AttributeError if the key is absent."""
        # __getattr__ is only consulted after normal attribute lookup fails,
        # so regular dict methods are never shadowed by keys.
        try:
            return self[name]
        except KeyError as e:
            raise AttributeError(name) from e

    def __setattr__(self, name, val):
        """Store ``val`` under the key ``name``."""
        self[name] = val

    def __delattr__(self, name):
        """Remove the key ``name``; raise AttributeError if it is absent."""
        # Mirrors __getattr__/__setattr__ so that ``del obj.name`` works on
        # keys instead of always failing.
        try:
            del self[name]
        except KeyError as e:
            raise AttributeError(name) from e
|
|
|
|
|
|
# Module-level cache for the token returned by get_auth_token().
token = None


def get_auth_token():
    """Return a GitHub OAuth token, obtaining one interactively if needed.

    Lookup order:

    1. the module-level ``token`` cache;
    2. the keyring entry stored for service ``'github'`` under
       ``fake_username``;
    3. a username/password prompt (plus a one-time password if the API asks
       for it), followed by a POST to ``/authorizations``.  The token from
       the response is cached in ``token`` and saved to the keyring.

    Raises whatever ``response.raise_for_status()`` raises when the
    authorization request fails.
    """
    global token

    if token is not None:
        return token

    # Imported here rather than at module level, so keyring is only needed
    # once a token actually has to be looked up.
    import keyring

    token = keyring.get_password('github', fake_username)
    if token is not None:
        return token

    prompt = ("Please enter your github username and password. These are not "
              "stored, only used to get an oAuth token. You can revoke this at "
              "any time on Github.\n"
              "Username: ")
    print(prompt, file=sys.stderr, end='')
    user = input('')
    pw = getpass.getpass("Password: ", stream=sys.stderr)

    payload = json.dumps({
        "scopes": [
            "public_repo",
            "gist"
        ],
        "note": "IPython tools %s" % socket.gethostname(),
        "note_url": "https://github.com/ipython/ipython/tree/master/tools",
    })
    api_url = 'https://api.github.com/authorizations'
    credentials = (user, pw)

    reply = requests.post(api_url, auth=credentials, data=payload)
    otp_required = (
        reply.status_code == 401
        and 'required;' in reply.headers.get('X-GitHub-OTP', '')
    )
    if otp_required:
        # Same request again, this time carrying the one-time password.
        print("Your login API requested a one time password", file=sys.stderr)
        otp = getpass.getpass("One Time Password: ", stream=sys.stderr)
        reply = requests.post(api_url,
                              auth=credentials,
                              data=payload,
                              headers={'X-GitHub-OTP': otp})
    reply.raise_for_status()

    token = json.loads(reply.text)['token']
    keyring.set_password('github', fake_username, token)
    return token
|
|
|
|
|
|
def make_auth_header():
    """Return a headers dict carrying ``Authorization: token <token>``.

    The token comes from get_auth_token(), which may prompt the user.
    """
    auth_token = get_auth_token()
    return {'Authorization': 'token ' + auth_token}
|
|
|
|
|
|
|
|
|
def get_pull_request(project, num, auth=False):
    """Fetch a single pull request by number.

    ``project`` and ``num`` are interpolated into
    ``https://api.github.com/repos/{project}/pulls/{num}``.  When ``auth`` is
    true the request carries the header from make_auth_header().

    Returns the decoded JSON response, with every JSON object turned into an
    ``Obj``.  HTTP errors are raised by ``raise_for_status()``.
    """
    endpoint = "https://api.github.com/repos/{project}/pulls/{num}".format(project=project, num=num)
    request_headers = make_auth_header() if auth else None
    print("fetching %s" % endpoint, file=sys.stderr)
    reply = requests.get(endpoint, headers=request_headers)
    reply.raise_for_status()
    return json.loads(reply.text, object_hook=Obj)
|
|
|
|
|
|
# Matches text enclosed in angle brackets and captures what is between them.
# NOTE(review): presumably for picking URLs out of a Link header; not
# referenced in the visible code -- confirm before removing.
element_pat = re.compile(r'<(.+?)>')
# Matches rel='word' / rel="word" and captures the word.
rel_pat = re.compile(r'rel=[\'"](\w+)[\'"]')
|
|
|
|
|
|
def get_paged_request(url, headers=None, **params):
    """Fetch every page of a paged listing and return the items as one list.

    ``params`` are sent as query parameters, with ``per_page`` defaulting to
    100.  After each response the ``next`` entry of ``response.links`` is
    followed until there is none.  HTTP errors are raised by
    ``raise_for_status()``.
    """
    collected = []
    params.setdefault("per_page", 100)
    while True:
        if '?' in url:
            # The URL already carries a query string, so no extra parameters
            # are sent with it (from this request on).
            params = None
            print("fetching %s" % url, file=sys.stderr)
        else:
            print("fetching %s with %s" % (url, params), file=sys.stderr)

        page = requests.get(url, headers=headers, params=params)
        page.raise_for_status()
        collected.extend(page.json())

        links = page.links
        if 'next' not in links:
            break
        url = links['next']['url']
    return collected
|
|
|
|
|
|
def get_issues_list(project, auth=False, **params):
    """Return the issues of ``project`` as a list.

    Extra keyword arguments are passed on as query parameters; ``state``
    defaults to ``"closed"``.  When ``auth`` is true the request carries the
    header from make_auth_header().
    """
    params.setdefault("state", "closed")
    issues_url = "https://api.github.com/repos/{project}/issues".format(project=project)
    request_headers = make_auth_header() if auth else None
    return get_paged_request(issues_url, headers=request_headers, **params)
|
|
|
|
|
|
def get_milestones(project, auth=False, **params):
    """Return the milestones of ``project`` as a list.

    Extra keyword arguments are passed on as query parameters; ``state``
    defaults to ``'all'``.  When ``auth`` is true the request carries the
    header from make_auth_header().
    """
    params.setdefault('state', 'all')
    milestones_url = "https://api.github.com/repos/{project}/milestones".format(project=project)
    request_headers = make_auth_header() if auth else None
    return get_paged_request(milestones_url, headers=request_headers, **params)
|
|
|
|
|
|
def get_milestone_id(project, milestone, auth=False, **params):
    """Return the ``number`` of the milestone whose title equals ``milestone``.

    The milestones are fetched with get_milestones(); ``auth`` and any extra
    keyword arguments are forwarded to it.  The first match wins.

    Raises ValueError if no milestone has that title.
    """
    for candidate in get_milestones(project, auth=auth, **params):
        if candidate['title'] == milestone:
            return candidate['number']
    # Only reached when the loop finished without returning.
    raise ValueError("milestone %s not found" % milestone)
|
|
|
|
|
|
def is_pull_request(issue):
    """Return True if the given issue is a pull request.

    An issue counts as a pull request when its ``pull_request`` entry has a
    truthy ``html_url``.  A missing ``pull_request`` key, a null
    ``pull_request`` value and a null ``html_url`` all yield False.
    """
    # ``or {}`` also covers a key that is present but null; a plain
    # ``.get('pull_request', {})`` would return None there and the chained
    # ``.get`` would raise AttributeError.
    pull_request = issue.get('pull_request') or {}
    return bool(pull_request.get('html_url'))
|
|
|
|
|
|
def get_authors(pr):
    """Return ``"Name <email>"`` for the author of every commit in ``pr``.

    ``pr`` must provide ``number`` (used for the progress message) and
    ``commits_url`` (the listing to fetch).  The request is always
    authenticated with make_auth_header().

    The result has one entry per commit, in the order the API returns them,
    so an author appears once for each of their commits.  HTTP errors are
    raised by ``raise_for_status()`` inside get_paged_request().
    """
    print("getting authors for #%i" % pr['number'], file=sys.stderr)
    h = make_auth_header()
    # The commit listing is paged, so one plain GET would only see the first
    # page and drop authors on larger pull requests.  get_paged_request
    # follows the "next" links and returns every commit.
    commits = get_paged_request(pr['commits_url'], headers=h)
    return [
        "%s <%s>" % (commit['commit']['author']['name'],
                     commit['commit']['author']['email'])
        for commit in commits
    ]
|
|
|
|
|
|
|