##// END OF EJS Templates
wireprotov2peer: always return a bool from _processredirect()...
Gregory Szorc -
r40789:94b0d0f9 stable
parent child Browse files
Show More
@@ -1,524 +1,527 b''
1 1 # wireprotov2peer.py - client side code for wire protocol version 2
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import threading
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 encoding,
15 15 error,
16 16 pycompat,
17 17 sslutil,
18 18 url as urlmod,
19 19 util,
20 20 wireprotoframing,
21 21 wireprototypes,
22 22 )
23 23 from .utils import (
24 24 cborutil,
25 25 )
26 26
27 27 def formatrichmessage(atoms):
28 28 """Format an encoded message from the framing protocol."""
29 29
30 30 chunks = []
31 31
32 32 for atom in atoms:
33 33 msg = _(atom[b'msg'])
34 34
35 35 if b'args' in atom:
36 36 msg = msg % tuple(atom[b'args'])
37 37
38 38 chunks.append(msg)
39 39
40 40 return b''.join(chunks)
41 41
42 42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 43 b'http',
44 44 b'https',
45 45 }
46 46
47 47 SUPPORTED_CONTENT_HASHES = {
48 48 b'sha1',
49 49 b'sha256',
50 50 }
51 51
52 52 def redirecttargetsupported(ui, target):
53 53 """Determine whether a redirect target entry is supported.
54 54
55 55 ``target`` should come from the capabilities data structure emitted by
56 56 the server.
57 57 """
58 58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 60 % (target[b'name'], target.get(b'protocol', b'')))
61 61 return False
62 62
63 63 if target.get(b'snirequired') and not sslutil.hassni:
64 64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 65 target[b'name'])
66 66 return False
67 67
68 68 if b'tlsversions' in target:
69 69 tlsversions = set(target[b'tlsversions'])
70 70 supported = set()
71 71
72 72 for v in sslutil.supportedprotocols:
73 73 assert v.startswith(b'tls')
74 74 supported.add(v[3:])
75 75
76 76 if not tlsversions & supported:
77 77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 78 'versions: %s)\n') % (
79 79 target[b'name'], b', '.join(sorted(tlsversions))))
80 80 return False
81 81
82 82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83 83
84 84 return True
85 85
86 86 def supportedredirects(ui, apidescriptor):
87 87 """Resolve the "redirect" command request key given an API descriptor.
88 88
89 89 Given an API descriptor returned by the server, returns a data structure
90 90 that can be used in hte "redirect" field of command requests to advertise
91 91 support for compatible redirect targets.
92 92
93 93 Returns None if no redirect targets are remotely advertised or if none are
94 94 supported.
95 95 """
96 96 if not apidescriptor or b'redirect' not in apidescriptor:
97 97 return None
98 98
99 99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 100 if redirecttargetsupported(ui, t)]
101 101
102 102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 103 if h in SUPPORTED_CONTENT_HASHES]
104 104
105 105 return {
106 106 b'targets': targets,
107 107 b'hashes': hashes,
108 108 }
109 109
110 110 class commandresponse(object):
111 111 """Represents the response to a command request.
112 112
113 113 Instances track the state of the command and hold its results.
114 114
115 115 An external entity is required to update the state of the object when
116 116 events occur.
117 117 """
118 118
119 119 def __init__(self, requestid, command, fromredirect=False):
120 120 self.requestid = requestid
121 121 self.command = command
122 122 self.fromredirect = fromredirect
123 123
124 124 # Whether all remote input related to this command has been
125 125 # received.
126 126 self._inputcomplete = False
127 127
128 128 # We have a lock that is acquired when important object state is
129 129 # mutated. This is to prevent race conditions between 1 thread
130 130 # sending us new data and another consuming it.
131 131 self._lock = threading.RLock()
132 132
133 133 # An event is set when state of the object changes. This event
134 134 # is waited on by the generator emitting objects.
135 135 self._serviceable = threading.Event()
136 136
137 137 self._pendingevents = []
138 138 self._pendingerror = None
139 139 self._decoder = cborutil.bufferingdecoder()
140 140 self._seeninitial = False
141 141 self._redirect = None
142 142
143 143 def _oninputcomplete(self):
144 144 with self._lock:
145 145 self._inputcomplete = True
146 146 self._serviceable.set()
147 147
148 148 def _onresponsedata(self, data):
149 149 available, readcount, wanted = self._decoder.decode(data)
150 150
151 151 if not available:
152 152 return
153 153
154 154 with self._lock:
155 155 for o in self._decoder.getavailable():
156 156 if not self._seeninitial and not self.fromredirect:
157 157 self._handleinitial(o)
158 158 continue
159 159
160 160 # We should never see an object after a content redirect,
161 161 # as the spec says the main status object containing the
162 162 # content redirect is the only object in the stream. Fail
163 163 # if we see a misbehaving server.
164 164 if self._redirect:
165 165 raise error.Abort(_('received unexpected response data '
166 166 'after content redirect; the remote is '
167 167 'buggy'))
168 168
169 169 self._pendingevents.append(o)
170 170
171 171 self._serviceable.set()
172 172
173 173 def _onerror(self, e):
174 174 self._pendingerror = e
175 175
176 176 with self._lock:
177 177 self._serviceable.set()
178 178
179 179 def _handleinitial(self, o):
180 180 self._seeninitial = True
181 181 if o[b'status'] == b'ok':
182 182 return
183 183
184 184 elif o[b'status'] == b'redirect':
185 185 l = o[b'location']
186 186 self._redirect = wireprototypes.alternatelocationresponse(
187 187 url=l[b'url'],
188 188 mediatype=l[b'mediatype'],
189 189 size=l.get(b'size'),
190 190 fullhashes=l.get(b'fullhashes'),
191 191 fullhashseed=l.get(b'fullhashseed'),
192 192 serverdercerts=l.get(b'serverdercerts'),
193 193 servercadercerts=l.get(b'servercadercerts'))
194 194 return
195 195
196 196 atoms = [{'msg': o[b'error'][b'message']}]
197 197 if b'args' in o[b'error']:
198 198 atoms[0]['args'] = o[b'error'][b'args']
199 199
200 200 raise error.RepoError(formatrichmessage(atoms))
201 201
202 202 def objects(self):
203 203 """Obtained decoded objects from this response.
204 204
205 205 This is a generator of data structures that were decoded from the
206 206 command response.
207 207
208 208 Obtaining the next member of the generator may block due to waiting
209 209 on external data to become available.
210 210
211 211 If the server encountered an error in the middle of serving the data
212 212 or if another error occurred, an exception may be raised when
213 213 advancing the generator.
214 214 """
215 215 while True:
216 216 # TODO this can infinite loop if self._inputcomplete is never
217 217 # set. We likely want to tie the lifetime of this object/state
218 218 # to that of the background thread receiving frames and updating
219 219 # our state.
220 220 self._serviceable.wait(1.0)
221 221
222 222 if self._pendingerror:
223 223 raise self._pendingerror
224 224
225 225 with self._lock:
226 226 self._serviceable.clear()
227 227
228 228 # Make copies because objects could be mutated during
229 229 # iteration.
230 230 stop = self._inputcomplete
231 231 pending = list(self._pendingevents)
232 232 self._pendingevents[:] = []
233 233
234 234 for o in pending:
235 235 yield o
236 236
237 237 if stop:
238 238 break
239 239
240 240 class clienthandler(object):
241 241 """Object to handle higher-level client activities.
242 242
243 243 The ``clientreactor`` is used to hold low-level state about the frame-based
244 244 protocol, such as which requests and streams are active. This type is used
245 245 for higher-level operations, such as reading frames from a socket, exposing
246 246 and managing a higher-level primitive for representing command responses,
247 247 etc. This class is what peers should probably use to bridge wire activity
248 248 with the higher-level peer API.
249 249 """
250 250
251 251 def __init__(self, ui, clientreactor, opener=None,
252 252 requestbuilder=util.urlreq.request):
253 253 self._ui = ui
254 254 self._reactor = clientreactor
255 255 self._requests = {}
256 256 self._futures = {}
257 257 self._responses = {}
258 258 self._redirects = []
259 259 self._frameseof = False
260 260 self._opener = opener or urlmod.opener(ui)
261 261 self._requestbuilder = requestbuilder
262 262
263 263 def callcommand(self, command, args, f, redirect=None):
264 264 """Register a request to call a command.
265 265
266 266 Returns an iterable of frames that should be sent over the wire.
267 267 """
268 268 request, action, meta = self._reactor.callcommand(command, args,
269 269 redirect=redirect)
270 270
271 271 if action != 'noop':
272 272 raise error.ProgrammingError('%s not yet supported' % action)
273 273
274 274 rid = request.requestid
275 275 self._requests[rid] = request
276 276 self._futures[rid] = f
277 277 # TODO we need some kind of lifetime on response instances otherwise
278 278 # objects() may deadlock.
279 279 self._responses[rid] = commandresponse(rid, command)
280 280
281 281 return iter(())
282 282
283 283 def flushcommands(self):
284 284 """Flush all queued commands.
285 285
286 286 Returns an iterable of frames that should be sent over the wire.
287 287 """
288 288 action, meta = self._reactor.flushcommands()
289 289
290 290 if action != 'sendframes':
291 291 raise error.ProgrammingError('%s not yet supported' % action)
292 292
293 293 return meta['framegen']
294 294
295 295 def readdata(self, framefh):
296 296 """Attempt to read data and do work.
297 297
298 298 Returns None if no data was read. Presumably this means we're
299 299 done with all read I/O.
300 300 """
301 301 if not self._frameseof:
302 302 frame = wireprotoframing.readframe(framefh)
303 303 if frame is None:
304 304 # TODO tell reactor?
305 305 self._frameseof = True
306 306 else:
307 307 self._ui.note(_('received %r\n') % frame)
308 308 self._processframe(frame)
309 309
310 310 # Also try to read the first redirect.
311 311 if self._redirects:
312 312 if not self._processredirect(*self._redirects[0]):
313 313 self._redirects.pop(0)
314 314
315 315 if self._frameseof and not self._redirects:
316 316 return None
317 317
318 318 return True
319 319
320 320 def _processframe(self, frame):
321 321 """Process a single read frame."""
322 322
323 323 action, meta = self._reactor.onframerecv(frame)
324 324
325 325 if action == 'error':
326 326 e = error.RepoError(meta['message'])
327 327
328 328 if frame.requestid in self._responses:
329 329 self._responses[frame.requestid]._oninputcomplete()
330 330
331 331 if frame.requestid in self._futures:
332 332 self._futures[frame.requestid].set_exception(e)
333 333 del self._futures[frame.requestid]
334 334 else:
335 335 raise e
336 336
337 337 return
338 338 elif action == 'noop':
339 339 return
340 340 elif action == 'responsedata':
341 341 # Handled below.
342 342 pass
343 343 else:
344 344 raise error.ProgrammingError('action not handled: %s' % action)
345 345
346 346 if frame.requestid not in self._requests:
347 347 raise error.ProgrammingError(
348 348 'received frame for unknown request; this is either a bug in '
349 349 'the clientreactor not screening for this or this instance was '
350 350 'never told about this request: %r' % frame)
351 351
352 352 response = self._responses[frame.requestid]
353 353
354 354 if action == 'responsedata':
355 355 # Any failures processing this frame should bubble up to the
356 356 # future tracking the request.
357 357 try:
358 358 self._processresponsedata(frame, meta, response)
359 359 except BaseException as e:
360 360 # If an exception occurs before the future is resolved,
361 361 # fail the future. Otherwise, we stuff the exception on
362 362 # the response object so it can be raised during objects()
363 363 # iteration. If nothing is consuming objects(), we could
364 364 # silently swallow this exception. That's a risk we'll have to
365 365 # take.
366 366 if frame.requestid in self._futures:
367 367 self._futures[frame.requestid].set_exception(e)
368 368 del self._futures[frame.requestid]
369 369 response._oninputcomplete()
370 370 else:
371 371 response._onerror(e)
372 372 else:
373 373 raise error.ProgrammingError(
374 374 'unhandled action from clientreactor: %s' % action)
375 375
376 376 def _processresponsedata(self, frame, meta, response):
377 377 # This can raise. The caller can handle it.
378 378 response._onresponsedata(meta['data'])
379 379
380 380 # If we got a content redirect response, we want to fetch it and
381 381 # expose the data as if we received it inline. But we also want to
382 382 # keep our internal request accounting in order. Our strategy is to
383 383 # basically put meaningful response handling on pause until EOS occurs
384 384 # and the stream accounting is in a good state. At that point, we follow
385 385 # the redirect and replace the response object with its data.
386 386
387 387 redirect = response._redirect
388 388 handlefuture = False if redirect else True
389 389
390 390 if meta['eos']:
391 391 response._oninputcomplete()
392 392 del self._requests[frame.requestid]
393 393
394 394 if redirect:
395 395 self._followredirect(frame.requestid, redirect)
396 396 return
397 397
398 398 if not handlefuture:
399 399 return
400 400
401 401 # If the command has a decoder, we wait until all input has been
402 402 # received before resolving the future. Otherwise we resolve the
403 403 # future immediately.
404 404 if frame.requestid not in self._futures:
405 405 return
406 406
407 407 if response.command not in COMMAND_DECODERS:
408 408 self._futures[frame.requestid].set_result(response.objects())
409 409 del self._futures[frame.requestid]
410 410 elif response._inputcomplete:
411 411 decoded = COMMAND_DECODERS[response.command](response.objects())
412 412 self._futures[frame.requestid].set_result(decoded)
413 413 del self._futures[frame.requestid]
414 414
415 415 def _followredirect(self, requestid, redirect):
416 416 """Called to initiate redirect following for a request."""
417 417 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
418 418
419 419 # TODO handle framed responses.
420 420 if redirect.mediatype != b'application/mercurial-cbor':
421 421 raise error.Abort(_('cannot handle redirects for the %s media type')
422 422 % redirect.mediatype)
423 423
424 424 if redirect.fullhashes:
425 425 self._ui.warn(_('(support for validating hashes on content '
426 426 'redirects not supported)\n'))
427 427
428 428 if redirect.serverdercerts or redirect.servercadercerts:
429 429 self._ui.warn(_('(support for pinning server certificates on '
430 430 'content redirects not supported)\n'))
431 431
432 432 headers = {
433 433 r'Accept': redirect.mediatype,
434 434 }
435 435
436 436 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
437 437
438 438 try:
439 439 res = self._opener.open(req)
440 440 except util.urlerr.httperror as e:
441 441 if e.code == 401:
442 442 raise error.Abort(_('authorization failed'))
443 443 raise
444 444 except util.httplib.HTTPException as e:
445 445 self._ui.debug('http error requesting %s\n' % req.get_full_url())
446 446 self._ui.traceback()
447 447 raise IOError(None, e)
448 448
449 449 urlmod.wrapresponse(res)
450 450
451 451 # The existing response object is associated with frame data. Rather
452 452 # than try to normalize its state, just create a new object.
453 453 oldresponse = self._responses[requestid]
454 454 self._responses[requestid] = commandresponse(requestid,
455 455 oldresponse.command,
456 456 fromredirect=True)
457 457
458 458 self._redirects.append((requestid, res))
459 459
460 460 def _processredirect(self, rid, res):
461 """Called to continue processing a response from a redirect."""
461 """Called to continue processing a response from a redirect.
462
463 Returns a bool indicating if the redirect is still serviceable.
464 """
462 465 response = self._responses[rid]
463 466
464 467 try:
465 468 data = res.read(32768)
466 469 response._onresponsedata(data)
467 470
468 471 # We're at end of stream.
469 472 if not data:
470 473 response._oninputcomplete()
471 474
472 475 if rid not in self._futures:
473 return
476 return bool(data)
474 477
475 478 if response.command not in COMMAND_DECODERS:
476 479 self._futures[rid].set_result(response.objects())
477 480 del self._futures[rid]
478 481 elif response._inputcomplete:
479 482 decoded = COMMAND_DECODERS[response.command](response.objects())
480 483 self._futures[rid].set_result(decoded)
481 484 del self._futures[rid]
482 485
483 486 return bool(data)
484 487
485 488 except BaseException as e:
486 489 self._futures[rid].set_exception(e)
487 490 del self._futures[rid]
488 491 response._oninputcomplete()
489 492 return False
490 493
491 494 def decodebranchmap(objs):
492 495 # Response should be a single CBOR map of branch name to array of nodes.
493 496 bm = next(objs)
494 497
495 498 return {encoding.tolocal(k): v for k, v in bm.items()}
496 499
497 500 def decodeheads(objs):
498 501 # Array of node bytestrings.
499 502 return next(objs)
500 503
501 504 def decodeknown(objs):
502 505 # Bytestring where each byte is a 0 or 1.
503 506 raw = next(objs)
504 507
505 508 return [True if c == '1' else False for c in raw]
506 509
507 510 def decodelistkeys(objs):
508 511 # Map with bytestring keys and values.
509 512 return next(objs)
510 513
511 514 def decodelookup(objs):
512 515 return next(objs)
513 516
514 517 def decodepushkey(objs):
515 518 return next(objs)
516 519
517 520 COMMAND_DECODERS = {
518 521 'branchmap': decodebranchmap,
519 522 'heads': decodeheads,
520 523 'known': decodeknown,
521 524 'listkeys': decodelistkeys,
522 525 'lookup': decodelookup,
523 526 'pushkey': decodepushkey,
524 527 }
General Comments 0
You need to be logged in to leave comments. Login now