##// END OF EJS Templates
wireprotov2: handle noop action...
Gregory Szorc -
r40169:cfeba1aa default
parent child Browse files
Show More
@@ -1,498 +1,500 b''
1 1 # wireprotov2peer.py - client side code for wire protocol version 2
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import threading
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 encoding,
15 15 error,
16 16 pycompat,
17 17 sslutil,
18 18 url as urlmod,
19 19 util,
20 20 wireprotoframing,
21 21 wireprototypes,
22 22 )
23 23 from .utils import (
24 24 cborutil,
25 25 )
26 26
27 27 def formatrichmessage(atoms):
28 28 """Format an encoded message from the framing protocol."""
29 29
30 30 chunks = []
31 31
32 32 for atom in atoms:
33 33 msg = _(atom[b'msg'])
34 34
35 35 if b'args' in atom:
36 36 msg = msg % tuple(atom[b'args'])
37 37
38 38 chunks.append(msg)
39 39
40 40 return b''.join(chunks)
41 41
42 42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 43 b'http',
44 44 b'https',
45 45 }
46 46
47 47 SUPPORTED_CONTENT_HASHES = {
48 48 b'sha1',
49 49 b'sha256',
50 50 }
51 51
52 52 def redirecttargetsupported(ui, target):
53 53 """Determine whether a redirect target entry is supported.
54 54
55 55 ``target`` should come from the capabilities data structure emitted by
56 56 the server.
57 57 """
58 58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 60 % (target[b'name'], target.get(b'protocol', b'')))
61 61 return False
62 62
63 63 if target.get(b'snirequired') and not sslutil.hassni:
64 64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 65 target[b'name'])
66 66 return False
67 67
68 68 if b'tlsversions' in target:
69 69 tlsversions = set(target[b'tlsversions'])
70 70 supported = set()
71 71
72 72 for v in sslutil.supportedprotocols:
73 73 assert v.startswith(b'tls')
74 74 supported.add(v[3:])
75 75
76 76 if not tlsversions & supported:
77 77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 78 'versions: %s)\n') % (
79 79 target[b'name'], b', '.join(sorted(tlsversions))))
80 80 return False
81 81
82 82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83 83
84 84 return True
85 85
86 86 def supportedredirects(ui, apidescriptor):
87 87 """Resolve the "redirect" command request key given an API descriptor.
88 88
89 89 Given an API descriptor returned by the server, returns a data structure
90 90 that can be used in hte "redirect" field of command requests to advertise
91 91 support for compatible redirect targets.
92 92
93 93 Returns None if no redirect targets are remotely advertised or if none are
94 94 supported.
95 95 """
96 96 if not apidescriptor or b'redirect' not in apidescriptor:
97 97 return None
98 98
99 99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 100 if redirecttargetsupported(ui, t)]
101 101
102 102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 103 if h in SUPPORTED_CONTENT_HASHES]
104 104
105 105 return {
106 106 b'targets': targets,
107 107 b'hashes': hashes,
108 108 }
109 109
110 110 class commandresponse(object):
111 111 """Represents the response to a command request.
112 112
113 113 Instances track the state of the command and hold its results.
114 114
115 115 An external entity is required to update the state of the object when
116 116 events occur.
117 117 """
118 118
119 119 def __init__(self, requestid, command, fromredirect=False):
120 120 self.requestid = requestid
121 121 self.command = command
122 122 self.fromredirect = fromredirect
123 123
124 124 # Whether all remote input related to this command has been
125 125 # received.
126 126 self._inputcomplete = False
127 127
128 128 # We have a lock that is acquired when important object state is
129 129 # mutated. This is to prevent race conditions between 1 thread
130 130 # sending us new data and another consuming it.
131 131 self._lock = threading.RLock()
132 132
133 133 # An event is set when state of the object changes. This event
134 134 # is waited on by the generator emitting objects.
135 135 self._serviceable = threading.Event()
136 136
137 137 self._pendingevents = []
138 138 self._decoder = cborutil.bufferingdecoder()
139 139 self._seeninitial = False
140 140 self._redirect = None
141 141
142 142 def _oninputcomplete(self):
143 143 with self._lock:
144 144 self._inputcomplete = True
145 145 self._serviceable.set()
146 146
147 147 def _onresponsedata(self, data):
148 148 available, readcount, wanted = self._decoder.decode(data)
149 149
150 150 if not available:
151 151 return
152 152
153 153 with self._lock:
154 154 for o in self._decoder.getavailable():
155 155 if not self._seeninitial and not self.fromredirect:
156 156 self._handleinitial(o)
157 157 continue
158 158
159 159 # We should never see an object after a content redirect,
160 160 # as the spec says the main status object containing the
161 161 # content redirect is the only object in the stream. Fail
162 162 # if we see a misbehaving server.
163 163 if self._redirect:
164 164 raise error.Abort(_('received unexpected response data '
165 165 'after content redirect; the remote is '
166 166 'buggy'))
167 167
168 168 self._pendingevents.append(o)
169 169
170 170 self._serviceable.set()
171 171
172 172 def _handleinitial(self, o):
173 173 self._seeninitial = True
174 174 if o[b'status'] == b'ok':
175 175 return
176 176
177 177 elif o[b'status'] == b'redirect':
178 178 l = o[b'location']
179 179 self._redirect = wireprototypes.alternatelocationresponse(
180 180 url=l[b'url'],
181 181 mediatype=l[b'mediatype'],
182 182 size=l.get(b'size'),
183 183 fullhashes=l.get(b'fullhashes'),
184 184 fullhashseed=l.get(b'fullhashseed'),
185 185 serverdercerts=l.get(b'serverdercerts'),
186 186 servercadercerts=l.get(b'servercadercerts'))
187 187 return
188 188
189 189 atoms = [{'msg': o[b'error'][b'message']}]
190 190 if b'args' in o[b'error']:
191 191 atoms[0]['args'] = o[b'error'][b'args']
192 192
193 193 raise error.RepoError(formatrichmessage(atoms))
194 194
195 195 def objects(self):
196 196 """Obtained decoded objects from this response.
197 197
198 198 This is a generator of data structures that were decoded from the
199 199 command response.
200 200
201 201 Obtaining the next member of the generator may block due to waiting
202 202 on external data to become available.
203 203
204 204 If the server encountered an error in the middle of serving the data
205 205 or if another error occurred, an exception may be raised when
206 206 advancing the generator.
207 207 """
208 208 while True:
209 209 # TODO this can infinite loop if self._inputcomplete is never
210 210 # set. We likely want to tie the lifetime of this object/state
211 211 # to that of the background thread receiving frames and updating
212 212 # our state.
213 213 self._serviceable.wait(1.0)
214 214
215 215 with self._lock:
216 216 self._serviceable.clear()
217 217
218 218 # Make copies because objects could be mutated during
219 219 # iteration.
220 220 stop = self._inputcomplete
221 221 pending = list(self._pendingevents)
222 222 self._pendingevents[:] = []
223 223
224 224 for o in pending:
225 225 yield o
226 226
227 227 if stop:
228 228 break
229 229
230 230 class clienthandler(object):
231 231 """Object to handle higher-level client activities.
232 232
233 233 The ``clientreactor`` is used to hold low-level state about the frame-based
234 234 protocol, such as which requests and streams are active. This type is used
235 235 for higher-level operations, such as reading frames from a socket, exposing
236 236 and managing a higher-level primitive for representing command responses,
237 237 etc. This class is what peers should probably use to bridge wire activity
238 238 with the higher-level peer API.
239 239 """
240 240
241 241 def __init__(self, ui, clientreactor, opener=None,
242 242 requestbuilder=util.urlreq.request):
243 243 self._ui = ui
244 244 self._reactor = clientreactor
245 245 self._requests = {}
246 246 self._futures = {}
247 247 self._responses = {}
248 248 self._redirects = []
249 249 self._frameseof = False
250 250 self._opener = opener or urlmod.opener(ui)
251 251 self._requestbuilder = requestbuilder
252 252
253 253 def callcommand(self, command, args, f, redirect=None):
254 254 """Register a request to call a command.
255 255
256 256 Returns an iterable of frames that should be sent over the wire.
257 257 """
258 258 request, action, meta = self._reactor.callcommand(command, args,
259 259 redirect=redirect)
260 260
261 261 if action != 'noop':
262 262 raise error.ProgrammingError('%s not yet supported' % action)
263 263
264 264 rid = request.requestid
265 265 self._requests[rid] = request
266 266 self._futures[rid] = f
267 267 # TODO we need some kind of lifetime on response instances otherwise
268 268 # objects() may deadlock.
269 269 self._responses[rid] = commandresponse(rid, command)
270 270
271 271 return iter(())
272 272
273 273 def flushcommands(self):
274 274 """Flush all queued commands.
275 275
276 276 Returns an iterable of frames that should be sent over the wire.
277 277 """
278 278 action, meta = self._reactor.flushcommands()
279 279
280 280 if action != 'sendframes':
281 281 raise error.ProgrammingError('%s not yet supported' % action)
282 282
283 283 return meta['framegen']
284 284
285 285 def readdata(self, framefh):
286 286 """Attempt to read data and do work.
287 287
288 288 Returns None if no data was read. Presumably this means we're
289 289 done with all read I/O.
290 290 """
291 291 if not self._frameseof:
292 292 frame = wireprotoframing.readframe(framefh)
293 293 if frame is None:
294 294 # TODO tell reactor?
295 295 self._frameseof = True
296 296 else:
297 297 self._ui.note(_('received %r\n') % frame)
298 298 self._processframe(frame)
299 299
300 300 # Also try to read the first redirect.
301 301 if self._redirects:
302 302 if not self._processredirect(*self._redirects[0]):
303 303 self._redirects.pop(0)
304 304
305 305 if self._frameseof and not self._redirects:
306 306 return None
307 307
308 308 return True
309 309
310 310 def _processframe(self, frame):
311 311 """Process a single read frame."""
312 312
313 313 action, meta = self._reactor.onframerecv(frame)
314 314
315 315 if action == 'error':
316 316 e = error.RepoError(meta['message'])
317 317
318 318 if frame.requestid in self._responses:
319 319 self._responses[frame.requestid]._oninputcomplete()
320 320
321 321 if frame.requestid in self._futures:
322 322 self._futures[frame.requestid].set_exception(e)
323 323 del self._futures[frame.requestid]
324 324 else:
325 325 raise e
326 326
327 327 return
328 elif action == 'noop':
329 return
328 330
329 331 if frame.requestid not in self._requests:
330 332 raise error.ProgrammingError(
331 333 'received frame for unknown request; this is either a bug in '
332 334 'the clientreactor not screening for this or this instance was '
333 335 'never told about this request: %r' % frame)
334 336
335 337 response = self._responses[frame.requestid]
336 338
337 339 if action == 'responsedata':
338 340 # Any failures processing this frame should bubble up to the
339 341 # future tracking the request.
340 342 try:
341 343 self._processresponsedata(frame, meta, response)
342 344 except BaseException as e:
343 345 self._futures[frame.requestid].set_exception(e)
344 346 del self._futures[frame.requestid]
345 347 response._oninputcomplete()
346 348 else:
347 349 raise error.ProgrammingError(
348 350 'unhandled action from clientreactor: %s' % action)
349 351
350 352 def _processresponsedata(self, frame, meta, response):
351 353 # This can raise. The caller can handle it.
352 354 response._onresponsedata(meta['data'])
353 355
354 356 # If we got a content redirect response, we want to fetch it and
355 357 # expose the data as if we received it inline. But we also want to
356 358 # keep our internal request accounting in order. Our strategy is to
357 359 # basically put meaningful response handling on pause until EOS occurs
358 360 # and the stream accounting is in a good state. At that point, we follow
359 361 # the redirect and replace the response object with its data.
360 362
361 363 redirect = response._redirect
362 364 handlefuture = False if redirect else True
363 365
364 366 if meta['eos']:
365 367 response._oninputcomplete()
366 368 del self._requests[frame.requestid]
367 369
368 370 if redirect:
369 371 self._followredirect(frame.requestid, redirect)
370 372 return
371 373
372 374 if not handlefuture:
373 375 return
374 376
375 377 # If the command has a decoder, we wait until all input has been
376 378 # received before resolving the future. Otherwise we resolve the
377 379 # future immediately.
378 380 if frame.requestid not in self._futures:
379 381 return
380 382
381 383 if response.command not in COMMAND_DECODERS:
382 384 self._futures[frame.requestid].set_result(response.objects())
383 385 del self._futures[frame.requestid]
384 386 elif response._inputcomplete:
385 387 decoded = COMMAND_DECODERS[response.command](response.objects())
386 388 self._futures[frame.requestid].set_result(decoded)
387 389 del self._futures[frame.requestid]
388 390
389 391 def _followredirect(self, requestid, redirect):
390 392 """Called to initiate redirect following for a request."""
391 393 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
392 394
393 395 # TODO handle framed responses.
394 396 if redirect.mediatype != b'application/mercurial-cbor':
395 397 raise error.Abort(_('cannot handle redirects for the %s media type')
396 398 % redirect.mediatype)
397 399
398 400 if redirect.fullhashes:
399 401 self._ui.warn(_('(support for validating hashes on content '
400 402 'redirects not supported)\n'))
401 403
402 404 if redirect.serverdercerts or redirect.servercadercerts:
403 405 self._ui.warn(_('(support for pinning server certificates on '
404 406 'content redirects not supported)\n'))
405 407
406 408 headers = {
407 409 r'Accept': redirect.mediatype,
408 410 }
409 411
410 412 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
411 413
412 414 try:
413 415 res = self._opener.open(req)
414 416 except util.urlerr.httperror as e:
415 417 if e.code == 401:
416 418 raise error.Abort(_('authorization failed'))
417 419 raise
418 420 except util.httplib.HTTPException as e:
419 421 self._ui.debug('http error requesting %s\n' % req.get_full_url())
420 422 self._ui.traceback()
421 423 raise IOError(None, e)
422 424
423 425 urlmod.wrapresponse(res)
424 426
425 427 # The existing response object is associated with frame data. Rather
426 428 # than try to normalize its state, just create a new object.
427 429 oldresponse = self._responses[requestid]
428 430 self._responses[requestid] = commandresponse(requestid,
429 431 oldresponse.command,
430 432 fromredirect=True)
431 433
432 434 self._redirects.append((requestid, res))
433 435
434 436 def _processredirect(self, rid, res):
435 437 """Called to continue processing a response from a redirect."""
436 438 response = self._responses[rid]
437 439
438 440 try:
439 441 data = res.read(32768)
440 442 response._onresponsedata(data)
441 443
442 444 # We're at end of stream.
443 445 if not data:
444 446 response._oninputcomplete()
445 447
446 448 if rid not in self._futures:
447 449 return
448 450
449 451 if response.command not in COMMAND_DECODERS:
450 452 self._futures[rid].set_result(response.objects())
451 453 del self._futures[rid]
452 454 elif response._inputcomplete:
453 455 decoded = COMMAND_DECODERS[response.command](response.objects())
454 456 self._futures[rid].set_result(decoded)
455 457 del self._futures[rid]
456 458
457 459 return bool(data)
458 460
459 461 except BaseException as e:
460 462 self._futures[rid].set_exception(e)
461 463 del self._futures[rid]
462 464 response._oninputcomplete()
463 465 return False
464 466
465 467 def decodebranchmap(objs):
466 468 # Response should be a single CBOR map of branch name to array of nodes.
467 469 bm = next(objs)
468 470
469 471 return {encoding.tolocal(k): v for k, v in bm.items()}
470 472
471 473 def decodeheads(objs):
472 474 # Array of node bytestrings.
473 475 return next(objs)
474 476
475 477 def decodeknown(objs):
476 478 # Bytestring where each byte is a 0 or 1.
477 479 raw = next(objs)
478 480
479 481 return [True if c == '1' else False for c in raw]
480 482
481 483 def decodelistkeys(objs):
482 484 # Map with bytestring keys and values.
483 485 return next(objs)
484 486
485 487 def decodelookup(objs):
486 488 return next(objs)
487 489
488 490 def decodepushkey(objs):
489 491 return next(objs)
490 492
491 493 COMMAND_DECODERS = {
492 494 'branchmap': decodebranchmap,
493 495 'heads': decodeheads,
494 496 'known': decodeknown,
495 497 'listkeys': decodelistkeys,
496 498 'lookup': decodelookup,
497 499 'pushkey': decodepushkey,
498 500 }
General Comments 0
You need to be logged in to leave comments. Login now