##// END OF EJS Templates
wireprotov2: raise exception in objects() if future has been resolved...
Gregory Szorc -
r40172:ed4ebbb9 default
parent child Browse files
Show More
@@ -1,500 +1,519
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import threading
10 import threading
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 encoding,
14 encoding,
15 error,
15 error,
16 pycompat,
16 pycompat,
17 sslutil,
17 sslutil,
18 url as urlmod,
18 url as urlmod,
19 util,
19 util,
20 wireprotoframing,
20 wireprotoframing,
21 wireprototypes,
21 wireprototypes,
22 )
22 )
23 from .utils import (
23 from .utils import (
24 cborutil,
24 cborutil,
25 )
25 )
26
26
27 def formatrichmessage(atoms):
27 def formatrichmessage(atoms):
28 """Format an encoded message from the framing protocol."""
28 """Format an encoded message from the framing protocol."""
29
29
30 chunks = []
30 chunks = []
31
31
32 for atom in atoms:
32 for atom in atoms:
33 msg = _(atom[b'msg'])
33 msg = _(atom[b'msg'])
34
34
35 if b'args' in atom:
35 if b'args' in atom:
36 msg = msg % tuple(atom[b'args'])
36 msg = msg % tuple(atom[b'args'])
37
37
38 chunks.append(msg)
38 chunks.append(msg)
39
39
40 return b''.join(chunks)
40 return b''.join(chunks)
41
41
42 SUPPORTED_REDIRECT_PROTOCOLS = {
42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 b'http',
43 b'http',
44 b'https',
44 b'https',
45 }
45 }
46
46
47 SUPPORTED_CONTENT_HASHES = {
47 SUPPORTED_CONTENT_HASHES = {
48 b'sha1',
48 b'sha1',
49 b'sha256',
49 b'sha256',
50 }
50 }
51
51
52 def redirecttargetsupported(ui, target):
52 def redirecttargetsupported(ui, target):
53 """Determine whether a redirect target entry is supported.
53 """Determine whether a redirect target entry is supported.
54
54
55 ``target`` should come from the capabilities data structure emitted by
55 ``target`` should come from the capabilities data structure emitted by
56 the server.
56 the server.
57 """
57 """
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 % (target[b'name'], target.get(b'protocol', b'')))
60 % (target[b'name'], target.get(b'protocol', b'')))
61 return False
61 return False
62
62
63 if target.get(b'snirequired') and not sslutil.hassni:
63 if target.get(b'snirequired') and not sslutil.hassni:
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 target[b'name'])
65 target[b'name'])
66 return False
66 return False
67
67
68 if b'tlsversions' in target:
68 if b'tlsversions' in target:
69 tlsversions = set(target[b'tlsversions'])
69 tlsversions = set(target[b'tlsversions'])
70 supported = set()
70 supported = set()
71
71
72 for v in sslutil.supportedprotocols:
72 for v in sslutil.supportedprotocols:
73 assert v.startswith(b'tls')
73 assert v.startswith(b'tls')
74 supported.add(v[3:])
74 supported.add(v[3:])
75
75
76 if not tlsversions & supported:
76 if not tlsversions & supported:
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 'versions: %s)\n') % (
78 'versions: %s)\n') % (
79 target[b'name'], b', '.join(sorted(tlsversions))))
79 target[b'name'], b', '.join(sorted(tlsversions))))
80 return False
80 return False
81
81
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83
83
84 return True
84 return True
85
85
86 def supportedredirects(ui, apidescriptor):
86 def supportedredirects(ui, apidescriptor):
87 """Resolve the "redirect" command request key given an API descriptor.
87 """Resolve the "redirect" command request key given an API descriptor.
88
88
89 Given an API descriptor returned by the server, returns a data structure
89 Given an API descriptor returned by the server, returns a data structure
90 that can be used in hte "redirect" field of command requests to advertise
90 that can be used in hte "redirect" field of command requests to advertise
91 support for compatible redirect targets.
91 support for compatible redirect targets.
92
92
93 Returns None if no redirect targets are remotely advertised or if none are
93 Returns None if no redirect targets are remotely advertised or if none are
94 supported.
94 supported.
95 """
95 """
96 if not apidescriptor or b'redirect' not in apidescriptor:
96 if not apidescriptor or b'redirect' not in apidescriptor:
97 return None
97 return None
98
98
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 if redirecttargetsupported(ui, t)]
100 if redirecttargetsupported(ui, t)]
101
101
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 if h in SUPPORTED_CONTENT_HASHES]
103 if h in SUPPORTED_CONTENT_HASHES]
104
104
105 return {
105 return {
106 b'targets': targets,
106 b'targets': targets,
107 b'hashes': hashes,
107 b'hashes': hashes,
108 }
108 }
109
109
110 class commandresponse(object):
110 class commandresponse(object):
111 """Represents the response to a command request.
111 """Represents the response to a command request.
112
112
113 Instances track the state of the command and hold its results.
113 Instances track the state of the command and hold its results.
114
114
115 An external entity is required to update the state of the object when
115 An external entity is required to update the state of the object when
116 events occur.
116 events occur.
117 """
117 """
118
118
119 def __init__(self, requestid, command, fromredirect=False):
119 def __init__(self, requestid, command, fromredirect=False):
120 self.requestid = requestid
120 self.requestid = requestid
121 self.command = command
121 self.command = command
122 self.fromredirect = fromredirect
122 self.fromredirect = fromredirect
123
123
124 # Whether all remote input related to this command has been
124 # Whether all remote input related to this command has been
125 # received.
125 # received.
126 self._inputcomplete = False
126 self._inputcomplete = False
127
127
128 # We have a lock that is acquired when important object state is
128 # We have a lock that is acquired when important object state is
129 # mutated. This is to prevent race conditions between 1 thread
129 # mutated. This is to prevent race conditions between 1 thread
130 # sending us new data and another consuming it.
130 # sending us new data and another consuming it.
131 self._lock = threading.RLock()
131 self._lock = threading.RLock()
132
132
133 # An event is set when state of the object changes. This event
133 # An event is set when state of the object changes. This event
134 # is waited on by the generator emitting objects.
134 # is waited on by the generator emitting objects.
135 self._serviceable = threading.Event()
135 self._serviceable = threading.Event()
136
136
137 self._pendingevents = []
137 self._pendingevents = []
138 self._pendingerror = None
138 self._decoder = cborutil.bufferingdecoder()
139 self._decoder = cborutil.bufferingdecoder()
139 self._seeninitial = False
140 self._seeninitial = False
140 self._redirect = None
141 self._redirect = None
141
142
142 def _oninputcomplete(self):
143 def _oninputcomplete(self):
143 with self._lock:
144 with self._lock:
144 self._inputcomplete = True
145 self._inputcomplete = True
145 self._serviceable.set()
146 self._serviceable.set()
146
147
147 def _onresponsedata(self, data):
148 def _onresponsedata(self, data):
148 available, readcount, wanted = self._decoder.decode(data)
149 available, readcount, wanted = self._decoder.decode(data)
149
150
150 if not available:
151 if not available:
151 return
152 return
152
153
153 with self._lock:
154 with self._lock:
154 for o in self._decoder.getavailable():
155 for o in self._decoder.getavailable():
155 if not self._seeninitial and not self.fromredirect:
156 if not self._seeninitial and not self.fromredirect:
156 self._handleinitial(o)
157 self._handleinitial(o)
157 continue
158 continue
158
159
159 # We should never see an object after a content redirect,
160 # We should never see an object after a content redirect,
160 # as the spec says the main status object containing the
161 # as the spec says the main status object containing the
161 # content redirect is the only object in the stream. Fail
162 # content redirect is the only object in the stream. Fail
162 # if we see a misbehaving server.
163 # if we see a misbehaving server.
163 if self._redirect:
164 if self._redirect:
164 raise error.Abort(_('received unexpected response data '
165 raise error.Abort(_('received unexpected response data '
165 'after content redirect; the remote is '
166 'after content redirect; the remote is '
166 'buggy'))
167 'buggy'))
167
168
168 self._pendingevents.append(o)
169 self._pendingevents.append(o)
169
170
170 self._serviceable.set()
171 self._serviceable.set()
171
172
173 def _onerror(self, e):
174 self._pendingerror = e
175
176 with self._lock:
177 self._serviceable.set()
178
172 def _handleinitial(self, o):
179 def _handleinitial(self, o):
173 self._seeninitial = True
180 self._seeninitial = True
174 if o[b'status'] == b'ok':
181 if o[b'status'] == b'ok':
175 return
182 return
176
183
177 elif o[b'status'] == b'redirect':
184 elif o[b'status'] == b'redirect':
178 l = o[b'location']
185 l = o[b'location']
179 self._redirect = wireprototypes.alternatelocationresponse(
186 self._redirect = wireprototypes.alternatelocationresponse(
180 url=l[b'url'],
187 url=l[b'url'],
181 mediatype=l[b'mediatype'],
188 mediatype=l[b'mediatype'],
182 size=l.get(b'size'),
189 size=l.get(b'size'),
183 fullhashes=l.get(b'fullhashes'),
190 fullhashes=l.get(b'fullhashes'),
184 fullhashseed=l.get(b'fullhashseed'),
191 fullhashseed=l.get(b'fullhashseed'),
185 serverdercerts=l.get(b'serverdercerts'),
192 serverdercerts=l.get(b'serverdercerts'),
186 servercadercerts=l.get(b'servercadercerts'))
193 servercadercerts=l.get(b'servercadercerts'))
187 return
194 return
188
195
189 atoms = [{'msg': o[b'error'][b'message']}]
196 atoms = [{'msg': o[b'error'][b'message']}]
190 if b'args' in o[b'error']:
197 if b'args' in o[b'error']:
191 atoms[0]['args'] = o[b'error'][b'args']
198 atoms[0]['args'] = o[b'error'][b'args']
192
199
193 raise error.RepoError(formatrichmessage(atoms))
200 raise error.RepoError(formatrichmessage(atoms))
194
201
195 def objects(self):
202 def objects(self):
196 """Obtained decoded objects from this response.
203 """Obtained decoded objects from this response.
197
204
198 This is a generator of data structures that were decoded from the
205 This is a generator of data structures that were decoded from the
199 command response.
206 command response.
200
207
201 Obtaining the next member of the generator may block due to waiting
208 Obtaining the next member of the generator may block due to waiting
202 on external data to become available.
209 on external data to become available.
203
210
204 If the server encountered an error in the middle of serving the data
211 If the server encountered an error in the middle of serving the data
205 or if another error occurred, an exception may be raised when
212 or if another error occurred, an exception may be raised when
206 advancing the generator.
213 advancing the generator.
207 """
214 """
208 while True:
215 while True:
209 # TODO this can infinite loop if self._inputcomplete is never
216 # TODO this can infinite loop if self._inputcomplete is never
210 # set. We likely want to tie the lifetime of this object/state
217 # set. We likely want to tie the lifetime of this object/state
211 # to that of the background thread receiving frames and updating
218 # to that of the background thread receiving frames and updating
212 # our state.
219 # our state.
213 self._serviceable.wait(1.0)
220 self._serviceable.wait(1.0)
214
221
222 if self._pendingerror:
223 raise self._pendingerror
224
215 with self._lock:
225 with self._lock:
216 self._serviceable.clear()
226 self._serviceable.clear()
217
227
218 # Make copies because objects could be mutated during
228 # Make copies because objects could be mutated during
219 # iteration.
229 # iteration.
220 stop = self._inputcomplete
230 stop = self._inputcomplete
221 pending = list(self._pendingevents)
231 pending = list(self._pendingevents)
222 self._pendingevents[:] = []
232 self._pendingevents[:] = []
223
233
224 for o in pending:
234 for o in pending:
225 yield o
235 yield o
226
236
227 if stop:
237 if stop:
228 break
238 break
229
239
230 class clienthandler(object):
240 class clienthandler(object):
231 """Object to handle higher-level client activities.
241 """Object to handle higher-level client activities.
232
242
233 The ``clientreactor`` is used to hold low-level state about the frame-based
243 The ``clientreactor`` is used to hold low-level state about the frame-based
234 protocol, such as which requests and streams are active. This type is used
244 protocol, such as which requests and streams are active. This type is used
235 for higher-level operations, such as reading frames from a socket, exposing
245 for higher-level operations, such as reading frames from a socket, exposing
236 and managing a higher-level primitive for representing command responses,
246 and managing a higher-level primitive for representing command responses,
237 etc. This class is what peers should probably use to bridge wire activity
247 etc. This class is what peers should probably use to bridge wire activity
238 with the higher-level peer API.
248 with the higher-level peer API.
239 """
249 """
240
250
241 def __init__(self, ui, clientreactor, opener=None,
251 def __init__(self, ui, clientreactor, opener=None,
242 requestbuilder=util.urlreq.request):
252 requestbuilder=util.urlreq.request):
243 self._ui = ui
253 self._ui = ui
244 self._reactor = clientreactor
254 self._reactor = clientreactor
245 self._requests = {}
255 self._requests = {}
246 self._futures = {}
256 self._futures = {}
247 self._responses = {}
257 self._responses = {}
248 self._redirects = []
258 self._redirects = []
249 self._frameseof = False
259 self._frameseof = False
250 self._opener = opener or urlmod.opener(ui)
260 self._opener = opener or urlmod.opener(ui)
251 self._requestbuilder = requestbuilder
261 self._requestbuilder = requestbuilder
252
262
253 def callcommand(self, command, args, f, redirect=None):
263 def callcommand(self, command, args, f, redirect=None):
254 """Register a request to call a command.
264 """Register a request to call a command.
255
265
256 Returns an iterable of frames that should be sent over the wire.
266 Returns an iterable of frames that should be sent over the wire.
257 """
267 """
258 request, action, meta = self._reactor.callcommand(command, args,
268 request, action, meta = self._reactor.callcommand(command, args,
259 redirect=redirect)
269 redirect=redirect)
260
270
261 if action != 'noop':
271 if action != 'noop':
262 raise error.ProgrammingError('%s not yet supported' % action)
272 raise error.ProgrammingError('%s not yet supported' % action)
263
273
264 rid = request.requestid
274 rid = request.requestid
265 self._requests[rid] = request
275 self._requests[rid] = request
266 self._futures[rid] = f
276 self._futures[rid] = f
267 # TODO we need some kind of lifetime on response instances otherwise
277 # TODO we need some kind of lifetime on response instances otherwise
268 # objects() may deadlock.
278 # objects() may deadlock.
269 self._responses[rid] = commandresponse(rid, command)
279 self._responses[rid] = commandresponse(rid, command)
270
280
271 return iter(())
281 return iter(())
272
282
273 def flushcommands(self):
283 def flushcommands(self):
274 """Flush all queued commands.
284 """Flush all queued commands.
275
285
276 Returns an iterable of frames that should be sent over the wire.
286 Returns an iterable of frames that should be sent over the wire.
277 """
287 """
278 action, meta = self._reactor.flushcommands()
288 action, meta = self._reactor.flushcommands()
279
289
280 if action != 'sendframes':
290 if action != 'sendframes':
281 raise error.ProgrammingError('%s not yet supported' % action)
291 raise error.ProgrammingError('%s not yet supported' % action)
282
292
283 return meta['framegen']
293 return meta['framegen']
284
294
285 def readdata(self, framefh):
295 def readdata(self, framefh):
286 """Attempt to read data and do work.
296 """Attempt to read data and do work.
287
297
288 Returns None if no data was read. Presumably this means we're
298 Returns None if no data was read. Presumably this means we're
289 done with all read I/O.
299 done with all read I/O.
290 """
300 """
291 if not self._frameseof:
301 if not self._frameseof:
292 frame = wireprotoframing.readframe(framefh)
302 frame = wireprotoframing.readframe(framefh)
293 if frame is None:
303 if frame is None:
294 # TODO tell reactor?
304 # TODO tell reactor?
295 self._frameseof = True
305 self._frameseof = True
296 else:
306 else:
297 self._ui.note(_('received %r\n') % frame)
307 self._ui.note(_('received %r\n') % frame)
298 self._processframe(frame)
308 self._processframe(frame)
299
309
300 # Also try to read the first redirect.
310 # Also try to read the first redirect.
301 if self._redirects:
311 if self._redirects:
302 if not self._processredirect(*self._redirects[0]):
312 if not self._processredirect(*self._redirects[0]):
303 self._redirects.pop(0)
313 self._redirects.pop(0)
304
314
305 if self._frameseof and not self._redirects:
315 if self._frameseof and not self._redirects:
306 return None
316 return None
307
317
308 return True
318 return True
309
319
310 def _processframe(self, frame):
320 def _processframe(self, frame):
311 """Process a single read frame."""
321 """Process a single read frame."""
312
322
313 action, meta = self._reactor.onframerecv(frame)
323 action, meta = self._reactor.onframerecv(frame)
314
324
315 if action == 'error':
325 if action == 'error':
316 e = error.RepoError(meta['message'])
326 e = error.RepoError(meta['message'])
317
327
318 if frame.requestid in self._responses:
328 if frame.requestid in self._responses:
319 self._responses[frame.requestid]._oninputcomplete()
329 self._responses[frame.requestid]._oninputcomplete()
320
330
321 if frame.requestid in self._futures:
331 if frame.requestid in self._futures:
322 self._futures[frame.requestid].set_exception(e)
332 self._futures[frame.requestid].set_exception(e)
323 del self._futures[frame.requestid]
333 del self._futures[frame.requestid]
324 else:
334 else:
325 raise e
335 raise e
326
336
327 return
337 return
328 elif action == 'noop':
338 elif action == 'noop':
329 return
339 return
330
340
331 if frame.requestid not in self._requests:
341 if frame.requestid not in self._requests:
332 raise error.ProgrammingError(
342 raise error.ProgrammingError(
333 'received frame for unknown request; this is either a bug in '
343 'received frame for unknown request; this is either a bug in '
334 'the clientreactor not screening for this or this instance was '
344 'the clientreactor not screening for this or this instance was '
335 'never told about this request: %r' % frame)
345 'never told about this request: %r' % frame)
336
346
337 response = self._responses[frame.requestid]
347 response = self._responses[frame.requestid]
338
348
339 if action == 'responsedata':
349 if action == 'responsedata':
340 # Any failures processing this frame should bubble up to the
350 # Any failures processing this frame should bubble up to the
341 # future tracking the request.
351 # future tracking the request.
342 try:
352 try:
343 self._processresponsedata(frame, meta, response)
353 self._processresponsedata(frame, meta, response)
344 except BaseException as e:
354 except BaseException as e:
345 self._futures[frame.requestid].set_exception(e)
355 # If an exception occurs before the future is resolved,
346 del self._futures[frame.requestid]
356 # fail the future. Otherwise, we stuff the exception on
347 response._oninputcomplete()
357 # the response object so it can be raised during objects()
358 # iteration. If nothing is consuming objects(), we could
359 # silently swallow this exception. That's a risk we'll have to
360 # take.
361 if frame.requestid in self._futures:
362 self._futures[frame.requestid].set_exception(e)
363 del self._futures[frame.requestid]
364 response._oninputcomplete()
365 else:
366 response._onerror(e)
348 else:
367 else:
349 raise error.ProgrammingError(
368 raise error.ProgrammingError(
350 'unhandled action from clientreactor: %s' % action)
369 'unhandled action from clientreactor: %s' % action)
351
370
352 def _processresponsedata(self, frame, meta, response):
371 def _processresponsedata(self, frame, meta, response):
353 # This can raise. The caller can handle it.
372 # This can raise. The caller can handle it.
354 response._onresponsedata(meta['data'])
373 response._onresponsedata(meta['data'])
355
374
356 # If we got a content redirect response, we want to fetch it and
375 # If we got a content redirect response, we want to fetch it and
357 # expose the data as if we received it inline. But we also want to
376 # expose the data as if we received it inline. But we also want to
358 # keep our internal request accounting in order. Our strategy is to
377 # keep our internal request accounting in order. Our strategy is to
359 # basically put meaningful response handling on pause until EOS occurs
378 # basically put meaningful response handling on pause until EOS occurs
360 # and the stream accounting is in a good state. At that point, we follow
379 # and the stream accounting is in a good state. At that point, we follow
361 # the redirect and replace the response object with its data.
380 # the redirect and replace the response object with its data.
362
381
363 redirect = response._redirect
382 redirect = response._redirect
364 handlefuture = False if redirect else True
383 handlefuture = False if redirect else True
365
384
366 if meta['eos']:
385 if meta['eos']:
367 response._oninputcomplete()
386 response._oninputcomplete()
368 del self._requests[frame.requestid]
387 del self._requests[frame.requestid]
369
388
370 if redirect:
389 if redirect:
371 self._followredirect(frame.requestid, redirect)
390 self._followredirect(frame.requestid, redirect)
372 return
391 return
373
392
374 if not handlefuture:
393 if not handlefuture:
375 return
394 return
376
395
377 # If the command has a decoder, we wait until all input has been
396 # If the command has a decoder, we wait until all input has been
378 # received before resolving the future. Otherwise we resolve the
397 # received before resolving the future. Otherwise we resolve the
379 # future immediately.
398 # future immediately.
380 if frame.requestid not in self._futures:
399 if frame.requestid not in self._futures:
381 return
400 return
382
401
383 if response.command not in COMMAND_DECODERS:
402 if response.command not in COMMAND_DECODERS:
384 self._futures[frame.requestid].set_result(response.objects())
403 self._futures[frame.requestid].set_result(response.objects())
385 del self._futures[frame.requestid]
404 del self._futures[frame.requestid]
386 elif response._inputcomplete:
405 elif response._inputcomplete:
387 decoded = COMMAND_DECODERS[response.command](response.objects())
406 decoded = COMMAND_DECODERS[response.command](response.objects())
388 self._futures[frame.requestid].set_result(decoded)
407 self._futures[frame.requestid].set_result(decoded)
389 del self._futures[frame.requestid]
408 del self._futures[frame.requestid]
390
409
391 def _followredirect(self, requestid, redirect):
410 def _followredirect(self, requestid, redirect):
392 """Called to initiate redirect following for a request."""
411 """Called to initiate redirect following for a request."""
393 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
412 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
394
413
395 # TODO handle framed responses.
414 # TODO handle framed responses.
396 if redirect.mediatype != b'application/mercurial-cbor':
415 if redirect.mediatype != b'application/mercurial-cbor':
397 raise error.Abort(_('cannot handle redirects for the %s media type')
416 raise error.Abort(_('cannot handle redirects for the %s media type')
398 % redirect.mediatype)
417 % redirect.mediatype)
399
418
400 if redirect.fullhashes:
419 if redirect.fullhashes:
401 self._ui.warn(_('(support for validating hashes on content '
420 self._ui.warn(_('(support for validating hashes on content '
402 'redirects not supported)\n'))
421 'redirects not supported)\n'))
403
422
404 if redirect.serverdercerts or redirect.servercadercerts:
423 if redirect.serverdercerts or redirect.servercadercerts:
405 self._ui.warn(_('(support for pinning server certificates on '
424 self._ui.warn(_('(support for pinning server certificates on '
406 'content redirects not supported)\n'))
425 'content redirects not supported)\n'))
407
426
408 headers = {
427 headers = {
409 r'Accept': redirect.mediatype,
428 r'Accept': redirect.mediatype,
410 }
429 }
411
430
412 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
431 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
413
432
414 try:
433 try:
415 res = self._opener.open(req)
434 res = self._opener.open(req)
416 except util.urlerr.httperror as e:
435 except util.urlerr.httperror as e:
417 if e.code == 401:
436 if e.code == 401:
418 raise error.Abort(_('authorization failed'))
437 raise error.Abort(_('authorization failed'))
419 raise
438 raise
420 except util.httplib.HTTPException as e:
439 except util.httplib.HTTPException as e:
421 self._ui.debug('http error requesting %s\n' % req.get_full_url())
440 self._ui.debug('http error requesting %s\n' % req.get_full_url())
422 self._ui.traceback()
441 self._ui.traceback()
423 raise IOError(None, e)
442 raise IOError(None, e)
424
443
425 urlmod.wrapresponse(res)
444 urlmod.wrapresponse(res)
426
445
427 # The existing response object is associated with frame data. Rather
446 # The existing response object is associated with frame data. Rather
428 # than try to normalize its state, just create a new object.
447 # than try to normalize its state, just create a new object.
429 oldresponse = self._responses[requestid]
448 oldresponse = self._responses[requestid]
430 self._responses[requestid] = commandresponse(requestid,
449 self._responses[requestid] = commandresponse(requestid,
431 oldresponse.command,
450 oldresponse.command,
432 fromredirect=True)
451 fromredirect=True)
433
452
434 self._redirects.append((requestid, res))
453 self._redirects.append((requestid, res))
435
454
436 def _processredirect(self, rid, res):
455 def _processredirect(self, rid, res):
437 """Called to continue processing a response from a redirect."""
456 """Called to continue processing a response from a redirect."""
438 response = self._responses[rid]
457 response = self._responses[rid]
439
458
440 try:
459 try:
441 data = res.read(32768)
460 data = res.read(32768)
442 response._onresponsedata(data)
461 response._onresponsedata(data)
443
462
444 # We're at end of stream.
463 # We're at end of stream.
445 if not data:
464 if not data:
446 response._oninputcomplete()
465 response._oninputcomplete()
447
466
448 if rid not in self._futures:
467 if rid not in self._futures:
449 return
468 return
450
469
451 if response.command not in COMMAND_DECODERS:
470 if response.command not in COMMAND_DECODERS:
452 self._futures[rid].set_result(response.objects())
471 self._futures[rid].set_result(response.objects())
453 del self._futures[rid]
472 del self._futures[rid]
454 elif response._inputcomplete:
473 elif response._inputcomplete:
455 decoded = COMMAND_DECODERS[response.command](response.objects())
474 decoded = COMMAND_DECODERS[response.command](response.objects())
456 self._futures[rid].set_result(decoded)
475 self._futures[rid].set_result(decoded)
457 del self._futures[rid]
476 del self._futures[rid]
458
477
459 return bool(data)
478 return bool(data)
460
479
461 except BaseException as e:
480 except BaseException as e:
462 self._futures[rid].set_exception(e)
481 self._futures[rid].set_exception(e)
463 del self._futures[rid]
482 del self._futures[rid]
464 response._oninputcomplete()
483 response._oninputcomplete()
465 return False
484 return False
466
485
467 def decodebranchmap(objs):
486 def decodebranchmap(objs):
468 # Response should be a single CBOR map of branch name to array of nodes.
487 # Response should be a single CBOR map of branch name to array of nodes.
469 bm = next(objs)
488 bm = next(objs)
470
489
471 return {encoding.tolocal(k): v for k, v in bm.items()}
490 return {encoding.tolocal(k): v for k, v in bm.items()}
472
491
473 def decodeheads(objs):
492 def decodeheads(objs):
474 # Array of node bytestrings.
493 # Array of node bytestrings.
475 return next(objs)
494 return next(objs)
476
495
477 def decodeknown(objs):
496 def decodeknown(objs):
478 # Bytestring where each byte is a 0 or 1.
497 # Bytestring where each byte is a 0 or 1.
479 raw = next(objs)
498 raw = next(objs)
480
499
481 return [True if c == '1' else False for c in raw]
500 return [True if c == '1' else False for c in raw]
482
501
483 def decodelistkeys(objs):
502 def decodelistkeys(objs):
484 # Map with bytestring keys and values.
503 # Map with bytestring keys and values.
485 return next(objs)
504 return next(objs)
486
505
487 def decodelookup(objs):
506 def decodelookup(objs):
488 return next(objs)
507 return next(objs)
489
508
490 def decodepushkey(objs):
509 def decodepushkey(objs):
491 return next(objs)
510 return next(objs)
492
511
493 COMMAND_DECODERS = {
512 COMMAND_DECODERS = {
494 'branchmap': decodebranchmap,
513 'branchmap': decodebranchmap,
495 'heads': decodeheads,
514 'heads': decodeheads,
496 'known': decodeknown,
515 'known': decodeknown,
497 'listkeys': decodelistkeys,
516 'listkeys': decodelistkeys,
498 'lookup': decodelookup,
517 'lookup': decodelookup,
499 'pushkey': decodepushkey,
518 'pushkey': decodepushkey,
500 }
519 }
General Comments 0
You need to be logged in to leave comments. Login now