##// END OF EJS Templates
wireprotov2: raise exception in objects() if future has been resolved...
Gregory Szorc -
r40172:ed4ebbb9 default
parent child Browse files
Show More
@@ -1,500 +1,519
1 1 # wireprotov2peer.py - client side code for wire protocol version 2
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import threading
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 encoding,
15 15 error,
16 16 pycompat,
17 17 sslutil,
18 18 url as urlmod,
19 19 util,
20 20 wireprotoframing,
21 21 wireprototypes,
22 22 )
23 23 from .utils import (
24 24 cborutil,
25 25 )
26 26
def formatrichmessage(atoms):
    """Render framing-protocol message atoms into one byte string.

    Each atom is a mapping holding a ``msg`` entry and, optionally, an
    ``args`` entry. The message is passed through the translation function
    and, when ``args`` is present, %-formatted with those arguments. The
    rendered pieces are concatenated in the order given.
    """
    def render(atom):
        text = _(atom[b'msg'])
        if b'args' not in atom:
            return text
        return text % tuple(atom[b'args'])

    return b''.join([render(atom) for atom in atoms])
41 41
# Protocols a redirect target may advertise and still be accepted by
# redirecttargetsupported().
SUPPORTED_REDIRECT_PROTOCOLS = {b'http', b'https'}
46 46
# Content hash names that supportedredirects() keeps when filtering the
# hashes advertised by the server.
SUPPORTED_CONTENT_HASHES = {b'sha1', b'sha256'}
51 51
def redirecttargetsupported(ui, target):
    """Report whether this client can use an advertised redirect target.

    ``target`` is a redirect target entry taken from the capabilities data
    structure emitted by the server. A note describing the outcome is
    written to ``ui`` in every case.

    Returns False when the target's protocol is not in
    ``SUPPORTED_REDIRECT_PROTOCOLS``, when it requires SNI and
    ``sslutil.hassni`` is false, or when it lists TLS versions and none of
    them matches a locally supported protocol. Returns True otherwise.
    """
    name = target[b'name'] if False else None  # placeholder, see below
    del name  # the name is looked up lazily so a missing key fails late

    if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
        details = (target[b'name'], target.get(b'protocol', b''))
        ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
                % details)
        return False

    needssni = target.get(b'snirequired')
    if needssni and not sslutil.hassni:
        ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
                target[b'name'])
        return False

    if b'tlsversions' in target:
        wanted = set(target[b'tlsversions'])

        # Local protocol names look like "tls1.2"; strip the "tls" prefix so
        # they can be compared against the advertised version strings.
        local = set()
        for proto in sslutil.supportedprotocols:
            assert proto.startswith(b'tls')
            local.add(proto[3:])

        if wanted.isdisjoint(local):
            ui.note(_('(remote redirect target %s requires unsupported TLS '
                      'versions: %s)\n') % (
                target[b'name'], b', '.join(sorted(wanted))))
            return False

    ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])

    return True
85 85
def supportedredirects(ui, apidescriptor):
    """Resolve the "redirect" command request key given an API descriptor.

    Given an API descriptor returned by the server, builds the data
    structure to place in the "redirect" field of command requests in order
    to advertise support for compatible redirect targets.

    Returns None if the descriptor is empty or has no ``redirect`` entry.
    Otherwise returns a dict with two lists, either of which may be empty:

    ``targets``
       names of the advertised targets accepted by
       ``redirecttargetsupported()``.
    ``hashes``
       advertised hashes that are also in ``SUPPORTED_CONTENT_HASHES``.
    """
    if not apidescriptor:
        return None
    if b'redirect' not in apidescriptor:
        return None

    advertised = apidescriptor[b'redirect']

    usable = []
    for entry in advertised[b'targets']:
        if redirecttargetsupported(ui, entry):
            usable.append(entry[b'name'])

    knownhashes = []
    for hashname in advertised[b'hashes']:
        if hashname in SUPPORTED_CONTENT_HASHES:
            knownhashes.append(hashname)

    return {
        b'targets': usable,
        b'hashes': knownhashes,
    }
