##// END OF EJS Templates
wireprotov2peer: rewrite character traversal to use slices...
Gregory Szorc -
r41423:e6c1c647 default
parent child Browse files
Show More
@@ -1,532 +1,532 b''
1 1 # wireprotov2peer.py - client side code for wire protocol version 2
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import threading
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 encoding,
15 15 error,
16 16 pycompat,
17 17 sslutil,
18 18 url as urlmod,
19 19 util,
20 20 wireprotoframing,
21 21 wireprototypes,
22 22 )
23 23 from .utils import (
24 24 cborutil,
25 25 )
26 26
27 27 def formatrichmessage(atoms):
28 28 """Format an encoded message from the framing protocol."""
29 29
30 30 chunks = []
31 31
32 32 for atom in atoms:
33 33 msg = _(atom[b'msg'])
34 34
35 35 if b'args' in atom:
36 36 msg = msg % tuple(atom[b'args'])
37 37
38 38 chunks.append(msg)
39 39
40 40 return b''.join(chunks)
41 41
42 42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 43 b'http',
44 44 b'https',
45 45 }
46 46
47 47 SUPPORTED_CONTENT_HASHES = {
48 48 b'sha1',
49 49 b'sha256',
50 50 }
51 51
52 52 def redirecttargetsupported(ui, target):
53 53 """Determine whether a redirect target entry is supported.
54 54
55 55 ``target`` should come from the capabilities data structure emitted by
56 56 the server.
57 57 """
58 58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 60 % (target[b'name'], target.get(b'protocol', b'')))
61 61 return False
62 62
63 63 if target.get(b'snirequired') and not sslutil.hassni:
64 64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 65 target[b'name'])
66 66 return False
67 67
68 68 if b'tlsversions' in target:
69 69 tlsversions = set(target[b'tlsversions'])
70 70 supported = set()
71 71
72 72 for v in sslutil.supportedprotocols:
73 73 assert v.startswith(b'tls')
74 74 supported.add(v[3:])
75 75
76 76 if not tlsversions & supported:
77 77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 78 'versions: %s)\n') % (
79 79 target[b'name'], b', '.join(sorted(tlsversions))))
80 80 return False
81 81
82 82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83 83
84 84 return True
85 85
86 86 def supportedredirects(ui, apidescriptor):
87 87 """Resolve the "redirect" command request key given an API descriptor.
88 88
89 89 Given an API descriptor returned by the server, returns a data structure
90 90 that can be used in hte "redirect" field of command requests to advertise
91 91 support for compatible redirect targets.
92 92
93 93 Returns None if no redirect targets are remotely advertised or if none are
94 94 supported.
95 95 """
96 96 if not apidescriptor or b'redirect' not in apidescriptor:
97 97 return None
98 98
99 99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 100 if redirecttargetsupported(ui, t)]
101 101
102 102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 103 if h in SUPPORTED_CONTENT_HASHES]
104 104
105 105 return {
106 106 b'targets': targets,
107 107 b'hashes': hashes,
108 108 }
109 109
110 110 class commandresponse(object):
111 111 """Represents the response to a command request.
112 112
113 113 Instances track the state of the command and hold its results.
114 114
115 115 An external entity is required to update the state of the object when
116 116 events occur.
117 117 """
118 118
119 119 def __init__(self, requestid, command, fromredirect=False):
120 120 self.requestid = requestid
121 121 self.command = command
122 122 self.fromredirect = fromredirect
123 123
124 124 # Whether all remote input related to this command has been
125 125 # received.
126 126 self._inputcomplete = False
127 127
128 128 # We have a lock that is acquired when important object state is
129 129 # mutated. This is to prevent race conditions between 1 thread
130 130 # sending us new data and another consuming it.
131 131 self._lock = threading.RLock()
132 132
133 133 # An event is set when state of the object changes. This event
134 134 # is waited on by the generator emitting objects.
135 135 self._serviceable = threading.Event()
136 136
137 137 self._pendingevents = []
138 138 self._pendingerror = None
139 139 self._decoder = cborutil.bufferingdecoder()
140 140 self._seeninitial = False
141 141 self._redirect = None
142 142
143 143 def _oninputcomplete(self):
144 144 with self._lock:
145 145 self._inputcomplete = True
146 146 self._serviceable.set()
147 147
148 148 def _onresponsedata(self, data):
149 149 available, readcount, wanted = self._decoder.decode(data)
150 150
151 151 if not available:
152 152 return
153 153
154 154 with self._lock:
155 155 for o in self._decoder.getavailable():
156 156 if not self._seeninitial and not self.fromredirect:
157 157 self._handleinitial(o)
158 158 continue
159 159
160 160 # We should never see an object after a content redirect,
161 161 # as the spec says the main status object containing the
162 162 # content redirect is the only object in the stream. Fail
163 163 # if we see a misbehaving server.
164 164 if self._redirect:
165 165 raise error.Abort(_('received unexpected response data '
166 166 'after content redirect; the remote is '
167 167 'buggy'))
168 168
169 169 self._pendingevents.append(o)
170 170
171 171 self._serviceable.set()
172 172
173 173 def _onerror(self, e):
174 174 self._pendingerror = e
175 175
176 176 with self._lock:
177 177 self._serviceable.set()
178 178
179 179 def _handleinitial(self, o):
180 180 self._seeninitial = True
181 181 if o[b'status'] == b'ok':
182 182 return
183 183
184 184 elif o[b'status'] == b'redirect':
185 185 l = o[b'location']
186 186 self._redirect = wireprototypes.alternatelocationresponse(
187 187 url=l[b'url'],
188 188 mediatype=l[b'mediatype'],
189 189 size=l.get(b'size'),
190 190 fullhashes=l.get(b'fullhashes'),
191 191 fullhashseed=l.get(b'fullhashseed'),
192 192 serverdercerts=l.get(b'serverdercerts'),
193 193 servercadercerts=l.get(b'servercadercerts'))
194 194 return
195 195
196 196 atoms = [{'msg': o[b'error'][b'message']}]
197 197 if b'args' in o[b'error']:
198 198 atoms[0]['args'] = o[b'error'][b'args']
199 199
200 200 raise error.RepoError(formatrichmessage(atoms))
201 201
202 202 def objects(self):
203 203 """Obtained decoded objects from this response.
204 204
205 205 This is a generator of data structures that were decoded from the
206 206 command response.
207 207
208 208 Obtaining the next member of the generator may block due to waiting
209 209 on external data to become available.
210 210
211 211 If the server encountered an error in the middle of serving the data
212 212 or if another error occurred, an exception may be raised when
213 213 advancing the generator.
214 214 """
215 215 while True:
216 216 # TODO this can infinite loop if self._inputcomplete is never
217 217 # set. We likely want to tie the lifetime of this object/state
218 218 # to that of the background thread receiving frames and updating
219 219 # our state.
220 220 self._serviceable.wait(1.0)
221 221
222 222 if self._pendingerror:
223 223 raise self._pendingerror
224 224
225 225 with self._lock:
226 226 self._serviceable.clear()
227 227
228 228 # Make copies because objects could be mutated during
229 229 # iteration.
230 230 stop = self._inputcomplete
231 231 pending = list(self._pendingevents)
232 232 self._pendingevents[:] = []
233 233
234 234 for o in pending:
235 235 yield o
236 236
237 237 if stop:
238 238 break
239 239
240 240 class clienthandler(object):
241 241 """Object to handle higher-level client activities.
242 242
243 243 The ``clientreactor`` is used to hold low-level state about the frame-based
244 244 protocol, such as which requests and streams are active. This type is used
245 245 for higher-level operations, such as reading frames from a socket, exposing
246 246 and managing a higher-level primitive for representing command responses,
247 247 etc. This class is what peers should probably use to bridge wire activity
248 248 with the higher-level peer API.
249 249 """
250 250
251 251 def __init__(self, ui, clientreactor, opener=None,
252 252 requestbuilder=util.urlreq.request):
253 253 self._ui = ui
254 254 self._reactor = clientreactor
255 255 self._requests = {}
256 256 self._futures = {}
257 257 self._responses = {}
258 258 self._redirects = []
259 259 self._frameseof = False
260 260 self._opener = opener or urlmod.opener(ui)
261 261 self._requestbuilder = requestbuilder
262 262
263 263 def callcommand(self, command, args, f, redirect=None):
264 264 """Register a request to call a command.
265 265
266 266 Returns an iterable of frames that should be sent over the wire.
267 267 """
268 268 request, action, meta = self._reactor.callcommand(command, args,
269 269 redirect=redirect)
270 270
271 271 if action != 'noop':
272 272 raise error.ProgrammingError('%s not yet supported' % action)
273 273
274 274 rid = request.requestid
275 275 self._requests[rid] = request
276 276 self._futures[rid] = f
277 277 # TODO we need some kind of lifetime on response instances otherwise
278 278 # objects() may deadlock.
279 279 self._responses[rid] = commandresponse(rid, command)
280 280
281 281 return iter(())
282 282
283 283 def flushcommands(self):
284 284 """Flush all queued commands.
285 285
286 286 Returns an iterable of frames that should be sent over the wire.
287 287 """
288 288 action, meta = self._reactor.flushcommands()
289 289
290 290 if action != 'sendframes':
291 291 raise error.ProgrammingError('%s not yet supported' % action)
292 292
293 293 return meta['framegen']
294 294
295 295 def readdata(self, framefh):
296 296 """Attempt to read data and do work.
297 297
298 298 Returns None if no data was read. Presumably this means we're
299 299 done with all read I/O.
300 300 """
301 301 if not self._frameseof:
302 302 frame = wireprotoframing.readframe(framefh)
303 303 if frame is None:
304 304 # TODO tell reactor?
305 305 self._frameseof = True
306 306 else:
307 307 self._ui.note(_('received %r\n') % frame)
308 308 self._processframe(frame)
309 309
310 310 # Also try to read the first redirect.
311 311 if self._redirects:
312 312 if not self._processredirect(*self._redirects[0]):
313 313 self._redirects.pop(0)
314 314
315 315 if self._frameseof and not self._redirects:
316 316 return None
317 317
318 318 return True
319 319
320 320 def _processframe(self, frame):
321 321 """Process a single read frame."""
322 322
323 323 action, meta = self._reactor.onframerecv(frame)
324 324
325 325 if action == 'error':
326 326 e = error.RepoError(meta['message'])
327 327
328 328 if frame.requestid in self._responses:
329 329 self._responses[frame.requestid]._oninputcomplete()
330 330
331 331 if frame.requestid in self._futures:
332 332 self._futures[frame.requestid].set_exception(e)
333 333 del self._futures[frame.requestid]
334 334 else:
335 335 raise e
336 336
337 337 return
338 338 elif action == 'noop':
339 339 return
340 340 elif action == 'responsedata':
341 341 # Handled below.
342 342 pass
343 343 else:
344 344 raise error.ProgrammingError('action not handled: %s' % action)
345 345
346 346 if frame.requestid not in self._requests:
347 347 raise error.ProgrammingError(
348 348 'received frame for unknown request; this is either a bug in '
349 349 'the clientreactor not screening for this or this instance was '
350 350 'never told about this request: %r' % frame)
351 351
352 352 response = self._responses[frame.requestid]
353 353
354 354 if action == 'responsedata':
355 355 # Any failures processing this frame should bubble up to the
356 356 # future tracking the request.
357 357 try:
358 358 self._processresponsedata(frame, meta, response)
359 359 except BaseException as e:
360 360 # If an exception occurs before the future is resolved,
361 361 # fail the future. Otherwise, we stuff the exception on
362 362 # the response object so it can be raised during objects()
363 363 # iteration. If nothing is consuming objects(), we could
364 364 # silently swallow this exception. That's a risk we'll have to
365 365 # take.
366 366 if frame.requestid in self._futures:
367 367 self._futures[frame.requestid].set_exception(e)
368 368 del self._futures[frame.requestid]
369 369 response._oninputcomplete()
370 370 else:
371 371 response._onerror(e)
372 372 else:
373 373 raise error.ProgrammingError(
374 374 'unhandled action from clientreactor: %s' % action)
375 375
376 376 def _processresponsedata(self, frame, meta, response):
377 377 # This can raise. The caller can handle it.
378 378 response._onresponsedata(meta['data'])
379 379
380 380 # We need to be careful about resolving futures prematurely. If a
381 381 # response is a redirect response, resolving the future before the
382 382 # redirect is processed would result in the consumer seeing an
383 383 # empty stream of objects, since they'd be consuming our
384 384 # response.objects() instead of the redirect's response.objects().
385 385 #
386 386 # Our strategy is to not resolve/finish the request until either
387 387 # EOS occurs or until the initial response object is fully received.
388 388
389 389 # Always react to eos.
390 390 if meta['eos']:
391 391 response._oninputcomplete()
392 392 del self._requests[frame.requestid]
393 393
394 394 # Not EOS but we haven't decoded the initial response object yet.
395 395 # Return and wait for more data.
396 396 elif not response._seeninitial:
397 397 return
398 398
399 399 # The specification says no objects should follow the initial/redirect
400 400 # object. So it should be safe to handle the redirect object if one is
401 401 # decoded, without having to wait for EOS.
402 402 if response._redirect:
403 403 self._followredirect(frame.requestid, response._redirect)
404 404 return
405 405
406 406 # If the command has a decoder, we wait until all input has been
407 407 # received before resolving the future. Otherwise we resolve the
408 408 # future immediately.
409 409 if frame.requestid not in self._futures:
410 410 return
411 411
412 412 if response.command not in COMMAND_DECODERS:
413 413 self._futures[frame.requestid].set_result(response.objects())
414 414 del self._futures[frame.requestid]
415 415 elif response._inputcomplete:
416 416 decoded = COMMAND_DECODERS[response.command](response.objects())
417 417 self._futures[frame.requestid].set_result(decoded)
418 418 del self._futures[frame.requestid]
419 419
420 420 def _followredirect(self, requestid, redirect):
421 421 """Called to initiate redirect following for a request."""
422 422 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
423 423
424 424 # TODO handle framed responses.
425 425 if redirect.mediatype != b'application/mercurial-cbor':
426 426 raise error.Abort(_('cannot handle redirects for the %s media type')
427 427 % redirect.mediatype)
428 428
429 429 if redirect.fullhashes:
430 430 self._ui.warn(_('(support for validating hashes on content '
431 431 'redirects not supported)\n'))
432 432
433 433 if redirect.serverdercerts or redirect.servercadercerts:
434 434 self._ui.warn(_('(support for pinning server certificates on '
435 435 'content redirects not supported)\n'))
436 436
437 437 headers = {
438 438 r'Accept': redirect.mediatype,
439 439 }
440 440
441 441 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
442 442
443 443 try:
444 444 res = self._opener.open(req)
445 445 except util.urlerr.httperror as e:
446 446 if e.code == 401:
447 447 raise error.Abort(_('authorization failed'))
448 448 raise
449 449 except util.httplib.HTTPException as e:
450 450 self._ui.debug('http error requesting %s\n' % req.get_full_url())
451 451 self._ui.traceback()
452 452 raise IOError(None, e)
453 453
454 454 urlmod.wrapresponse(res)
455 455
456 456 # The existing response object is associated with frame data. Rather
457 457 # than try to normalize its state, just create a new object.
458 458 oldresponse = self._responses[requestid]
459 459 self._responses[requestid] = commandresponse(requestid,
460 460 oldresponse.command,
461 461 fromredirect=True)
462 462
463 463 self._redirects.append((requestid, res))
464 464
465 465 def _processredirect(self, rid, res):
466 466 """Called to continue processing a response from a redirect.
467 467
468 468 Returns a bool indicating if the redirect is still serviceable.
469 469 """
470 470 response = self._responses[rid]
471 471
472 472 try:
473 473 data = res.read(32768)
474 474 response._onresponsedata(data)
475 475
476 476 # We're at end of stream.
477 477 if not data:
478 478 response._oninputcomplete()
479 479
480 480 if rid not in self._futures:
481 481 return bool(data)
482 482
483 483 if response.command not in COMMAND_DECODERS:
484 484 self._futures[rid].set_result(response.objects())
485 485 del self._futures[rid]
486 486 elif response._inputcomplete:
487 487 decoded = COMMAND_DECODERS[response.command](response.objects())
488 488 self._futures[rid].set_result(decoded)
489 489 del self._futures[rid]
490 490
491 491 return bool(data)
492 492
493 493 except BaseException as e:
494 494 self._futures[rid].set_exception(e)
495 495 del self._futures[rid]
496 496 response._oninputcomplete()
497 497 return False
498 498
499 499 def decodebranchmap(objs):
500 500 # Response should be a single CBOR map of branch name to array of nodes.
501 501 bm = next(objs)
502 502
503 503 return {encoding.tolocal(k): v for k, v in bm.items()}
504 504
505 505 def decodeheads(objs):
506 506 # Array of node bytestrings.
507 507 return next(objs)
508 508
509 509 def decodeknown(objs):
510 510 # Bytestring where each byte is a 0 or 1.
511 511 raw = next(objs)
512 512
513 return [True if c == '1' else False for c in raw]
513 return [True if raw[i:i + 1] == b'1' else False for i in range(len(raw))]
514 514
515 515 def decodelistkeys(objs):
516 516 # Map with bytestring keys and values.
517 517 return next(objs)
518 518
519 519 def decodelookup(objs):
520 520 return next(objs)
521 521
522 522 def decodepushkey(objs):
523 523 return next(objs)
524 524
525 525 COMMAND_DECODERS = {
526 526 'branchmap': decodebranchmap,
527 527 'heads': decodeheads,
528 528 'known': decodeknown,
529 529 'listkeys': decodelistkeys,
530 530 'lookup': decodelookup,
531 531 'pushkey': decodepushkey,
532 532 }
General Comments 0
You need to be logged in to leave comments. Login now