##// END OF EJS Templates
wireprotov2peer: wait for initial object before resolving future...
Gregory Szorc -
r40790:15a64330 stable
parent child Browse files
Show More
@@ -1,527 +1,532 b''
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import threading
10 import threading
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 encoding,
14 encoding,
15 error,
15 error,
16 pycompat,
16 pycompat,
17 sslutil,
17 sslutil,
18 url as urlmod,
18 url as urlmod,
19 util,
19 util,
20 wireprotoframing,
20 wireprotoframing,
21 wireprototypes,
21 wireprototypes,
22 )
22 )
23 from .utils import (
23 from .utils import (
24 cborutil,
24 cborutil,
25 )
25 )
26
26
27 def formatrichmessage(atoms):
27 def formatrichmessage(atoms):
28 """Format an encoded message from the framing protocol."""
28 """Format an encoded message from the framing protocol."""
29
29
30 chunks = []
30 chunks = []
31
31
32 for atom in atoms:
32 for atom in atoms:
33 msg = _(atom[b'msg'])
33 msg = _(atom[b'msg'])
34
34
35 if b'args' in atom:
35 if b'args' in atom:
36 msg = msg % tuple(atom[b'args'])
36 msg = msg % tuple(atom[b'args'])
37
37
38 chunks.append(msg)
38 chunks.append(msg)
39
39
40 return b''.join(chunks)
40 return b''.join(chunks)
41
41
42 SUPPORTED_REDIRECT_PROTOCOLS = {
42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 b'http',
43 b'http',
44 b'https',
44 b'https',
45 }
45 }
46
46
47 SUPPORTED_CONTENT_HASHES = {
47 SUPPORTED_CONTENT_HASHES = {
48 b'sha1',
48 b'sha1',
49 b'sha256',
49 b'sha256',
50 }
50 }
51
51
52 def redirecttargetsupported(ui, target):
52 def redirecttargetsupported(ui, target):
53 """Determine whether a redirect target entry is supported.
53 """Determine whether a redirect target entry is supported.
54
54
55 ``target`` should come from the capabilities data structure emitted by
55 ``target`` should come from the capabilities data structure emitted by
56 the server.
56 the server.
57 """
57 """
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 % (target[b'name'], target.get(b'protocol', b'')))
60 % (target[b'name'], target.get(b'protocol', b'')))
61 return False
61 return False
62
62
63 if target.get(b'snirequired') and not sslutil.hassni:
63 if target.get(b'snirequired') and not sslutil.hassni:
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 target[b'name'])
65 target[b'name'])
66 return False
66 return False
67
67
68 if b'tlsversions' in target:
68 if b'tlsversions' in target:
69 tlsversions = set(target[b'tlsversions'])
69 tlsversions = set(target[b'tlsversions'])
70 supported = set()
70 supported = set()
71
71
72 for v in sslutil.supportedprotocols:
72 for v in sslutil.supportedprotocols:
73 assert v.startswith(b'tls')
73 assert v.startswith(b'tls')
74 supported.add(v[3:])
74 supported.add(v[3:])
75
75
76 if not tlsversions & supported:
76 if not tlsversions & supported:
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 'versions: %s)\n') % (
78 'versions: %s)\n') % (
79 target[b'name'], b', '.join(sorted(tlsversions))))
79 target[b'name'], b', '.join(sorted(tlsversions))))
80 return False
80 return False
81
81
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83
83
84 return True
84 return True
85
85
86 def supportedredirects(ui, apidescriptor):
86 def supportedredirects(ui, apidescriptor):
87 """Resolve the "redirect" command request key given an API descriptor.
87 """Resolve the "redirect" command request key given an API descriptor.
88
88
89 Given an API descriptor returned by the server, returns a data structure
89 Given an API descriptor returned by the server, returns a data structure
90 that can be used in hte "redirect" field of command requests to advertise
90 that can be used in hte "redirect" field of command requests to advertise
91 support for compatible redirect targets.
91 support for compatible redirect targets.
92
92
93 Returns None if no redirect targets are remotely advertised or if none are
93 Returns None if no redirect targets are remotely advertised or if none are
94 supported.
94 supported.
95 """
95 """
96 if not apidescriptor or b'redirect' not in apidescriptor:
96 if not apidescriptor or b'redirect' not in apidescriptor:
97 return None
97 return None
98
98
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 if redirecttargetsupported(ui, t)]
100 if redirecttargetsupported(ui, t)]
101
101
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 if h in SUPPORTED_CONTENT_HASHES]
103 if h in SUPPORTED_CONTENT_HASHES]
104
104
105 return {
105 return {
106 b'targets': targets,
106 b'targets': targets,
107 b'hashes': hashes,
107 b'hashes': hashes,
108 }
108 }
109
109
110 class commandresponse(object):
110 class commandresponse(object):
111 """Represents the response to a command request.
111 """Represents the response to a command request.
112
112
113 Instances track the state of the command and hold its results.
113 Instances track the state of the command and hold its results.
114
114
115 An external entity is required to update the state of the object when
115 An external entity is required to update the state of the object when
116 events occur.
116 events occur.
117 """
117 """
118
118
119 def __init__(self, requestid, command, fromredirect=False):
119 def __init__(self, requestid, command, fromredirect=False):
120 self.requestid = requestid
120 self.requestid = requestid
121 self.command = command
121 self.command = command
122 self.fromredirect = fromredirect
122 self.fromredirect = fromredirect
123
123
124 # Whether all remote input related to this command has been
124 # Whether all remote input related to this command has been
125 # received.
125 # received.
126 self._inputcomplete = False
126 self._inputcomplete = False
127
127
128 # We have a lock that is acquired when important object state is
128 # We have a lock that is acquired when important object state is
129 # mutated. This is to prevent race conditions between 1 thread
129 # mutated. This is to prevent race conditions between 1 thread
130 # sending us new data and another consuming it.
130 # sending us new data and another consuming it.
131 self._lock = threading.RLock()
131 self._lock = threading.RLock()
132
132
133 # An event is set when state of the object changes. This event
133 # An event is set when state of the object changes. This event
134 # is waited on by the generator emitting objects.
134 # is waited on by the generator emitting objects.
135 self._serviceable = threading.Event()
135 self._serviceable = threading.Event()
136
136
137 self._pendingevents = []
137 self._pendingevents = []
138 self._pendingerror = None
138 self._pendingerror = None
139 self._decoder = cborutil.bufferingdecoder()
139 self._decoder = cborutil.bufferingdecoder()
140 self._seeninitial = False
140 self._seeninitial = False
141 self._redirect = None
141 self._redirect = None
142
142
143 def _oninputcomplete(self):
143 def _oninputcomplete(self):
144 with self._lock:
144 with self._lock:
145 self._inputcomplete = True
145 self._inputcomplete = True
146 self._serviceable.set()
146 self._serviceable.set()
147
147
148 def _onresponsedata(self, data):
148 def _onresponsedata(self, data):
149 available, readcount, wanted = self._decoder.decode(data)
149 available, readcount, wanted = self._decoder.decode(data)
150
150
151 if not available:
151 if not available:
152 return
152 return
153
153
154 with self._lock:
154 with self._lock:
155 for o in self._decoder.getavailable():
155 for o in self._decoder.getavailable():
156 if not self._seeninitial and not self.fromredirect:
156 if not self._seeninitial and not self.fromredirect:
157 self._handleinitial(o)
157 self._handleinitial(o)
158 continue
158 continue
159
159
160 # We should never see an object after a content redirect,
160 # We should never see an object after a content redirect,
161 # as the spec says the main status object containing the
161 # as the spec says the main status object containing the
162 # content redirect is the only object in the stream. Fail
162 # content redirect is the only object in the stream. Fail
163 # if we see a misbehaving server.
163 # if we see a misbehaving server.
164 if self._redirect:
164 if self._redirect:
165 raise error.Abort(_('received unexpected response data '
165 raise error.Abort(_('received unexpected response data '
166 'after content redirect; the remote is '
166 'after content redirect; the remote is '
167 'buggy'))
167 'buggy'))
168
168
169 self._pendingevents.append(o)
169 self._pendingevents.append(o)
170
170
171 self._serviceable.set()
171 self._serviceable.set()
172
172
173 def _onerror(self, e):
173 def _onerror(self, e):
174 self._pendingerror = e
174 self._pendingerror = e
175
175
176 with self._lock:
176 with self._lock:
177 self._serviceable.set()
177 self._serviceable.set()
178
178
179 def _handleinitial(self, o):
179 def _handleinitial(self, o):
180 self._seeninitial = True
180 self._seeninitial = True
181 if o[b'status'] == b'ok':
181 if o[b'status'] == b'ok':
182 return
182 return
183
183
184 elif o[b'status'] == b'redirect':
184 elif o[b'status'] == b'redirect':
185 l = o[b'location']
185 l = o[b'location']
186 self._redirect = wireprototypes.alternatelocationresponse(
186 self._redirect = wireprototypes.alternatelocationresponse(
187 url=l[b'url'],
187 url=l[b'url'],
188 mediatype=l[b'mediatype'],
188 mediatype=l[b'mediatype'],
189 size=l.get(b'size'),
189 size=l.get(b'size'),
190 fullhashes=l.get(b'fullhashes'),
190 fullhashes=l.get(b'fullhashes'),
191 fullhashseed=l.get(b'fullhashseed'),
191 fullhashseed=l.get(b'fullhashseed'),
192 serverdercerts=l.get(b'serverdercerts'),
192 serverdercerts=l.get(b'serverdercerts'),
193 servercadercerts=l.get(b'servercadercerts'))
193 servercadercerts=l.get(b'servercadercerts'))
194 return
194 return
195
195
196 atoms = [{'msg': o[b'error'][b'message']}]
196 atoms = [{'msg': o[b'error'][b'message']}]
197 if b'args' in o[b'error']:
197 if b'args' in o[b'error']:
198 atoms[0]['args'] = o[b'error'][b'args']
198 atoms[0]['args'] = o[b'error'][b'args']
199
199
200 raise error.RepoError(formatrichmessage(atoms))
200 raise error.RepoError(formatrichmessage(atoms))
201
201
202 def objects(self):
202 def objects(self):
203 """Obtained decoded objects from this response.
203 """Obtained decoded objects from this response.
204
204
205 This is a generator of data structures that were decoded from the
205 This is a generator of data structures that were decoded from the
206 command response.
206 command response.
207
207
208 Obtaining the next member of the generator may block due to waiting
208 Obtaining the next member of the generator may block due to waiting
209 on external data to become available.
209 on external data to become available.
210
210
211 If the server encountered an error in the middle of serving the data
211 If the server encountered an error in the middle of serving the data
212 or if another error occurred, an exception may be raised when
212 or if another error occurred, an exception may be raised when
213 advancing the generator.
213 advancing the generator.
214 """
214 """
215 while True:
215 while True:
216 # TODO this can infinite loop if self._inputcomplete is never
216 # TODO this can infinite loop if self._inputcomplete is never
217 # set. We likely want to tie the lifetime of this object/state
217 # set. We likely want to tie the lifetime of this object/state
218 # to that of the background thread receiving frames and updating
218 # to that of the background thread receiving frames and updating
219 # our state.
219 # our state.
220 self._serviceable.wait(1.0)
220 self._serviceable.wait(1.0)
221
221
222 if self._pendingerror:
222 if self._pendingerror:
223 raise self._pendingerror
223 raise self._pendingerror
224
224
225 with self._lock:
225 with self._lock:
226 self._serviceable.clear()
226 self._serviceable.clear()
227
227
228 # Make copies because objects could be mutated during
228 # Make copies because objects could be mutated during
229 # iteration.
229 # iteration.
230 stop = self._inputcomplete
230 stop = self._inputcomplete
231 pending = list(self._pendingevents)
231 pending = list(self._pendingevents)
232 self._pendingevents[:] = []
232 self._pendingevents[:] = []
233
233
234 for o in pending:
234 for o in pending:
235 yield o
235 yield o
236
236
237 if stop:
237 if stop:
238 break
238 break
239
239
240 class clienthandler(object):
240 class clienthandler(object):
241 """Object to handle higher-level client activities.
241 """Object to handle higher-level client activities.
242
242
243 The ``clientreactor`` is used to hold low-level state about the frame-based
243 The ``clientreactor`` is used to hold low-level state about the frame-based
244 protocol, such as which requests and streams are active. This type is used
244 protocol, such as which requests and streams are active. This type is used
245 for higher-level operations, such as reading frames from a socket, exposing
245 for higher-level operations, such as reading frames from a socket, exposing
246 and managing a higher-level primitive for representing command responses,
246 and managing a higher-level primitive for representing command responses,
247 etc. This class is what peers should probably use to bridge wire activity
247 etc. This class is what peers should probably use to bridge wire activity
248 with the higher-level peer API.
248 with the higher-level peer API.
249 """
249 """
250
250
251 def __init__(self, ui, clientreactor, opener=None,
251 def __init__(self, ui, clientreactor, opener=None,
252 requestbuilder=util.urlreq.request):
252 requestbuilder=util.urlreq.request):
253 self._ui = ui
253 self._ui = ui
254 self._reactor = clientreactor
254 self._reactor = clientreactor
255 self._requests = {}
255 self._requests = {}
256 self._futures = {}
256 self._futures = {}
257 self._responses = {}
257 self._responses = {}
258 self._redirects = []
258 self._redirects = []
259 self._frameseof = False
259 self._frameseof = False
260 self._opener = opener or urlmod.opener(ui)
260 self._opener = opener or urlmod.opener(ui)
261 self._requestbuilder = requestbuilder
261 self._requestbuilder = requestbuilder
262
262
263 def callcommand(self, command, args, f, redirect=None):
263 def callcommand(self, command, args, f, redirect=None):
264 """Register a request to call a command.
264 """Register a request to call a command.
265
265
266 Returns an iterable of frames that should be sent over the wire.
266 Returns an iterable of frames that should be sent over the wire.
267 """
267 """
268 request, action, meta = self._reactor.callcommand(command, args,
268 request, action, meta = self._reactor.callcommand(command, args,
269 redirect=redirect)
269 redirect=redirect)
270
270
271 if action != 'noop':
271 if action != 'noop':
272 raise error.ProgrammingError('%s not yet supported' % action)
272 raise error.ProgrammingError('%s not yet supported' % action)
273
273
274 rid = request.requestid
274 rid = request.requestid
275 self._requests[rid] = request
275 self._requests[rid] = request
276 self._futures[rid] = f
276 self._futures[rid] = f
277 # TODO we need some kind of lifetime on response instances otherwise
277 # TODO we need some kind of lifetime on response instances otherwise
278 # objects() may deadlock.
278 # objects() may deadlock.
279 self._responses[rid] = commandresponse(rid, command)
279 self._responses[rid] = commandresponse(rid, command)
280
280
281 return iter(())
281 return iter(())
282
282
283 def flushcommands(self):
283 def flushcommands(self):
284 """Flush all queued commands.
284 """Flush all queued commands.
285
285
286 Returns an iterable of frames that should be sent over the wire.
286 Returns an iterable of frames that should be sent over the wire.
287 """
287 """
288 action, meta = self._reactor.flushcommands()
288 action, meta = self._reactor.flushcommands()
289
289
290 if action != 'sendframes':
290 if action != 'sendframes':
291 raise error.ProgrammingError('%s not yet supported' % action)
291 raise error.ProgrammingError('%s not yet supported' % action)
292
292
293 return meta['framegen']
293 return meta['framegen']
294
294
295 def readdata(self, framefh):
295 def readdata(self, framefh):
296 """Attempt to read data and do work.
296 """Attempt to read data and do work.
297
297
298 Returns None if no data was read. Presumably this means we're
298 Returns None if no data was read. Presumably this means we're
299 done with all read I/O.
299 done with all read I/O.
300 """
300 """
301 if not self._frameseof:
301 if not self._frameseof:
302 frame = wireprotoframing.readframe(framefh)
302 frame = wireprotoframing.readframe(framefh)
303 if frame is None:
303 if frame is None:
304 # TODO tell reactor?
304 # TODO tell reactor?
305 self._frameseof = True
305 self._frameseof = True
306 else:
306 else:
307 self._ui.note(_('received %r\n') % frame)
307 self._ui.note(_('received %r\n') % frame)
308 self._processframe(frame)
308 self._processframe(frame)
309
309
310 # Also try to read the first redirect.
310 # Also try to read the first redirect.
311 if self._redirects:
311 if self._redirects:
312 if not self._processredirect(*self._redirects[0]):
312 if not self._processredirect(*self._redirects[0]):
313 self._redirects.pop(0)
313 self._redirects.pop(0)
314
314
315 if self._frameseof and not self._redirects:
315 if self._frameseof and not self._redirects:
316 return None
316 return None
317
317
318 return True
318 return True
319
319
320 def _processframe(self, frame):
320 def _processframe(self, frame):
321 """Process a single read frame."""
321 """Process a single read frame."""
322
322
323 action, meta = self._reactor.onframerecv(frame)
323 action, meta = self._reactor.onframerecv(frame)
324
324
325 if action == 'error':
325 if action == 'error':
326 e = error.RepoError(meta['message'])
326 e = error.RepoError(meta['message'])
327
327
328 if frame.requestid in self._responses:
328 if frame.requestid in self._responses:
329 self._responses[frame.requestid]._oninputcomplete()
329 self._responses[frame.requestid]._oninputcomplete()
330
330
331 if frame.requestid in self._futures:
331 if frame.requestid in self._futures:
332 self._futures[frame.requestid].set_exception(e)
332 self._futures[frame.requestid].set_exception(e)
333 del self._futures[frame.requestid]
333 del self._futures[frame.requestid]
334 else:
334 else:
335 raise e
335 raise e
336
336
337 return
337 return
338 elif action == 'noop':
338 elif action == 'noop':
339 return
339 return
340 elif action == 'responsedata':
340 elif action == 'responsedata':
341 # Handled below.
341 # Handled below.
342 pass
342 pass
343 else:
343 else:
344 raise error.ProgrammingError('action not handled: %s' % action)
344 raise error.ProgrammingError('action not handled: %s' % action)
345
345
346 if frame.requestid not in self._requests:
346 if frame.requestid not in self._requests:
347 raise error.ProgrammingError(
347 raise error.ProgrammingError(
348 'received frame for unknown request; this is either a bug in '
348 'received frame for unknown request; this is either a bug in '
349 'the clientreactor not screening for this or this instance was '
349 'the clientreactor not screening for this or this instance was '
350 'never told about this request: %r' % frame)
350 'never told about this request: %r' % frame)
351
351
352 response = self._responses[frame.requestid]
352 response = self._responses[frame.requestid]
353
353
354 if action == 'responsedata':
354 if action == 'responsedata':
355 # Any failures processing this frame should bubble up to the
355 # Any failures processing this frame should bubble up to the
356 # future tracking the request.
356 # future tracking the request.
357 try:
357 try:
358 self._processresponsedata(frame, meta, response)
358 self._processresponsedata(frame, meta, response)
359 except BaseException as e:
359 except BaseException as e:
360 # If an exception occurs before the future is resolved,
360 # If an exception occurs before the future is resolved,
361 # fail the future. Otherwise, we stuff the exception on
361 # fail the future. Otherwise, we stuff the exception on
362 # the response object so it can be raised during objects()
362 # the response object so it can be raised during objects()
363 # iteration. If nothing is consuming objects(), we could
363 # iteration. If nothing is consuming objects(), we could
364 # silently swallow this exception. That's a risk we'll have to
364 # silently swallow this exception. That's a risk we'll have to
365 # take.
365 # take.
366 if frame.requestid in self._futures:
366 if frame.requestid in self._futures:
367 self._futures[frame.requestid].set_exception(e)
367 self._futures[frame.requestid].set_exception(e)
368 del self._futures[frame.requestid]
368 del self._futures[frame.requestid]
369 response._oninputcomplete()
369 response._oninputcomplete()
370 else:
370 else:
371 response._onerror(e)
371 response._onerror(e)
372 else:
372 else:
373 raise error.ProgrammingError(
373 raise error.ProgrammingError(
374 'unhandled action from clientreactor: %s' % action)
374 'unhandled action from clientreactor: %s' % action)
375
375
376 def _processresponsedata(self, frame, meta, response):
376 def _processresponsedata(self, frame, meta, response):
377 # This can raise. The caller can handle it.
377 # This can raise. The caller can handle it.
378 response._onresponsedata(meta['data'])
378 response._onresponsedata(meta['data'])
379
379
380 # If we got a content redirect response, we want to fetch it and
380 # We need to be careful about resolving futures prematurely. If a
381 # expose the data as if we received it inline. But we also want to
381 # response is a redirect response, resolving the future before the
382 # keep our internal request accounting in order. Our strategy is to
382 # redirect is processed would result in the consumer seeing an
383 # basically put meaningful response handling on pause until EOS occurs
383 # empty stream of objects, since they'd be consuming our
384 # and the stream accounting is in a good state. At that point, we follow
384 # response.objects() instead of the redirect's response.objects().
385 # the redirect and replace the response object with its data.
385 #
386 # Our strategy is to not resolve/finish the request until either
387 # EOS occurs or until the initial response object is fully received.
386
388
387 redirect = response._redirect
389 # Always react to eos.
388 handlefuture = False if redirect else True
389
390 if meta['eos']:
390 if meta['eos']:
391 response._oninputcomplete()
391 response._oninputcomplete()
392 del self._requests[frame.requestid]
392 del self._requests[frame.requestid]
393
393
394 if redirect:
394 # Not EOS but we haven't decoded the initial response object yet.
395 self._followredirect(frame.requestid, redirect)
395 # Return and wait for more data.
396 return
396 elif not response._seeninitial:
397 return
397
398
398 if not handlefuture:
399 # The specification says no objects should follow the initial/redirect
400 # object. So it should be safe to handle the redirect object if one is
401 # decoded, without having to wait for EOS.
402 if response._redirect:
403 self._followredirect(frame.requestid, response._redirect)
399 return
404 return
400
405
401 # If the command has a decoder, we wait until all input has been
406 # If the command has a decoder, we wait until all input has been
402 # received before resolving the future. Otherwise we resolve the
407 # received before resolving the future. Otherwise we resolve the
403 # future immediately.
408 # future immediately.
404 if frame.requestid not in self._futures:
409 if frame.requestid not in self._futures:
405 return
410 return
406
411
407 if response.command not in COMMAND_DECODERS:
412 if response.command not in COMMAND_DECODERS:
408 self._futures[frame.requestid].set_result(response.objects())
413 self._futures[frame.requestid].set_result(response.objects())
409 del self._futures[frame.requestid]
414 del self._futures[frame.requestid]
410 elif response._inputcomplete:
415 elif response._inputcomplete:
411 decoded = COMMAND_DECODERS[response.command](response.objects())
416 decoded = COMMAND_DECODERS[response.command](response.objects())
412 self._futures[frame.requestid].set_result(decoded)
417 self._futures[frame.requestid].set_result(decoded)
413 del self._futures[frame.requestid]
418 del self._futures[frame.requestid]
414
419
415 def _followredirect(self, requestid, redirect):
420 def _followredirect(self, requestid, redirect):
416 """Called to initiate redirect following for a request."""
421 """Called to initiate redirect following for a request."""
417 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
422 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
418
423
419 # TODO handle framed responses.
424 # TODO handle framed responses.
420 if redirect.mediatype != b'application/mercurial-cbor':
425 if redirect.mediatype != b'application/mercurial-cbor':
421 raise error.Abort(_('cannot handle redirects for the %s media type')
426 raise error.Abort(_('cannot handle redirects for the %s media type')
422 % redirect.mediatype)
427 % redirect.mediatype)
423
428
424 if redirect.fullhashes:
429 if redirect.fullhashes:
425 self._ui.warn(_('(support for validating hashes on content '
430 self._ui.warn(_('(support for validating hashes on content '
426 'redirects not supported)\n'))
431 'redirects not supported)\n'))
427
432
428 if redirect.serverdercerts or redirect.servercadercerts:
433 if redirect.serverdercerts or redirect.servercadercerts:
429 self._ui.warn(_('(support for pinning server certificates on '
434 self._ui.warn(_('(support for pinning server certificates on '
430 'content redirects not supported)\n'))
435 'content redirects not supported)\n'))
431
436
432 headers = {
437 headers = {
433 r'Accept': redirect.mediatype,
438 r'Accept': redirect.mediatype,
434 }
439 }
435
440
436 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
441 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
437
442
438 try:
443 try:
439 res = self._opener.open(req)
444 res = self._opener.open(req)
440 except util.urlerr.httperror as e:
445 except util.urlerr.httperror as e:
441 if e.code == 401:
446 if e.code == 401:
442 raise error.Abort(_('authorization failed'))
447 raise error.Abort(_('authorization failed'))
443 raise
448 raise
444 except util.httplib.HTTPException as e:
449 except util.httplib.HTTPException as e:
445 self._ui.debug('http error requesting %s\n' % req.get_full_url())
450 self._ui.debug('http error requesting %s\n' % req.get_full_url())
446 self._ui.traceback()
451 self._ui.traceback()
447 raise IOError(None, e)
452 raise IOError(None, e)
448
453
449 urlmod.wrapresponse(res)
454 urlmod.wrapresponse(res)
450
455
451 # The existing response object is associated with frame data. Rather
456 # The existing response object is associated with frame data. Rather
452 # than try to normalize its state, just create a new object.
457 # than try to normalize its state, just create a new object.
453 oldresponse = self._responses[requestid]
458 oldresponse = self._responses[requestid]
454 self._responses[requestid] = commandresponse(requestid,
459 self._responses[requestid] = commandresponse(requestid,
455 oldresponse.command,
460 oldresponse.command,
456 fromredirect=True)
461 fromredirect=True)
457
462
458 self._redirects.append((requestid, res))
463 self._redirects.append((requestid, res))
459
464
460 def _processredirect(self, rid, res):
465 def _processredirect(self, rid, res):
461 """Called to continue processing a response from a redirect.
466 """Called to continue processing a response from a redirect.
462
467
463 Returns a bool indicating if the redirect is still serviceable.
468 Returns a bool indicating if the redirect is still serviceable.
464 """
469 """
465 response = self._responses[rid]
470 response = self._responses[rid]
466
471
467 try:
472 try:
468 data = res.read(32768)
473 data = res.read(32768)
469 response._onresponsedata(data)
474 response._onresponsedata(data)
470
475
471 # We're at end of stream.
476 # We're at end of stream.
472 if not data:
477 if not data:
473 response._oninputcomplete()
478 response._oninputcomplete()
474
479
475 if rid not in self._futures:
480 if rid not in self._futures:
476 return bool(data)
481 return bool(data)
477
482
478 if response.command not in COMMAND_DECODERS:
483 if response.command not in COMMAND_DECODERS:
479 self._futures[rid].set_result(response.objects())
484 self._futures[rid].set_result(response.objects())
480 del self._futures[rid]
485 del self._futures[rid]
481 elif response._inputcomplete:
486 elif response._inputcomplete:
482 decoded = COMMAND_DECODERS[response.command](response.objects())
487 decoded = COMMAND_DECODERS[response.command](response.objects())
483 self._futures[rid].set_result(decoded)
488 self._futures[rid].set_result(decoded)
484 del self._futures[rid]
489 del self._futures[rid]
485
490
486 return bool(data)
491 return bool(data)
487
492
488 except BaseException as e:
493 except BaseException as e:
489 self._futures[rid].set_exception(e)
494 self._futures[rid].set_exception(e)
490 del self._futures[rid]
495 del self._futures[rid]
491 response._oninputcomplete()
496 response._oninputcomplete()
492 return False
497 return False
493
498
494 def decodebranchmap(objs):
499 def decodebranchmap(objs):
495 # Response should be a single CBOR map of branch name to array of nodes.
500 # Response should be a single CBOR map of branch name to array of nodes.
496 bm = next(objs)
501 bm = next(objs)
497
502
498 return {encoding.tolocal(k): v for k, v in bm.items()}
503 return {encoding.tolocal(k): v for k, v in bm.items()}
499
504
500 def decodeheads(objs):
505 def decodeheads(objs):
501 # Array of node bytestrings.
506 # Array of node bytestrings.
502 return next(objs)
507 return next(objs)
503
508
504 def decodeknown(objs):
509 def decodeknown(objs):
505 # Bytestring where each byte is a 0 or 1.
510 # Bytestring where each byte is a 0 or 1.
506 raw = next(objs)
511 raw = next(objs)
507
512
508 return [True if c == '1' else False for c in raw]
513 return [True if c == '1' else False for c in raw]
509
514
510 def decodelistkeys(objs):
515 def decodelistkeys(objs):
511 # Map with bytestring keys and values.
516 # Map with bytestring keys and values.
512 return next(objs)
517 return next(objs)
513
518
514 def decodelookup(objs):
519 def decodelookup(objs):
515 return next(objs)
520 return next(objs)
516
521
517 def decodepushkey(objs):
522 def decodepushkey(objs):
518 return next(objs)
523 return next(objs)
519
524
520 COMMAND_DECODERS = {
525 COMMAND_DECODERS = {
521 'branchmap': decodebranchmap,
526 'branchmap': decodebranchmap,
522 'heads': decodeheads,
527 'heads': decodeheads,
523 'known': decodeknown,
528 'known': decodeknown,
524 'listkeys': decodelistkeys,
529 'listkeys': decodelistkeys,
525 'lookup': decodelookup,
530 'lookup': decodelookup,
526 'pushkey': decodepushkey,
531 'pushkey': decodepushkey,
527 }
532 }
General Comments 0
You need to be logged in to leave comments. Login now