##// END OF EJS Templates
Change os.path functions to pathlib.Path functions in gh_api.py...
Hussaina Begum Nandyala -
Show More
@@ -1,303 +1,304 b''
1 1 """Functions for Github API requests."""
2 2
3 3 try:
4 4 input = raw_input
5 5 except NameError:
6 6 pass
7 7
8 8 import os
9 9 import re
10 10 import sys
11 11
12 12 import requests
13 13 import getpass
14 14 import json
15 from pathlib import Path
15 16
16 17 try:
17 18 import requests_cache
18 19 except ImportError:
19 20 print("cache not available, install `requests_cache` for caching.", file=sys.stderr)
20 21 else:
21 22 requests_cache.install_cache("gh_api", expire_after=3600)
22 23
23 24 # Keyring stores passwords by a 'username', but we're not storing a username and
24 25 # password
25 26 import socket
26 27 fake_username = 'ipython_tools_%s' % socket.gethostname().replace('.','_').replace('-','_')
27 28
28 29 class Obj(dict):
29 30 """Dictionary with attribute access to names."""
30 31 def __getattr__(self, name):
31 32 try:
32 33 return self[name]
33 34 except KeyError as e:
34 35 raise AttributeError(name) from e
35 36
36 37 def __setattr__(self, name, val):
37 38 self[name] = val
38 39
39 40 token = None
40 41 def get_auth_token():
41 42 global token
42 43
43 44 if token is not None:
44 45 return token
45 46
46 47 import keyring
47 48 token = keyring.get_password('github', fake_username)
48 49 if token is not None:
49 50 return token
50 51
51 52 print("Please enter your github username and password. These are not "
52 53 "stored, only used to get an oAuth token. You can revoke this at "
53 54 "any time on Github.\n"
54 55 "Username: ", file=sys.stderr, end='')
55 56 user = input('')
56 57 pw = getpass.getpass("Password: ", stream=sys.stderr)
57 58
58 59 auth_request = {
59 60 "scopes": [
60 61 "public_repo",
61 62 "gist"
62 63 ],
63 64 "note": "IPython tools %s" % socket.gethostname(),
64 65 "note_url": "https://github.com/ipython/ipython/tree/master/tools",
65 66 }
66 67 response = requests.post('https://api.github.com/authorizations',
67 68 auth=(user, pw), data=json.dumps(auth_request))
68 69 if response.status_code == 401 and \
69 70 'required;' in response.headers.get('X-GitHub-OTP', ''):
70 71 print("Your login API requested a one time password", file=sys.stderr)
71 72 otp = getpass.getpass("One Time Password: ", stream=sys.stderr)
72 73 response = requests.post('https://api.github.com/authorizations',
73 74 auth=(user, pw),
74 75 data=json.dumps(auth_request),
75 76 headers={'X-GitHub-OTP':otp})
76 77 response.raise_for_status()
77 78 token = json.loads(response.text)['token']
78 79 keyring.set_password('github', fake_username, token)
79 80 return token
80 81
81 82 def make_auth_header():
82 83 return {'Authorization': 'token ' + get_auth_token()}
83 84
84 85 def post_issue_comment(project, num, body):
85 86 url = 'https://api.github.com/repos/{project}/issues/{num}/comments'.format(project=project, num=num)
86 87 payload = json.dumps({'body': body})
87 88 requests.post(url, data=payload, headers=make_auth_header())
88 89
89 90 def post_gist(content, description='', filename='file', auth=False):
90 91 """Post some text to a Gist, and return the URL."""
91 92 post_data = json.dumps({
92 93 "description": description,
93 94 "public": True,
94 95 "files": {
95 96 filename: {
96 97 "content": content
97 98 }
98 99 }
99 100 }).encode('utf-8')
100 101
101 102 headers = make_auth_header() if auth else {}
102 103 response = requests.post("https://api.github.com/gists", data=post_data, headers=headers)
103 104 response.raise_for_status()
104 105 response_data = json.loads(response.text)
105 106 return response_data['html_url']
106 107
107 108 def get_pull_request(project, num, auth=False):
108 109 """get pull request info by number
109 110 """
110 111 url = "https://api.github.com/repos/{project}/pulls/{num}".format(project=project, num=num)
111 112 if auth:
112 113 header = make_auth_header()
113 114 else:
114 115 header = None
115 116 print("fetching %s" % url, file=sys.stderr)
116 117 response = requests.get(url, headers=header)
117 118 response.raise_for_status()
118 119 return json.loads(response.text, object_hook=Obj)
119 120
120 121 def get_pull_request_files(project, num, auth=False):
121 122 """get list of files in a pull request"""
122 123 url = "https://api.github.com/repos/{project}/pulls/{num}/files".format(project=project, num=num)
123 124 if auth:
124 125 header = make_auth_header()
125 126 else:
126 127 header = None
127 128 return get_paged_request(url, headers=header)
128 129
129 130 element_pat = re.compile(r'<(.+?)>')
130 131 rel_pat = re.compile(r'rel=[\'"](\w+)[\'"]')
131 132
132 133 def get_paged_request(url, headers=None, **params):
133 134 """get a full list, handling APIv3's paging"""
134 135 results = []
135 136 params.setdefault("per_page", 100)
136 137 while True:
137 138 if '?' in url:
138 139 params = None
139 140 print("fetching %s" % url, file=sys.stderr)
140 141 else:
141 142 print("fetching %s with %s" % (url, params), file=sys.stderr)
142 143 response = requests.get(url, headers=headers, params=params)
143 144 response.raise_for_status()
144 145 results.extend(response.json())
145 146 if 'next' in response.links:
146 147 url = response.links['next']['url']
147 148 else:
148 149 break
149 150 return results
150 151
151 152 def get_pulls_list(project, auth=False, **params):
152 153 """get pull request list"""
153 154 params.setdefault("state", "closed")
154 155 url = "https://api.github.com/repos/{project}/pulls".format(project=project)
155 156 if auth:
156 157 headers = make_auth_header()
157 158 else:
158 159 headers = None
159 160 pages = get_paged_request(url, headers=headers, **params)
160 161 return pages
161 162
162 163 def get_issues_list(project, auth=False, **params):
163 164 """get issues list"""
164 165 params.setdefault("state", "closed")
165 166 url = "https://api.github.com/repos/{project}/issues".format(project=project)
166 167 if auth:
167 168 headers = make_auth_header()
168 169 else:
169 170 headers = None
170 171 pages = get_paged_request(url, headers=headers, **params)
171 172 return pages
172 173
173 174 def get_milestones(project, auth=False, **params):
174 175 params.setdefault('state', 'all')
175 176 url = "https://api.github.com/repos/{project}/milestones".format(project=project)
176 177 if auth:
177 178 headers = make_auth_header()
178 179 else:
179 180 headers = None
180 181 milestones = get_paged_request(url, headers=headers, **params)
181 182 return milestones
182 183
183 184 def get_milestone_id(project, milestone, auth=False, **params):
184 185 milestones = get_milestones(project, auth=auth, **params)
185 186 for mstone in milestones:
186 187 if mstone['title'] == milestone:
187 188 return mstone['number']
188 189 else:
189 190 raise ValueError("milestone %s not found" % milestone)
190 191
191 192 def is_pull_request(issue):
192 193 """Return True if the given issue is a pull request."""
193 194 return bool(issue.get('pull_request', {}).get('html_url', None))
194 195
195 196 def get_authors(pr):
196 197 print("getting authors for #%i" % pr['number'], file=sys.stderr)
197 198 h = make_auth_header()
198 199 r = requests.get(pr['commits_url'], headers=h)
199 200 r.raise_for_status()
200 201 commits = r.json()
201 202 authors = []
202 203 for commit in commits:
203 204 author = commit['commit']['author']
204 205 authors.append("%s <%s>" % (author['name'], author['email']))
205 206 return authors
206 207
207 208 # encode_multipart_formdata is from urllib3.filepost
208 209 # The only change is to iter_fields, to enforce S3's required key ordering
209 210
210 211 def iter_fields(fields):
211 212 fields = fields.copy()
212 213 for key in ('key', 'acl', 'Filename', 'success_action_status', 'AWSAccessKeyId',
213 214 'Policy', 'Signature', 'Content-Type', 'file'):
214 215 yield (key, fields.pop(key))
215 216 for (k,v) in fields.items():
216 217 yield k,v
217 218
218 219 def encode_multipart_formdata(fields, boundary=None):
219 220 """
220 221 Encode a dictionary of ``fields`` using the multipart/form-data mime format.
221 222
222 223 :param fields:
223 224 Dictionary of fields or list of (key, value) field tuples. The key is
224 225 treated as the field name, and the value as the body of the form-data
225 226 bytes. If the value is a tuple of two elements, then the first element
226 227 is treated as the filename of the form-data section.
227 228
228 229 Field names and filenames must be unicode.
229 230
230 231 :param boundary:
231 232 If not specified, then a random boundary will be generated using
232 233 :func:`mimetools.choose_boundary`.
233 234 """
234 235 # copy requests imports in here:
235 236 from io import BytesIO
236 237 from requests.packages.urllib3.filepost import (
237 238 choose_boundary, six, writer, b, get_content_type
238 239 )
239 240 body = BytesIO()
240 241 if boundary is None:
241 242 boundary = choose_boundary()
242 243
243 244 for fieldname, value in iter_fields(fields):
244 245 body.write(b('--%s\r\n' % (boundary)))
245 246
246 247 if isinstance(value, tuple):
247 248 filename, data = value
248 249 writer(body).write('Content-Disposition: form-data; name="%s"; '
249 250 'filename="%s"\r\n' % (fieldname, filename))
250 251 body.write(b('Content-Type: %s\r\n\r\n' %
251 252 (get_content_type(filename))))
252 253 else:
253 254 data = value
254 255 writer(body).write('Content-Disposition: form-data; name="%s"\r\n'
255 256 % (fieldname))
256 257 body.write(b'Content-Type: text/plain\r\n\r\n')
257 258
258 259 if isinstance(data, int):
259 260 data = str(data) # Backwards compatibility
260 261 if isinstance(data, six.text_type):
261 262 writer(body).write(data)
262 263 else:
263 264 body.write(data)
264 265
265 266 body.write(b'\r\n')
266 267
267 268 body.write(b('--%s--\r\n' % (boundary)))
268 269
269 270 content_type = b('multipart/form-data; boundary=%s' % boundary)
270 271
271 272 return body.getvalue(), content_type
272 273
273 274
274 275 def post_download(project, filename, name=None, description=""):
275 276 """Upload a file to the GitHub downloads area"""
276 277 if name is None:
277 name = os.path.basename(filename)
278 name = Path(filename).name
278 279 with open(filename, 'rb') as f:
279 280 filedata = f.read()
280 281
281 282 url = "https://api.github.com/repos/{project}/downloads".format(project=project)
282 283
283 284 payload = json.dumps(dict(name=name, size=len(filedata),
284 285 description=description))
285 286 response = requests.post(url, data=payload, headers=make_auth_header())
286 287 response.raise_for_status()
287 288 reply = json.loads(response.content)
288 289 s3_url = reply['s3_url']
289 290
290 291 fields = dict(
291 292 key=reply['path'],
292 293 acl=reply['acl'],
293 294 success_action_status=201,
294 295 Filename=reply['name'],
295 296 AWSAccessKeyId=reply['accesskeyid'],
296 297 Policy=reply['policy'],
297 298 Signature=reply['signature'],
298 299 file=(reply['name'], filedata),
299 300 )
300 301 fields['Content-Type'] = reply['mime_type']
301 302 data, content_type = encode_multipart_formdata(fields)
302 303 s3r = requests.post(s3_url, data=data, headers={'Content-Type': content_type})
303 304 return s3r
General Comments 0
You need to be logged in to leave comments. Login now