##// END OF EJS Templates
move ipython command line logic to one place...
move ipython command line logic to one place it keeps getting repeated, more elegant to move it out into the call

File last commit:

r11584:e45e2d2a
r11834:1c67772e
Show More
gh_api.py
245 lines | 7.7 KiB | text/x-python | PythonLexer
"""Functions for Github API requests."""
from __future__ import print_function
try:
input = raw_input
except NameError:
pass
import os
import re
import sys
import requests
import getpass
import json
try:
import requests_cache
except ImportError:
print("no cache")
else:
requests_cache.install_cache("gh_api")
# Keyring stores passwords by a 'username', but we're not storing a username and
# password
fake_username = 'ipython_tools'
class Obj(dict):
"""Dictionary with attribute access to names."""
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, val):
self[name] = val
token = None
def get_auth_token():
global token
if token is not None:
return token
import keyring
token = keyring.get_password('github', fake_username)
if token is not None:
return token
print("Please enter your github username and password. These are not "
"stored, only used to get an oAuth token. You can revoke this at "
"any time on Github.")
user = input("Username: ")
pw = getpass.getpass("Password: ")
auth_request = {
"scopes": [
"public_repo",
"gist"
],
"note": "IPython tools",
"note_url": "https://github.com/ipython/ipython/tree/master/tools",
}
response = requests.post('https://api.github.com/authorizations',
auth=(user, pw), data=json.dumps(auth_request))
response.raise_for_status()
token = json.loads(response.text)['token']
keyring.set_password('github', fake_username, token)
return token
def make_auth_header():
return {'Authorization': 'token ' + get_auth_token()}
def post_issue_comment(project, num, body):
url = 'https://api.github.com/repos/{project}/issues/{num}/comments'.format(project=project, num=num)
payload = json.dumps({'body': body})
requests.post(url, data=payload, headers=make_auth_header())
def post_gist(content, description='', filename='file', auth=False):
"""Post some text to a Gist, and return the URL."""
post_data = json.dumps({
"description": description,
"public": True,
"files": {
filename: {
"content": content
}
}
}).encode('utf-8')
headers = make_auth_header() if auth else {}
response = requests.post("https://api.github.com/gists", data=post_data, headers=headers)
response.raise_for_status()
response_data = json.loads(response.text)
return response_data['html_url']
def get_pull_request(project, num, auth=False):
"""get pull request info by number
"""
url = "https://api.github.com/repos/{project}/pulls/{num}".format(project=project, num=num)
if auth:
header = make_auth_header()
else:
header = None
response = requests.get(url, headers=header)
response.raise_for_status()
return json.loads(response.text, object_hook=Obj)
element_pat = re.compile(r'<(.+?)>')
rel_pat = re.compile(r'rel=[\'"](\w+)[\'"]')
def get_paged_request(url, headers=None):
"""get a full list, handling APIv3's paging"""
results = []
while True:
print("fetching %s" % url, file=sys.stderr)
response = requests.get(url, headers=headers)
response.raise_for_status()
results.extend(response.json())
if 'next' in response.links:
url = response.links['next']['url']
else:
break
return results
def get_pulls_list(project, state="closed", auth=False):
"""get pull request list
"""
url = "https://api.github.com/repos/{project}/pulls?state={state}&per_page=100".format(project=project, state=state)
if auth:
headers = make_auth_header()
else:
headers = None
pages = get_paged_request(url, headers=headers)
return pages
def get_issues_list(project, state="closed", auth=False):
"""get pull request list
"""
url = "https://api.github.com/repos/{project}/pulls?state={state}&per_page=100".format(project=project, state=state)
if auth:
headers = make_auth_header()
else:
headers = None
pages = get_paged_request(url, headers=headers)
return pages
# encode_multipart_formdata is from urllib3.filepost
# The only change is to iter_fields, to enforce S3's required key ordering
def iter_fields(fields):
fields = fields.copy()
for key in ('key', 'acl', 'Filename', 'success_action_status', 'AWSAccessKeyId',
'Policy', 'Signature', 'Content-Type', 'file'):
yield (key, fields.pop(key))
for (k,v) in fields.items():
yield k,v
def encode_multipart_formdata(fields, boundary=None):
"""
Encode a dictionary of ``fields`` using the multipart/form-data mime format.
:param fields:
Dictionary of fields or list of (key, value) field tuples. The key is
treated as the field name, and the value as the body of the form-data
bytes. If the value is a tuple of two elements, then the first element
is treated as the filename of the form-data section.
Field names and filenames must be unicode.
:param boundary:
If not specified, then a random boundary will be generated using
:func:`mimetools.choose_boundary`.
"""
# copy requests imports in here:
from io import BytesIO
from requests.packages.urllib3.filepost import (
choose_boundary, six, writer, b, get_content_type
)
body = BytesIO()
if boundary is None:
boundary = choose_boundary()
for fieldname, value in iter_fields(fields):
body.write(b('--%s\r\n' % (boundary)))
if isinstance(value, tuple):
filename, data = value
writer(body).write('Content-Disposition: form-data; name="%s"; '
'filename="%s"\r\n' % (fieldname, filename))
body.write(b('Content-Type: %s\r\n\r\n' %
(get_content_type(filename))))
else:
data = value
writer(body).write('Content-Disposition: form-data; name="%s"\r\n'
% (fieldname))
body.write(b'Content-Type: text/plain\r\n\r\n')
if isinstance(data, int):
data = str(data) # Backwards compatibility
if isinstance(data, six.text_type):
writer(body).write(data)
else:
body.write(data)
body.write(b'\r\n')
body.write(b('--%s--\r\n' % (boundary)))
content_type = b('multipart/form-data; boundary=%s' % boundary)
return body.getvalue(), content_type
def post_download(project, filename, name=None, description=""):
"""Upload a file to the GitHub downloads area"""
if name is None:
name = os.path.basename(filename)
with open(filename, 'rb') as f:
filedata = f.read()
url = "https://api.github.com/repos/{project}/downloads".format(project=project)
payload = json.dumps(dict(name=name, size=len(filedata),
description=description))
response = requests.post(url, data=payload, headers=make_auth_header())
response.raise_for_status()
reply = json.loads(response.content)
s3_url = reply['s3_url']
fields = dict(
key=reply['path'],
acl=reply['acl'],
success_action_status=201,
Filename=reply['name'],
AWSAccessKeyId=reply['accesskeyid'],
Policy=reply['policy'],
Signature=reply['signature'],
file=(reply['name'], filedata),
)
fields['Content-Type'] = reply['mime_type']
data, content_type = encode_multipart_formdata(fields)
s3r = requests.post(s3_url, data=data, headers={'Content-Type': content_type})
return s3r