##// END OF EJS Templates
wireprotov2peer: always return a bool from _processredirect()...
Gregory Szorc -
r40789:94b0d0f9 stable
parent child Browse files
Show More
@@ -1,524 +1,527 b''
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import threading
10 import threading
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 encoding,
14 encoding,
15 error,
15 error,
16 pycompat,
16 pycompat,
17 sslutil,
17 sslutil,
18 url as urlmod,
18 url as urlmod,
19 util,
19 util,
20 wireprotoframing,
20 wireprotoframing,
21 wireprototypes,
21 wireprototypes,
22 )
22 )
23 from .utils import (
23 from .utils import (
24 cborutil,
24 cborutil,
25 )
25 )
26
26
27 def formatrichmessage(atoms):
27 def formatrichmessage(atoms):
28 """Format an encoded message from the framing protocol."""
28 """Format an encoded message from the framing protocol."""
29
29
30 chunks = []
30 chunks = []
31
31
32 for atom in atoms:
32 for atom in atoms:
33 msg = _(atom[b'msg'])
33 msg = _(atom[b'msg'])
34
34
35 if b'args' in atom:
35 if b'args' in atom:
36 msg = msg % tuple(atom[b'args'])
36 msg = msg % tuple(atom[b'args'])
37
37
38 chunks.append(msg)
38 chunks.append(msg)
39
39
40 return b''.join(chunks)
40 return b''.join(chunks)
41
41
42 SUPPORTED_REDIRECT_PROTOCOLS = {
42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 b'http',
43 b'http',
44 b'https',
44 b'https',
45 }
45 }
46
46
47 SUPPORTED_CONTENT_HASHES = {
47 SUPPORTED_CONTENT_HASHES = {
48 b'sha1',
48 b'sha1',
49 b'sha256',
49 b'sha256',
50 }
50 }
51
51
52 def redirecttargetsupported(ui, target):
52 def redirecttargetsupported(ui, target):
53 """Determine whether a redirect target entry is supported.
53 """Determine whether a redirect target entry is supported.
54
54
55 ``target`` should come from the capabilities data structure emitted by
55 ``target`` should come from the capabilities data structure emitted by
56 the server.
56 the server.
57 """
57 """
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 % (target[b'name'], target.get(b'protocol', b'')))
60 % (target[b'name'], target.get(b'protocol', b'')))
61 return False
61 return False
62
62
63 if target.get(b'snirequired') and not sslutil.hassni:
63 if target.get(b'snirequired') and not sslutil.hassni:
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 target[b'name'])
65 target[b'name'])
66 return False
66 return False
67
67
68 if b'tlsversions' in target:
68 if b'tlsversions' in target:
69 tlsversions = set(target[b'tlsversions'])
69 tlsversions = set(target[b'tlsversions'])
70 supported = set()
70 supported = set()
71
71
72 for v in sslutil.supportedprotocols:
72 for v in sslutil.supportedprotocols:
73 assert v.startswith(b'tls')
73 assert v.startswith(b'tls')
74 supported.add(v[3:])
74 supported.add(v[3:])
75
75
76 if not tlsversions & supported:
76 if not tlsversions & supported:
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 'versions: %s)\n') % (
78 'versions: %s)\n') % (
79 target[b'name'], b', '.join(sorted(tlsversions))))
79 target[b'name'], b', '.join(sorted(tlsversions))))
80 return False
80 return False
81
81
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83
83
84 return True
84 return True
85
85
86 def supportedredirects(ui, apidescriptor):
86 def supportedredirects(ui, apidescriptor):
87 """Resolve the "redirect" command request key given an API descriptor.
87 """Resolve the "redirect" command request key given an API descriptor.
88
88
89 Given an API descriptor returned by the server, returns a data structure
89 Given an API descriptor returned by the server, returns a data structure
90 that can be used in hte "redirect" field of command requests to advertise
90 that can be used in hte "redirect" field of command requests to advertise
91 support for compatible redirect targets.
91 support for compatible redirect targets.
92
92
93 Returns None if no redirect targets are remotely advertised or if none are
93 Returns None if no redirect targets are remotely advertised or if none are
94 supported.
94 supported.
95 """
95 """
96 if not apidescriptor or b'redirect' not in apidescriptor:
96 if not apidescriptor or b'redirect' not in apidescriptor:
97 return None
97 return None
98
98
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 if redirecttargetsupported(ui, t)]
100 if redirecttargetsupported(ui, t)]
101
101
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 if h in SUPPORTED_CONTENT_HASHES]
103 if h in SUPPORTED_CONTENT_HASHES]
104
104
105 return {
105 return {
106 b'targets': targets,
106 b'targets': targets,
107 b'hashes': hashes,
107 b'hashes': hashes,
108 }
108 }
109
109
110 class commandresponse(object):
110 class commandresponse(object):
111 """Represents the response to a command request.
111 """Represents the response to a command request.
112
112
113 Instances track the state of the command and hold its results.
113 Instances track the state of the command and hold its results.
114
114
115 An external entity is required to update the state of the object when
115 An external entity is required to update the state of the object when
116 events occur.
116 events occur.
117 """
117 """
118
118
119 def __init__(self, requestid, command, fromredirect=False):
119 def __init__(self, requestid, command, fromredirect=False):
120 self.requestid = requestid
120 self.requestid = requestid
121 self.command = command
121 self.command = command
122 self.fromredirect = fromredirect
122 self.fromredirect = fromredirect
123
123
124 # Whether all remote input related to this command has been
124 # Whether all remote input related to this command has been
125 # received.
125 # received.
126 self._inputcomplete = False
126 self._inputcomplete = False
127
127
128 # We have a lock that is acquired when important object state is
128 # We have a lock that is acquired when important object state is
129 # mutated. This is to prevent race conditions between 1 thread
129 # mutated. This is to prevent race conditions between 1 thread
130 # sending us new data and another consuming it.
130 # sending us new data and another consuming it.
131 self._lock = threading.RLock()
131 self._lock = threading.RLock()
132
132
133 # An event is set when state of the object changes. This event
133 # An event is set when state of the object changes. This event
134 # is waited on by the generator emitting objects.
134 # is waited on by the generator emitting objects.
135 self._serviceable = threading.Event()
135 self._serviceable = threading.Event()
136
136
137 self._pendingevents = []
137 self._pendingevents = []
138 self._pendingerror = None
138 self._pendingerror = None
139 self._decoder = cborutil.bufferingdecoder()
139 self._decoder = cborutil.bufferingdecoder()
140 self._seeninitial = False
140 self._seeninitial = False
141 self._redirect = None
141 self._redirect = None
142
142
143 def _oninputcomplete(self):
143 def _oninputcomplete(self):
144 with self._lock:
144 with self._lock:
145 self._inputcomplete = True
145 self._inputcomplete = True
146 self._serviceable.set()
146 self._serviceable.set()
147
147
148 def _onresponsedata(self, data):
148 def _onresponsedata(self, data):
149 available, readcount, wanted = self._decoder.decode(data)
149 available, readcount, wanted = self._decoder.decode(data)
150
150
151 if not available:
151 if not available:
152 return
152 return
153
153
154 with self._lock:
154 with self._lock:
155 for o in self._decoder.getavailable():
155 for o in self._decoder.getavailable():
156 if not self._seeninitial and not self.fromredirect:
156 if not self._seeninitial and not self.fromredirect:
157 self._handleinitial(o)
157 self._handleinitial(o)
158 continue
158 continue
159
159
160 # We should never see an object after a content redirect,
160 # We should never see an object after a content redirect,
161 # as the spec says the main status object containing the
161 # as the spec says the main status object containing the
162 # content redirect is the only object in the stream. Fail
162 # content redirect is the only object in the stream. Fail
163 # if we see a misbehaving server.
163 # if we see a misbehaving server.
164 if self._redirect:
164 if self._redirect:
165 raise error.Abort(_('received unexpected response data '
165 raise error.Abort(_('received unexpected response data '
166 'after content redirect; the remote is '
166 'after content redirect; the remote is '
167 'buggy'))
167 'buggy'))
168
168
169 self._pendingevents.append(o)
169 self._pendingevents.append(o)
170
170
171 self._serviceable.set()
171 self._serviceable.set()
172
172
173 def _onerror(self, e):
173 def _onerror(self, e):
174 self._pendingerror = e
174 self._pendingerror = e
175
175
176 with self._lock:
176 with self._lock:
177 self._serviceable.set()
177 self._serviceable.set()
178
178
179 def _handleinitial(self, o):
179 def _handleinitial(self, o):
180 self._seeninitial = True
180 self._seeninitial = True
181 if o[b'status'] == b'ok':
181 if o[b'status'] == b'ok':
182 return
182 return
183
183
184 elif o[b'status'] == b'redirect':
184 elif o[b'status'] == b'redirect':
185 l = o[b'location']
185 l = o[b'location']
186 self._redirect = wireprototypes.alternatelocationresponse(
186 self._redirect = wireprototypes.alternatelocationresponse(
187 url=l[b'url'],
187 url=l[b'url'],
188 mediatype=l[b'mediatype'],
188 mediatype=l[b'mediatype'],
189 size=l.get(b'size'),
189 size=l.get(b'size'),
190 fullhashes=l.get(b'fullhashes'),
190 fullhashes=l.get(b'fullhashes'),
191 fullhashseed=l.get(b'fullhashseed'),
191 fullhashseed=l.get(b'fullhashseed'),
192 serverdercerts=l.get(b'serverdercerts'),
192 serverdercerts=l.get(b'serverdercerts'),
193 servercadercerts=l.get(b'servercadercerts'))
193 servercadercerts=l.get(b'servercadercerts'))
194 return
194 return
195
195
196 atoms = [{'msg': o[b'error'][b'message']}]
196 atoms = [{'msg': o[b'error'][b'message']}]
197 if b'args' in o[b'error']:
197 if b'args' in o[b'error']:
198 atoms[0]['args'] = o[b'error'][b'args']
198 atoms[0]['args'] = o[b'error'][b'args']
199
199
200 raise error.RepoError(formatrichmessage(atoms))
200 raise error.RepoError(formatrichmessage(atoms))
201
201
202 def objects(self):
202 def objects(self):
203 """Obtained decoded objects from this response.
203 """Obtained decoded objects from this response.
204
204
205 This is a generator of data structures that were decoded from the
205 This is a generator of data structures that were decoded from the
206 command response.
206 command response.
207
207
208 Obtaining the next member of the generator may block due to waiting
208 Obtaining the next member of the generator may block due to waiting
209 on external data to become available.
209 on external data to become available.
210
210
211 If the server encountered an error in the middle of serving the data
211 If the server encountered an error in the middle of serving the data
212 or if another error occurred, an exception may be raised when
212 or if another error occurred, an exception may be raised when
213 advancing the generator.
213 advancing the generator.
214 """
214 """
215 while True:
215 while True:
216 # TODO this can infinite loop if self._inputcomplete is never
216 # TODO this can infinite loop if self._inputcomplete is never
217 # set. We likely want to tie the lifetime of this object/state
217 # set. We likely want to tie the lifetime of this object/state
218 # to that of the background thread receiving frames and updating
218 # to that of the background thread receiving frames and updating
219 # our state.
219 # our state.
220 self._serviceable.wait(1.0)
220 self._serviceable.wait(1.0)
221
221
222 if self._pendingerror:
222 if self._pendingerror:
223 raise self._pendingerror
223 raise self._pendingerror
224
224
225 with self._lock:
225 with self._lock:
226 self._serviceable.clear()
226 self._serviceable.clear()
227
227
228 # Make copies because objects could be mutated during
228 # Make copies because objects could be mutated during
229 # iteration.
229 # iteration.
230 stop = self._inputcomplete
230 stop = self._inputcomplete
231 pending = list(self._pendingevents)
231 pending = list(self._pendingevents)
232 self._pendingevents[:] = []
232 self._pendingevents[:] = []
233
233
234 for o in pending:
234 for o in pending:
235 yield o
235 yield o
236
236
237 if stop:
237 if stop:
238 break
238 break
239
239
240 class clienthandler(object):
240 class clienthandler(object):
241 """Object to handle higher-level client activities.
241 """Object to handle higher-level client activities.
242
242
243 The ``clientreactor`` is used to hold low-level state about the frame-based
243 The ``clientreactor`` is used to hold low-level state about the frame-based
244 protocol, such as which requests and streams are active. This type is used
244 protocol, such as which requests and streams are active. This type is used
245 for higher-level operations, such as reading frames from a socket, exposing
245 for higher-level operations, such as reading frames from a socket, exposing
246 and managing a higher-level primitive for representing command responses,
246 and managing a higher-level primitive for representing command responses,
247 etc. This class is what peers should probably use to bridge wire activity
247 etc. This class is what peers should probably use to bridge wire activity
248 with the higher-level peer API.
248 with the higher-level peer API.
249 """
249 """
250
250
251 def __init__(self, ui, clientreactor, opener=None,
251 def __init__(self, ui, clientreactor, opener=None,
252 requestbuilder=util.urlreq.request):
252 requestbuilder=util.urlreq.request):
253 self._ui = ui
253 self._ui = ui
254 self._reactor = clientreactor
254 self._reactor = clientreactor
255 self._requests = {}
255 self._requests = {}
256 self._futures = {}
256 self._futures = {}
257 self._responses = {}
257 self._responses = {}
258 self._redirects = []
258 self._redirects = []
259 self._frameseof = False
259 self._frameseof = False
260 self._opener = opener or urlmod.opener(ui)
260 self._opener = opener or urlmod.opener(ui)
261 self._requestbuilder = requestbuilder
261 self._requestbuilder = requestbuilder
262
262
263 def callcommand(self, command, args, f, redirect=None):
263 def callcommand(self, command, args, f, redirect=None):
264 """Register a request to call a command.
264 """Register a request to call a command.
265
265
266 Returns an iterable of frames that should be sent over the wire.
266 Returns an iterable of frames that should be sent over the wire.
267 """
267 """
268 request, action, meta = self._reactor.callcommand(command, args,
268 request, action, meta = self._reactor.callcommand(command, args,
269 redirect=redirect)
269 redirect=redirect)
270
270
271 if action != 'noop':
271 if action != 'noop':
272 raise error.ProgrammingError('%s not yet supported' % action)
272 raise error.ProgrammingError('%s not yet supported' % action)
273
273
274 rid = request.requestid
274 rid = request.requestid
275 self._requests[rid] = request
275 self._requests[rid] = request
276 self._futures[rid] = f
276 self._futures[rid] = f
277 # TODO we need some kind of lifetime on response instances otherwise
277 # TODO we need some kind of lifetime on response instances otherwise
278 # objects() may deadlock.
278 # objects() may deadlock.
279 self._responses[rid] = commandresponse(rid, command)
279 self._responses[rid] = commandresponse(rid, command)
280
280
281 return iter(())
281 return iter(())
282
282
283 def flushcommands(self):
283 def flushcommands(self):
284 """Flush all queued commands.
284 """Flush all queued commands.
285
285
286 Returns an iterable of frames that should be sent over the wire.
286 Returns an iterable of frames that should be sent over the wire.
287 """
287 """
288 action, meta = self._reactor.flushcommands()
288 action, meta = self._reactor.flushcommands()
289
289
290 if action != 'sendframes':
290 if action != 'sendframes':
291 raise error.ProgrammingError('%s not yet supported' % action)
291 raise error.ProgrammingError('%s not yet supported' % action)
292
292
293 return meta['framegen']
293 return meta['framegen']
294
294
295 def readdata(self, framefh):
295 def readdata(self, framefh):
296 """Attempt to read data and do work.
296 """Attempt to read data and do work.
297
297
298 Returns None if no data was read. Presumably this means we're
298 Returns None if no data was read. Presumably this means we're
299 done with all read I/O.
299 done with all read I/O.
300 """
300 """
301 if not self._frameseof:
301 if not self._frameseof:
302 frame = wireprotoframing.readframe(framefh)
302 frame = wireprotoframing.readframe(framefh)
303 if frame is None:
303 if frame is None:
304 # TODO tell reactor?
304 # TODO tell reactor?
305 self._frameseof = True
305 self._frameseof = True
306 else:
306 else:
307 self._ui.note(_('received %r\n') % frame)
307 self._ui.note(_('received %r\n') % frame)
308 self._processframe(frame)
308 self._processframe(frame)
309
309
310 # Also try to read the first redirect.
310 # Also try to read the first redirect.
311 if self._redirects:
311 if self._redirects:
312 if not self._processredirect(*self._redirects[0]):
312 if not self._processredirect(*self._redirects[0]):
313 self._redirects.pop(0)
313 self._redirects.pop(0)
314
314
315 if self._frameseof and not self._redirects:
315 if self._frameseof and not self._redirects:
316 return None
316 return None
317
317
318 return True
318 return True
319
319
320 def _processframe(self, frame):
320 def _processframe(self, frame):
321 """Process a single read frame."""
321 """Process a single read frame."""
322
322
323 action, meta = self._reactor.onframerecv(frame)
323 action, meta = self._reactor.onframerecv(frame)
324
324
325 if action == 'error':
325 if action == 'error':
326 e = error.RepoError(meta['message'])
326 e = error.RepoError(meta['message'])
327
327
328 if frame.requestid in self._responses:
328 if frame.requestid in self._responses:
329 self._responses[frame.requestid]._oninputcomplete()
329 self._responses[frame.requestid]._oninputcomplete()
330
330
331 if frame.requestid in self._futures:
331 if frame.requestid in self._futures:
332 self._futures[frame.requestid].set_exception(e)
332 self._futures[frame.requestid].set_exception(e)
333 del self._futures[frame.requestid]
333 del self._futures[frame.requestid]
334 else:
334 else:
335 raise e
335 raise e
336
336
337 return
337 return
338 elif action == 'noop':
338 elif action == 'noop':
339 return
339 return
340 elif action == 'responsedata':
340 elif action == 'responsedata':
341 # Handled below.
341 # Handled below.
342 pass
342 pass
343 else:
343 else:
344 raise error.ProgrammingError('action not handled: %s' % action)
344 raise error.ProgrammingError('action not handled: %s' % action)
345
345
346 if frame.requestid not in self._requests:
346 if frame.requestid not in self._requests:
347 raise error.ProgrammingError(
347 raise error.ProgrammingError(
348 'received frame for unknown request; this is either a bug in '
348 'received frame for unknown request; this is either a bug in '
349 'the clientreactor not screening for this or this instance was '
349 'the clientreactor not screening for this or this instance was '
350 'never told about this request: %r' % frame)
350 'never told about this request: %r' % frame)
351
351
352 response = self._responses[frame.requestid]
352 response = self._responses[frame.requestid]
353
353
354 if action == 'responsedata':
354 if action == 'responsedata':
355 # Any failures processing this frame should bubble up to the
355 # Any failures processing this frame should bubble up to the
356 # future tracking the request.
356 # future tracking the request.
357 try:
357 try:
358 self._processresponsedata(frame, meta, response)
358 self._processresponsedata(frame, meta, response)
359 except BaseException as e:
359 except BaseException as e:
360 # If an exception occurs before the future is resolved,
360 # If an exception occurs before the future is resolved,
361 # fail the future. Otherwise, we stuff the exception on
361 # fail the future. Otherwise, we stuff the exception on
362 # the response object so it can be raised during objects()
362 # the response object so it can be raised during objects()
363 # iteration. If nothing is consuming objects(), we could
363 # iteration. If nothing is consuming objects(), we could
364 # silently swallow this exception. That's a risk we'll have to
364 # silently swallow this exception. That's a risk we'll have to
365 # take.
365 # take.
366 if frame.requestid in self._futures:
366 if frame.requestid in self._futures:
367 self._futures[frame.requestid].set_exception(e)
367 self._futures[frame.requestid].set_exception(e)
368 del self._futures[frame.requestid]
368 del self._futures[frame.requestid]
369 response._oninputcomplete()
369 response._oninputcomplete()
370 else:
370 else:
371 response._onerror(e)
371 response._onerror(e)
372 else:
372 else:
373 raise error.ProgrammingError(
373 raise error.ProgrammingError(
374 'unhandled action from clientreactor: %s' % action)
374 'unhandled action from clientreactor: %s' % action)
375
375
376 def _processresponsedata(self, frame, meta, response):
376 def _processresponsedata(self, frame, meta, response):
377 # This can raise. The caller can handle it.
377 # This can raise. The caller can handle it.
378 response._onresponsedata(meta['data'])
378 response._onresponsedata(meta['data'])
379
379
380 # If we got a content redirect response, we want to fetch it and
380 # If we got a content redirect response, we want to fetch it and
381 # expose the data as if we received it inline. But we also want to
381 # expose the data as if we received it inline. But we also want to
382 # keep our internal request accounting in order. Our strategy is to
382 # keep our internal request accounting in order. Our strategy is to
383 # basically put meaningful response handling on pause until EOS occurs
383 # basically put meaningful response handling on pause until EOS occurs
384 # and the stream accounting is in a good state. At that point, we follow
384 # and the stream accounting is in a good state. At that point, we follow
385 # the redirect and replace the response object with its data.
385 # the redirect and replace the response object with its data.
386
386
387 redirect = response._redirect
387 redirect = response._redirect
388 handlefuture = False if redirect else True
388 handlefuture = False if redirect else True
389
389
390 if meta['eos']:
390 if meta['eos']:
391 response._oninputcomplete()
391 response._oninputcomplete()
392 del self._requests[frame.requestid]
392 del self._requests[frame.requestid]
393
393
394 if redirect:
394 if redirect:
395 self._followredirect(frame.requestid, redirect)
395 self._followredirect(frame.requestid, redirect)
396 return
396 return
397
397
398 if not handlefuture:
398 if not handlefuture:
399 return
399 return
400
400
401 # If the command has a decoder, we wait until all input has been
401 # If the command has a decoder, we wait until all input has been
402 # received before resolving the future. Otherwise we resolve the
402 # received before resolving the future. Otherwise we resolve the
403 # future immediately.
403 # future immediately.
404 if frame.requestid not in self._futures:
404 if frame.requestid not in self._futures:
405 return
405 return
406
406
407 if response.command not in COMMAND_DECODERS:
407 if response.command not in COMMAND_DECODERS:
408 self._futures[frame.requestid].set_result(response.objects())
408 self._futures[frame.requestid].set_result(response.objects())
409 del self._futures[frame.requestid]
409 del self._futures[frame.requestid]
410 elif response._inputcomplete:
410 elif response._inputcomplete:
411 decoded = COMMAND_DECODERS[response.command](response.objects())
411 decoded = COMMAND_DECODERS[response.command](response.objects())
412 self._futures[frame.requestid].set_result(decoded)
412 self._futures[frame.requestid].set_result(decoded)
413 del self._futures[frame.requestid]
413 del self._futures[frame.requestid]
414
414
415 def _followredirect(self, requestid, redirect):
415 def _followredirect(self, requestid, redirect):
416 """Called to initiate redirect following for a request."""
416 """Called to initiate redirect following for a request."""
417 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
417 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
418
418
419 # TODO handle framed responses.
419 # TODO handle framed responses.
420 if redirect.mediatype != b'application/mercurial-cbor':
420 if redirect.mediatype != b'application/mercurial-cbor':
421 raise error.Abort(_('cannot handle redirects for the %s media type')
421 raise error.Abort(_('cannot handle redirects for the %s media type')
422 % redirect.mediatype)
422 % redirect.mediatype)
423
423
424 if redirect.fullhashes:
424 if redirect.fullhashes:
425 self._ui.warn(_('(support for validating hashes on content '
425 self._ui.warn(_('(support for validating hashes on content '
426 'redirects not supported)\n'))
426 'redirects not supported)\n'))
427
427
428 if redirect.serverdercerts or redirect.servercadercerts:
428 if redirect.serverdercerts or redirect.servercadercerts:
429 self._ui.warn(_('(support for pinning server certificates on '
429 self._ui.warn(_('(support for pinning server certificates on '
430 'content redirects not supported)\n'))
430 'content redirects not supported)\n'))
431
431
432 headers = {
432 headers = {
433 r'Accept': redirect.mediatype,
433 r'Accept': redirect.mediatype,
434 }
434 }
435
435
436 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
436 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
437
437
438 try:
438 try:
439 res = self._opener.open(req)
439 res = self._opener.open(req)
440 except util.urlerr.httperror as e:
440 except util.urlerr.httperror as e:
441 if e.code == 401:
441 if e.code == 401:
442 raise error.Abort(_('authorization failed'))
442 raise error.Abort(_('authorization failed'))
443 raise
443 raise
444 except util.httplib.HTTPException as e:
444 except util.httplib.HTTPException as e:
445 self._ui.debug('http error requesting %s\n' % req.get_full_url())
445 self._ui.debug('http error requesting %s\n' % req.get_full_url())
446 self._ui.traceback()
446 self._ui.traceback()
447 raise IOError(None, e)
447 raise IOError(None, e)
448
448
449 urlmod.wrapresponse(res)
449 urlmod.wrapresponse(res)
450
450
451 # The existing response object is associated with frame data. Rather
451 # The existing response object is associated with frame data. Rather
452 # than try to normalize its state, just create a new object.
452 # than try to normalize its state, just create a new object.
453 oldresponse = self._responses[requestid]
453 oldresponse = self._responses[requestid]
454 self._responses[requestid] = commandresponse(requestid,
454 self._responses[requestid] = commandresponse(requestid,
455 oldresponse.command,
455 oldresponse.command,
456 fromredirect=True)
456 fromredirect=True)
457
457
458 self._redirects.append((requestid, res))
458 self._redirects.append((requestid, res))
459
459
460 def _processredirect(self, rid, res):
460 def _processredirect(self, rid, res):
461 """Called to continue processing a response from a redirect."""
461 """Called to continue processing a response from a redirect.
462
463 Returns a bool indicating if the redirect is still serviceable.
464 """
462 response = self._responses[rid]
465 response = self._responses[rid]
463
466
464 try:
467 try:
465 data = res.read(32768)
468 data = res.read(32768)
466 response._onresponsedata(data)
469 response._onresponsedata(data)
467
470
468 # We're at end of stream.
471 # We're at end of stream.
469 if not data:
472 if not data:
470 response._oninputcomplete()
473 response._oninputcomplete()
471
474
472 if rid not in self._futures:
475 if rid not in self._futures:
473 return
476 return bool(data)
474
477
475 if response.command not in COMMAND_DECODERS:
478 if response.command not in COMMAND_DECODERS:
476 self._futures[rid].set_result(response.objects())
479 self._futures[rid].set_result(response.objects())
477 del self._futures[rid]
480 del self._futures[rid]
478 elif response._inputcomplete:
481 elif response._inputcomplete:
479 decoded = COMMAND_DECODERS[response.command](response.objects())
482 decoded = COMMAND_DECODERS[response.command](response.objects())
480 self._futures[rid].set_result(decoded)
483 self._futures[rid].set_result(decoded)
481 del self._futures[rid]
484 del self._futures[rid]
482
485
483 return bool(data)
486 return bool(data)
484
487
485 except BaseException as e:
488 except BaseException as e:
486 self._futures[rid].set_exception(e)
489 self._futures[rid].set_exception(e)
487 del self._futures[rid]
490 del self._futures[rid]
488 response._oninputcomplete()
491 response._oninputcomplete()
489 return False
492 return False
490
493
491 def decodebranchmap(objs):
494 def decodebranchmap(objs):
492 # Response should be a single CBOR map of branch name to array of nodes.
495 # Response should be a single CBOR map of branch name to array of nodes.
493 bm = next(objs)
496 bm = next(objs)
494
497
495 return {encoding.tolocal(k): v for k, v in bm.items()}
498 return {encoding.tolocal(k): v for k, v in bm.items()}
496
499
497 def decodeheads(objs):
500 def decodeheads(objs):
498 # Array of node bytestrings.
501 # Array of node bytestrings.
499 return next(objs)
502 return next(objs)
500
503
501 def decodeknown(objs):
504 def decodeknown(objs):
502 # Bytestring where each byte is a 0 or 1.
505 # Bytestring where each byte is a 0 or 1.
503 raw = next(objs)
506 raw = next(objs)
504
507
505 return [True if c == '1' else False for c in raw]
508 return [True if c == '1' else False for c in raw]
506
509
507 def decodelistkeys(objs):
510 def decodelistkeys(objs):
508 # Map with bytestring keys and values.
511 # Map with bytestring keys and values.
509 return next(objs)
512 return next(objs)
510
513
511 def decodelookup(objs):
514 def decodelookup(objs):
512 return next(objs)
515 return next(objs)
513
516
514 def decodepushkey(objs):
517 def decodepushkey(objs):
515 return next(objs)
518 return next(objs)
516
519
517 COMMAND_DECODERS = {
520 COMMAND_DECODERS = {
518 'branchmap': decodebranchmap,
521 'branchmap': decodebranchmap,
519 'heads': decodeheads,
522 'heads': decodeheads,
520 'known': decodeknown,
523 'known': decodeknown,
521 'listkeys': decodelistkeys,
524 'listkeys': decodelistkeys,
522 'lookup': decodelookup,
525 'lookup': decodelookup,
523 'pushkey': decodepushkey,
526 'pushkey': decodepushkey,
524 }
527 }
General Comments 0
You need to be logged in to leave comments. Login now