109 109
class commandresponse(object):
    """Represents the response to a command request.

    Instances track the state of the command and hold its results.

    An external entity is required to update the state of the object when
    events occur: it feeds raw response bytes via ``_onresponsedata()``,
    signals the end of input via ``_oninputcomplete()`` and reports failures
    via ``_onerror()``. Consumers obtain decoded objects from ``objects()``.
    """

    def __init__(self, requestid, command, fromredirect=False):
        """Create the response tracker for ``command``.

        ``fromredirect`` marks a response whose data comes from a followed
        content redirect; such a response has no initial status object.
        """
        self.requestid = requestid
        self.command = command
        self.fromredirect = fromredirect

        # Whether all remote input related to this command has been
        # received.
        self._inputcomplete = False

        # We have a lock that is acquired when important object state is
        # mutated. This is to prevent race conditions between 1 thread
        # sending us new data and another consuming it.
        self._lock = threading.RLock()

        # An event is set when state of the object changes. This event
        # is waited on by the generator emitting objects.
        self._serviceable = threading.Event()

        # Decoded objects not yet handed out by objects().
        self._pendingevents = []
        # Exception to raise from objects(), or None.
        self._pendingerror = None
        self._decoder = cborutil.bufferingdecoder()
        # Whether the initial status object has been processed.
        self._seeninitial = False
        # Set to an alternatelocationresponse when the server replied with a
        # content redirect.
        self._redirect = None

    def _oninputcomplete(self):
        """Record that no more input will arrive and wake up objects()."""
        with self._lock:
            self._inputcomplete = True
            self._serviceable.set()

    def _onresponsedata(self, data):
        """Feed raw response bytes to the decoder and queue decoded objects.

        Raises ``error.Abort`` if an object arrives after a content redirect
        was seen, and whatever ``_handleinitial()`` raises for an error
        status.
        """
        available, readcount, wanted = self._decoder.decode(data)

        if not available:
            return

        with self._lock:
            for o in self._decoder.getavailable():
                # The first object of a non-redirected response is the
                # status object; it is consumed here, not handed out.
                if not self._seeninitial and not self.fromredirect:
                    self._handleinitial(o)
                    continue

                # We should never see an object after a content redirect,
                # as the spec says the main status object containing the
                # content redirect is the only object in the stream. Fail
                # if we see a misbehaving server.
                if self._redirect:
                    raise error.Abort(_('received unexpected response data '
                                        'after content redirect; the remote is '
                                        'buggy'))

                self._pendingevents.append(o)

            self._serviceable.set()

    def _onerror(self, e):
        """Store ``e`` so it is raised by objects(), and wake up objects()."""
        with self._lock:
            self._pendingerror = e
            self._serviceable.set()

    def _handleinitial(self, o):
        """Process the initial status object of a response.

        An ``ok`` status needs no action. A ``redirect`` status records the
        alternate location in ``self._redirect``. Any other status is treated
        as an error and raises ``error.RepoError`` with the formatted message.
        """
        self._seeninitial = True
        if o[b'status'] == b'ok':
            return

        elif o[b'status'] == b'redirect':
            l = o[b'location']
            self._redirect = wireprototypes.alternatelocationresponse(
                url=l[b'url'],
                mediatype=l[b'mediatype'],
                size=l.get(b'size'),
                fullhashes=l.get(b'fullhashes'),
                fullhashseed=l.get(b'fullhashseed'),
                serverdercerts=l.get(b'serverdercerts'),
                servercadercerts=l.get(b'servercadercerts'))
            return

        # Use bytes keys: formatrichmessage() looks up b'msg' / b'args'.
        atoms = [{b'msg': o[b'error'][b'message']}]
        if b'args' in o[b'error']:
            atoms[0][b'args'] = o[b'error'][b'args']

        raise error.RepoError(formatrichmessage(atoms))

    def objects(self):
        """Obtain decoded objects from this response.

        This is a generator of data structures that were decoded from the
        command response.

        Obtaining the next member of the generator may block due to waiting
        on external data to become available.

        If the server encountered an error in the middle of serving the data
        or if another error occurred, an exception may be raised when
        advancing the generator.
        """
        while True:
            # TODO this can infinite loop if self._inputcomplete is never
            # set. We likely want to tie the lifetime of this object/state
            # to that of the background thread receiving frames and updating
            # our state.
            self._serviceable.wait(1.0)

            # Compare against None rather than relying on truthiness so an
            # exception type that happens to be falsy is still raised.
            if self._pendingerror is not None:
                raise self._pendingerror

            with self._lock:
                self._serviceable.clear()

                # Make copies because objects could be mutated during
                # iteration.
                stop = self._inputcomplete
                pending = list(self._pendingevents)
                self._pendingevents[:] = []

            for o in pending:
                yield o

            if stop:
                break
229 239
class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor, opener=None,
                 requestbuilder=util.urlreq.request):
        """Create a handler driving ``clientreactor``.

        ``opener`` and ``requestbuilder`` are used to fetch content
        redirects; ``opener`` defaults to ``urlmod.opener(ui)``.
        """
        self._ui = ui
        self._reactor = clientreactor
        # The next three dicts are keyed by request ID. A future is removed
        # from ``_futures`` as soon as it has been resolved or failed.
        self._requests = {}
        self._futures = {}
        self._responses = {}
        # (request ID, HTTP response) pairs for redirects being read.
        self._redirects = []
        # Whether the frame stream has reached end of file.
        self._frameseof = False
        self._opener = opener or urlmod.opener(ui)
        self._requestbuilder = requestbuilder

    def callcommand(self, command, args, f, redirect=None):
        """Register a request to call a command.

        ``f`` is the future that will receive the command's result or
        exception.

        Returns an iterable of frames that should be sent over the wire.
        """
        request, action, meta = self._reactor.callcommand(command, args,
                                                          redirect=redirect)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        # TODO we need some kind of lifetime on response instances otherwise
        # objects() may deadlock.
        self._responses[rid] = commandresponse(rid, command)

        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readdata(self, framefh):
        """Attempt to read data and do work.

        Reads at most one frame from ``framefh`` and at most one chunk from
        the first pending redirect.

        Returns None if no data was read. Presumably this means we're
        done with all read I/O.
        """
        if not self._frameseof:
            frame = wireprotoframing.readframe(framefh)
            if frame is None:
                # TODO tell reactor?
                self._frameseof = True
            else:
                self._ui.note(_('received %r\n') % frame)
                self._processframe(frame)

        # Also try to read the first redirect.
        if self._redirects:
            if not self._processredirect(*self._redirects[0]):
                self._redirects.pop(0)

        if self._frameseof and not self._redirects:
            return None

        return True

    def _processframe(self, frame):
        """Process a single read frame."""

        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._responses:
                self._responses[frame.requestid]._oninputcomplete()

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
                del self._futures[frame.requestid]
            else:
                raise e

            return
        elif action == 'noop':
            return

        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        if action == 'responsedata':
            # Any failures processing this frame should bubble up to the
            # future tracking the request.
            try:
                self._processresponsedata(frame, meta, response)
            except BaseException as e:
                # If an exception occurs before the future is resolved,
                # fail the future. Otherwise, we stuff the exception on
                # the response object so it can be raised during objects()
                # iteration. If nothing is consuming objects(), we could
                # silently swallow this exception. That's a risk we'll have to
                # take.
                if frame.requestid in self._futures:
                    self._futures[frame.requestid].set_exception(e)
                    del self._futures[frame.requestid]
                    response._oninputcomplete()
                else:
                    response._onerror(e)
        else:
            raise error.ProgrammingError(
                'unhandled action from clientreactor: %s' % action)

    def _processresponsedata(self, frame, meta, response):
        """Handle a frame carrying response data for ``response``.

        Feeds the data to the response, follows a content redirect once the
        stream has ended, and resolves the request's future when appropriate.
        """
        # This can raise. The caller can handle it.
        response._onresponsedata(meta['data'])

        # If we got a content redirect response, we want to fetch it and
        # expose the data as if we received it inline. But we also want to
        # keep our internal request accounting in order. Our strategy is to
        # basically put meaningful response handling on pause until EOS occurs
        # and the stream accounting is in a good state. At that point, we follow
        # the redirect and replace the response object with its data.

        redirect = response._redirect
        handlefuture = not redirect

        if meta['eos']:
            response._oninputcomplete()
            del self._requests[frame.requestid]

            if redirect:
                self._followredirect(frame.requestid, redirect)
                return

        if not handlefuture:
            return

        # If the command has a decoder, we wait until all input has been
        # received before resolving the future. Otherwise we resolve the
        # future immediately.
        if frame.requestid not in self._futures:
            return

        if response.command not in COMMAND_DECODERS:
            self._futures[frame.requestid].set_result(response.objects())
            del self._futures[frame.requestid]
        elif response._inputcomplete:
            decoded = COMMAND_DECODERS[response.command](response.objects())
            self._futures[frame.requestid].set_result(decoded)
            del self._futures[frame.requestid]

    def _followredirect(self, requestid, redirect):
        """Called to initiate redirect following for a request."""
        self._ui.note(_('(following redirect to %s)\n') % redirect.url)

        # TODO handle framed responses.
        if redirect.mediatype != b'application/mercurial-cbor':
            raise error.Abort(_('cannot handle redirects for the %s media type')
                              % redirect.mediatype)

        if redirect.fullhashes:
            self._ui.warn(_('(support for validating hashes on content '
                            'redirects not supported)\n'))

        if redirect.serverdercerts or redirect.servercadercerts:
            self._ui.warn(_('(support for pinning server certificates on '
                            'content redirects not supported)\n'))

        headers = {
            r'Accept': redirect.mediatype,
        }

        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)

        try:
            res = self._opener.open(req)
        except util.urlerr.httperror as e:
            if e.code == 401:
                raise error.Abort(_('authorization failed'))
            raise
        except util.httplib.HTTPException as e:
            self._ui.debug('http error requesting %s\n' % req.get_full_url())
            self._ui.traceback()
            raise IOError(None, e)

        urlmod.wrapresponse(res)

        # The existing response object is associated with frame data. Rather
        # than try to normalize its state, just create a new object.
        oldresponse = self._responses[requestid]
        self._responses[requestid] = commandresponse(requestid,
                                                     oldresponse.command,
                                                     fromredirect=True)

        self._redirects.append((requestid, res))

    def _processredirect(self, rid, res):
        """Called to continue processing a response from a redirect.

        Reads one chunk from ``res`` and feeds it to the response for
        request ``rid``.

        Returns a bool indicating whether the redirect is still serviceable,
        i.e. whether the caller should keep reading from it. False is
        returned at end of stream and after a failure.
        """
        response = self._responses[rid]

        try:
            data = res.read(32768)
            response._onresponsedata(data)

            # We're at end of stream.
            if not data:
                response._oninputcomplete()

            # The future may already be resolved (commands without a decoder
            # are resolved on the first chunk). The redirect must still be
            # read to completion, so report whether data was obtained rather
            # than returning a bare (falsy) None.
            if rid not in self._futures:
                return bool(data)

            if response.command not in COMMAND_DECODERS:
                self._futures[rid].set_result(response.objects())
                del self._futures[rid]
            elif response._inputcomplete:
                decoded = COMMAND_DECODERS[response.command](response.objects())
                self._futures[rid].set_result(decoded)
                del self._futures[rid]

            return bool(data)

        except BaseException as e:
            # Same policy as _processframe(): fail the future if it is still
            # pending, otherwise hand the exception to the response so it is
            # raised during objects() iteration. Indexing self._futures
            # unconditionally would raise KeyError and mask the real error.
            if rid in self._futures:
                self._futures[rid].set_exception(e)
                del self._futures[rid]
                response._oninputcomplete()
            else:
                response._onerror(e)
            return False
466 485
def decodebranchmap(objs):
    """Decode a ``branchmap`` response.

    The first object from ``objs`` is expected to be a single CBOR map of
    branch name to array of nodes. Returns a new dict whose keys have been
    passed through ``encoding.tolocal()``; values are left untouched.
    """
    branches = next(objs)

    decoded = {}
    for name, nodes in branches.items():
        decoded[encoding.tolocal(name)] = nodes
    return decoded
472 491
def decodeheads(objs):
    """Decode a ``heads`` response.

    Returns the first object from ``objs`` unchanged; it is expected to be
    an array of node bytestrings.
    """
    heads = next(objs)
    return heads
476 495
def decodeknown(objs):
    """Decode a ``known`` response into a list of booleans.

    The first object from ``objs`` is a bytestring where each byte is a
    ``0`` or a ``1``. Returns one bool per byte, True where the byte is
    ``1``.
    """
    raw = next(objs)

    # Compare one-byte slices instead of iterating: iterating a bytes object
    # yields integers on Python 3, which never compare equal to a string.
    return [raw[i:i + 1] == b'1' for i in range(len(raw))]
482 501
def decodelistkeys(objs):
    """Decode a ``listkeys`` response.

    Returns the first object from ``objs`` unchanged; it is expected to be
    a map with bytestring keys and values.
    """
    keys = next(objs)
    return keys
486 505
def decodelookup(objs):
    """Decode a ``lookup`` response: the first object from ``objs``, as is."""
    result = next(objs)
    return result
489 508
def decodepushkey(objs):
    """Decode a ``pushkey`` response: the first object from ``objs``, as is."""
    result = next(objs)
    return result
492 511
# Maps a command name to the function that turns the command's decoded
# response objects into its final result. clienthandler resolves the future
# of a command listed here with the decoder's return value once all input
# has been received; a command not listed here has its future resolved
# with the raw objects() generator instead.
COMMAND_DECODERS = {
    'branchmap': decodebranchmap,
    'heads': decodeheads,
    'known': decodeknown,
    'listkeys': decodelistkeys,
    'lookup': decodelookup,
    'pushkey': decodepushkey,
}
General Comments 0
You need to be logged in to leave comments. Login now