##// END OF EJS Templates
wireprotov2: handle noop action...
Gregory Szorc -
r40169:cfeba1aa default
parent child Browse files
Show More
@@ -1,498 +1,500 b''
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import threading
10 import threading
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 encoding,
14 encoding,
15 error,
15 error,
16 pycompat,
16 pycompat,
17 sslutil,
17 sslutil,
18 url as urlmod,
18 url as urlmod,
19 util,
19 util,
20 wireprotoframing,
20 wireprotoframing,
21 wireprototypes,
21 wireprototypes,
22 )
22 )
23 from .utils import (
23 from .utils import (
24 cborutil,
24 cborutil,
25 )
25 )
26
26
27 def formatrichmessage(atoms):
27 def formatrichmessage(atoms):
28 """Format an encoded message from the framing protocol."""
28 """Format an encoded message from the framing protocol."""
29
29
30 chunks = []
30 chunks = []
31
31
32 for atom in atoms:
32 for atom in atoms:
33 msg = _(atom[b'msg'])
33 msg = _(atom[b'msg'])
34
34
35 if b'args' in atom:
35 if b'args' in atom:
36 msg = msg % tuple(atom[b'args'])
36 msg = msg % tuple(atom[b'args'])
37
37
38 chunks.append(msg)
38 chunks.append(msg)
39
39
40 return b''.join(chunks)
40 return b''.join(chunks)
41
41
42 SUPPORTED_REDIRECT_PROTOCOLS = {
42 SUPPORTED_REDIRECT_PROTOCOLS = {
43 b'http',
43 b'http',
44 b'https',
44 b'https',
45 }
45 }
46
46
47 SUPPORTED_CONTENT_HASHES = {
47 SUPPORTED_CONTENT_HASHES = {
48 b'sha1',
48 b'sha1',
49 b'sha256',
49 b'sha256',
50 }
50 }
51
51
52 def redirecttargetsupported(ui, target):
52 def redirecttargetsupported(ui, target):
53 """Determine whether a redirect target entry is supported.
53 """Determine whether a redirect target entry is supported.
54
54
55 ``target`` should come from the capabilities data structure emitted by
55 ``target`` should come from the capabilities data structure emitted by
56 the server.
56 the server.
57 """
57 """
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
58 if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
59 ui.note(_('(remote redirect target %s uses unsupported protocol: %s)\n')
60 % (target[b'name'], target.get(b'protocol', b'')))
60 % (target[b'name'], target.get(b'protocol', b'')))
61 return False
61 return False
62
62
63 if target.get(b'snirequired') and not sslutil.hassni:
63 if target.get(b'snirequired') and not sslutil.hassni:
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
64 ui.note(_('(redirect target %s requires SNI, which is unsupported)\n') %
65 target[b'name'])
65 target[b'name'])
66 return False
66 return False
67
67
68 if b'tlsversions' in target:
68 if b'tlsversions' in target:
69 tlsversions = set(target[b'tlsversions'])
69 tlsversions = set(target[b'tlsversions'])
70 supported = set()
70 supported = set()
71
71
72 for v in sslutil.supportedprotocols:
72 for v in sslutil.supportedprotocols:
73 assert v.startswith(b'tls')
73 assert v.startswith(b'tls')
74 supported.add(v[3:])
74 supported.add(v[3:])
75
75
76 if not tlsversions & supported:
76 if not tlsversions & supported:
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
77 ui.note(_('(remote redirect target %s requires unsupported TLS '
78 'versions: %s)\n') % (
78 'versions: %s)\n') % (
79 target[b'name'], b', '.join(sorted(tlsversions))))
79 target[b'name'], b', '.join(sorted(tlsversions))))
80 return False
80 return False
81
81
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
82 ui.note(_('(remote redirect target %s is compatible)\n') % target[b'name'])
83
83
84 return True
84 return True
85
85
86 def supportedredirects(ui, apidescriptor):
86 def supportedredirects(ui, apidescriptor):
87 """Resolve the "redirect" command request key given an API descriptor.
87 """Resolve the "redirect" command request key given an API descriptor.
88
88
89 Given an API descriptor returned by the server, returns a data structure
89 Given an API descriptor returned by the server, returns a data structure
90 that can be used in hte "redirect" field of command requests to advertise
90 that can be used in hte "redirect" field of command requests to advertise
91 support for compatible redirect targets.
91 support for compatible redirect targets.
92
92
93 Returns None if no redirect targets are remotely advertised or if none are
93 Returns None if no redirect targets are remotely advertised or if none are
94 supported.
94 supported.
95 """
95 """
96 if not apidescriptor or b'redirect' not in apidescriptor:
96 if not apidescriptor or b'redirect' not in apidescriptor:
97 return None
97 return None
98
98
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
99 targets = [t[b'name'] for t in apidescriptor[b'redirect'][b'targets']
100 if redirecttargetsupported(ui, t)]
100 if redirecttargetsupported(ui, t)]
101
101
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
102 hashes = [h for h in apidescriptor[b'redirect'][b'hashes']
103 if h in SUPPORTED_CONTENT_HASHES]
103 if h in SUPPORTED_CONTENT_HASHES]
104
104
105 return {
105 return {
106 b'targets': targets,
106 b'targets': targets,
107 b'hashes': hashes,
107 b'hashes': hashes,
108 }
108 }
109
109
110 class commandresponse(object):
110 class commandresponse(object):
111 """Represents the response to a command request.
111 """Represents the response to a command request.
112
112
113 Instances track the state of the command and hold its results.
113 Instances track the state of the command and hold its results.
114
114
115 An external entity is required to update the state of the object when
115 An external entity is required to update the state of the object when
116 events occur.
116 events occur.
117 """
117 """
118
118
119 def __init__(self, requestid, command, fromredirect=False):
119 def __init__(self, requestid, command, fromredirect=False):
120 self.requestid = requestid
120 self.requestid = requestid
121 self.command = command
121 self.command = command
122 self.fromredirect = fromredirect
122 self.fromredirect = fromredirect
123
123
124 # Whether all remote input related to this command has been
124 # Whether all remote input related to this command has been
125 # received.
125 # received.
126 self._inputcomplete = False
126 self._inputcomplete = False
127
127
128 # We have a lock that is acquired when important object state is
128 # We have a lock that is acquired when important object state is
129 # mutated. This is to prevent race conditions between 1 thread
129 # mutated. This is to prevent race conditions between 1 thread
130 # sending us new data and another consuming it.
130 # sending us new data and another consuming it.
131 self._lock = threading.RLock()
131 self._lock = threading.RLock()
132
132
133 # An event is set when state of the object changes. This event
133 # An event is set when state of the object changes. This event
134 # is waited on by the generator emitting objects.
134 # is waited on by the generator emitting objects.
135 self._serviceable = threading.Event()
135 self._serviceable = threading.Event()
136
136
137 self._pendingevents = []
137 self._pendingevents = []
138 self._decoder = cborutil.bufferingdecoder()
138 self._decoder = cborutil.bufferingdecoder()
139 self._seeninitial = False
139 self._seeninitial = False
140 self._redirect = None
140 self._redirect = None
141
141
142 def _oninputcomplete(self):
142 def _oninputcomplete(self):
143 with self._lock:
143 with self._lock:
144 self._inputcomplete = True
144 self._inputcomplete = True
145 self._serviceable.set()
145 self._serviceable.set()
146
146
147 def _onresponsedata(self, data):
147 def _onresponsedata(self, data):
148 available, readcount, wanted = self._decoder.decode(data)
148 available, readcount, wanted = self._decoder.decode(data)
149
149
150 if not available:
150 if not available:
151 return
151 return
152
152
153 with self._lock:
153 with self._lock:
154 for o in self._decoder.getavailable():
154 for o in self._decoder.getavailable():
155 if not self._seeninitial and not self.fromredirect:
155 if not self._seeninitial and not self.fromredirect:
156 self._handleinitial(o)
156 self._handleinitial(o)
157 continue
157 continue
158
158
159 # We should never see an object after a content redirect,
159 # We should never see an object after a content redirect,
160 # as the spec says the main status object containing the
160 # as the spec says the main status object containing the
161 # content redirect is the only object in the stream. Fail
161 # content redirect is the only object in the stream. Fail
162 # if we see a misbehaving server.
162 # if we see a misbehaving server.
163 if self._redirect:
163 if self._redirect:
164 raise error.Abort(_('received unexpected response data '
164 raise error.Abort(_('received unexpected response data '
165 'after content redirect; the remote is '
165 'after content redirect; the remote is '
166 'buggy'))
166 'buggy'))
167
167
168 self._pendingevents.append(o)
168 self._pendingevents.append(o)
169
169
170 self._serviceable.set()
170 self._serviceable.set()
171
171
172 def _handleinitial(self, o):
172 def _handleinitial(self, o):
173 self._seeninitial = True
173 self._seeninitial = True
174 if o[b'status'] == b'ok':
174 if o[b'status'] == b'ok':
175 return
175 return
176
176
177 elif o[b'status'] == b'redirect':
177 elif o[b'status'] == b'redirect':
178 l = o[b'location']
178 l = o[b'location']
179 self._redirect = wireprototypes.alternatelocationresponse(
179 self._redirect = wireprototypes.alternatelocationresponse(
180 url=l[b'url'],
180 url=l[b'url'],
181 mediatype=l[b'mediatype'],
181 mediatype=l[b'mediatype'],
182 size=l.get(b'size'),
182 size=l.get(b'size'),
183 fullhashes=l.get(b'fullhashes'),
183 fullhashes=l.get(b'fullhashes'),
184 fullhashseed=l.get(b'fullhashseed'),
184 fullhashseed=l.get(b'fullhashseed'),
185 serverdercerts=l.get(b'serverdercerts'),
185 serverdercerts=l.get(b'serverdercerts'),
186 servercadercerts=l.get(b'servercadercerts'))
186 servercadercerts=l.get(b'servercadercerts'))
187 return
187 return
188
188
189 atoms = [{'msg': o[b'error'][b'message']}]
189 atoms = [{'msg': o[b'error'][b'message']}]
190 if b'args' in o[b'error']:
190 if b'args' in o[b'error']:
191 atoms[0]['args'] = o[b'error'][b'args']
191 atoms[0]['args'] = o[b'error'][b'args']
192
192
193 raise error.RepoError(formatrichmessage(atoms))
193 raise error.RepoError(formatrichmessage(atoms))
194
194
195 def objects(self):
195 def objects(self):
196 """Obtained decoded objects from this response.
196 """Obtained decoded objects from this response.
197
197
198 This is a generator of data structures that were decoded from the
198 This is a generator of data structures that were decoded from the
199 command response.
199 command response.
200
200
201 Obtaining the next member of the generator may block due to waiting
201 Obtaining the next member of the generator may block due to waiting
202 on external data to become available.
202 on external data to become available.
203
203
204 If the server encountered an error in the middle of serving the data
204 If the server encountered an error in the middle of serving the data
205 or if another error occurred, an exception may be raised when
205 or if another error occurred, an exception may be raised when
206 advancing the generator.
206 advancing the generator.
207 """
207 """
208 while True:
208 while True:
209 # TODO this can infinite loop if self._inputcomplete is never
209 # TODO this can infinite loop if self._inputcomplete is never
210 # set. We likely want to tie the lifetime of this object/state
210 # set. We likely want to tie the lifetime of this object/state
211 # to that of the background thread receiving frames and updating
211 # to that of the background thread receiving frames and updating
212 # our state.
212 # our state.
213 self._serviceable.wait(1.0)
213 self._serviceable.wait(1.0)
214
214
215 with self._lock:
215 with self._lock:
216 self._serviceable.clear()
216 self._serviceable.clear()
217
217
218 # Make copies because objects could be mutated during
218 # Make copies because objects could be mutated during
219 # iteration.
219 # iteration.
220 stop = self._inputcomplete
220 stop = self._inputcomplete
221 pending = list(self._pendingevents)
221 pending = list(self._pendingevents)
222 self._pendingevents[:] = []
222 self._pendingevents[:] = []
223
223
224 for o in pending:
224 for o in pending:
225 yield o
225 yield o
226
226
227 if stop:
227 if stop:
228 break
228 break
229
229
230 class clienthandler(object):
230 class clienthandler(object):
231 """Object to handle higher-level client activities.
231 """Object to handle higher-level client activities.
232
232
233 The ``clientreactor`` is used to hold low-level state about the frame-based
233 The ``clientreactor`` is used to hold low-level state about the frame-based
234 protocol, such as which requests and streams are active. This type is used
234 protocol, such as which requests and streams are active. This type is used
235 for higher-level operations, such as reading frames from a socket, exposing
235 for higher-level operations, such as reading frames from a socket, exposing
236 and managing a higher-level primitive for representing command responses,
236 and managing a higher-level primitive for representing command responses,
237 etc. This class is what peers should probably use to bridge wire activity
237 etc. This class is what peers should probably use to bridge wire activity
238 with the higher-level peer API.
238 with the higher-level peer API.
239 """
239 """
240
240
241 def __init__(self, ui, clientreactor, opener=None,
241 def __init__(self, ui, clientreactor, opener=None,
242 requestbuilder=util.urlreq.request):
242 requestbuilder=util.urlreq.request):
243 self._ui = ui
243 self._ui = ui
244 self._reactor = clientreactor
244 self._reactor = clientreactor
245 self._requests = {}
245 self._requests = {}
246 self._futures = {}
246 self._futures = {}
247 self._responses = {}
247 self._responses = {}
248 self._redirects = []
248 self._redirects = []
249 self._frameseof = False
249 self._frameseof = False
250 self._opener = opener or urlmod.opener(ui)
250 self._opener = opener or urlmod.opener(ui)
251 self._requestbuilder = requestbuilder
251 self._requestbuilder = requestbuilder
252
252
253 def callcommand(self, command, args, f, redirect=None):
253 def callcommand(self, command, args, f, redirect=None):
254 """Register a request to call a command.
254 """Register a request to call a command.
255
255
256 Returns an iterable of frames that should be sent over the wire.
256 Returns an iterable of frames that should be sent over the wire.
257 """
257 """
258 request, action, meta = self._reactor.callcommand(command, args,
258 request, action, meta = self._reactor.callcommand(command, args,
259 redirect=redirect)
259 redirect=redirect)
260
260
261 if action != 'noop':
261 if action != 'noop':
262 raise error.ProgrammingError('%s not yet supported' % action)
262 raise error.ProgrammingError('%s not yet supported' % action)
263
263
264 rid = request.requestid
264 rid = request.requestid
265 self._requests[rid] = request
265 self._requests[rid] = request
266 self._futures[rid] = f
266 self._futures[rid] = f
267 # TODO we need some kind of lifetime on response instances otherwise
267 # TODO we need some kind of lifetime on response instances otherwise
268 # objects() may deadlock.
268 # objects() may deadlock.
269 self._responses[rid] = commandresponse(rid, command)
269 self._responses[rid] = commandresponse(rid, command)
270
270
271 return iter(())
271 return iter(())
272
272
273 def flushcommands(self):
273 def flushcommands(self):
274 """Flush all queued commands.
274 """Flush all queued commands.
275
275
276 Returns an iterable of frames that should be sent over the wire.
276 Returns an iterable of frames that should be sent over the wire.
277 """
277 """
278 action, meta = self._reactor.flushcommands()
278 action, meta = self._reactor.flushcommands()
279
279
280 if action != 'sendframes':
280 if action != 'sendframes':
281 raise error.ProgrammingError('%s not yet supported' % action)
281 raise error.ProgrammingError('%s not yet supported' % action)
282
282
283 return meta['framegen']
283 return meta['framegen']
284
284
285 def readdata(self, framefh):
285 def readdata(self, framefh):
286 """Attempt to read data and do work.
286 """Attempt to read data and do work.
287
287
288 Returns None if no data was read. Presumably this means we're
288 Returns None if no data was read. Presumably this means we're
289 done with all read I/O.
289 done with all read I/O.
290 """
290 """
291 if not self._frameseof:
291 if not self._frameseof:
292 frame = wireprotoframing.readframe(framefh)
292 frame = wireprotoframing.readframe(framefh)
293 if frame is None:
293 if frame is None:
294 # TODO tell reactor?
294 # TODO tell reactor?
295 self._frameseof = True
295 self._frameseof = True
296 else:
296 else:
297 self._ui.note(_('received %r\n') % frame)
297 self._ui.note(_('received %r\n') % frame)
298 self._processframe(frame)
298 self._processframe(frame)
299
299
300 # Also try to read the first redirect.
300 # Also try to read the first redirect.
301 if self._redirects:
301 if self._redirects:
302 if not self._processredirect(*self._redirects[0]):
302 if not self._processredirect(*self._redirects[0]):
303 self._redirects.pop(0)
303 self._redirects.pop(0)
304
304
305 if self._frameseof and not self._redirects:
305 if self._frameseof and not self._redirects:
306 return None
306 return None
307
307
308 return True
308 return True
309
309
310 def _processframe(self, frame):
310 def _processframe(self, frame):
311 """Process a single read frame."""
311 """Process a single read frame."""
312
312
313 action, meta = self._reactor.onframerecv(frame)
313 action, meta = self._reactor.onframerecv(frame)
314
314
315 if action == 'error':
315 if action == 'error':
316 e = error.RepoError(meta['message'])
316 e = error.RepoError(meta['message'])
317
317
318 if frame.requestid in self._responses:
318 if frame.requestid in self._responses:
319 self._responses[frame.requestid]._oninputcomplete()
319 self._responses[frame.requestid]._oninputcomplete()
320
320
321 if frame.requestid in self._futures:
321 if frame.requestid in self._futures:
322 self._futures[frame.requestid].set_exception(e)
322 self._futures[frame.requestid].set_exception(e)
323 del self._futures[frame.requestid]
323 del self._futures[frame.requestid]
324 else:
324 else:
325 raise e
325 raise e
326
326
327 return
327 return
328 elif action == 'noop':
329 return
328
330
329 if frame.requestid not in self._requests:
331 if frame.requestid not in self._requests:
330 raise error.ProgrammingError(
332 raise error.ProgrammingError(
331 'received frame for unknown request; this is either a bug in '
333 'received frame for unknown request; this is either a bug in '
332 'the clientreactor not screening for this or this instance was '
334 'the clientreactor not screening for this or this instance was '
333 'never told about this request: %r' % frame)
335 'never told about this request: %r' % frame)
334
336
335 response = self._responses[frame.requestid]
337 response = self._responses[frame.requestid]
336
338
337 if action == 'responsedata':
339 if action == 'responsedata':
338 # Any failures processing this frame should bubble up to the
340 # Any failures processing this frame should bubble up to the
339 # future tracking the request.
341 # future tracking the request.
340 try:
342 try:
341 self._processresponsedata(frame, meta, response)
343 self._processresponsedata(frame, meta, response)
342 except BaseException as e:
344 except BaseException as e:
343 self._futures[frame.requestid].set_exception(e)
345 self._futures[frame.requestid].set_exception(e)
344 del self._futures[frame.requestid]
346 del self._futures[frame.requestid]
345 response._oninputcomplete()
347 response._oninputcomplete()
346 else:
348 else:
347 raise error.ProgrammingError(
349 raise error.ProgrammingError(
348 'unhandled action from clientreactor: %s' % action)
350 'unhandled action from clientreactor: %s' % action)
349
351
350 def _processresponsedata(self, frame, meta, response):
352 def _processresponsedata(self, frame, meta, response):
351 # This can raise. The caller can handle it.
353 # This can raise. The caller can handle it.
352 response._onresponsedata(meta['data'])
354 response._onresponsedata(meta['data'])
353
355
354 # If we got a content redirect response, we want to fetch it and
356 # If we got a content redirect response, we want to fetch it and
355 # expose the data as if we received it inline. But we also want to
357 # expose the data as if we received it inline. But we also want to
356 # keep our internal request accounting in order. Our strategy is to
358 # keep our internal request accounting in order. Our strategy is to
357 # basically put meaningful response handling on pause until EOS occurs
359 # basically put meaningful response handling on pause until EOS occurs
358 # and the stream accounting is in a good state. At that point, we follow
360 # and the stream accounting is in a good state. At that point, we follow
359 # the redirect and replace the response object with its data.
361 # the redirect and replace the response object with its data.
360
362
361 redirect = response._redirect
363 redirect = response._redirect
362 handlefuture = False if redirect else True
364 handlefuture = False if redirect else True
363
365
364 if meta['eos']:
366 if meta['eos']:
365 response._oninputcomplete()
367 response._oninputcomplete()
366 del self._requests[frame.requestid]
368 del self._requests[frame.requestid]
367
369
368 if redirect:
370 if redirect:
369 self._followredirect(frame.requestid, redirect)
371 self._followredirect(frame.requestid, redirect)
370 return
372 return
371
373
372 if not handlefuture:
374 if not handlefuture:
373 return
375 return
374
376
375 # If the command has a decoder, we wait until all input has been
377 # If the command has a decoder, we wait until all input has been
376 # received before resolving the future. Otherwise we resolve the
378 # received before resolving the future. Otherwise we resolve the
377 # future immediately.
379 # future immediately.
378 if frame.requestid not in self._futures:
380 if frame.requestid not in self._futures:
379 return
381 return
380
382
381 if response.command not in COMMAND_DECODERS:
383 if response.command not in COMMAND_DECODERS:
382 self._futures[frame.requestid].set_result(response.objects())
384 self._futures[frame.requestid].set_result(response.objects())
383 del self._futures[frame.requestid]
385 del self._futures[frame.requestid]
384 elif response._inputcomplete:
386 elif response._inputcomplete:
385 decoded = COMMAND_DECODERS[response.command](response.objects())
387 decoded = COMMAND_DECODERS[response.command](response.objects())
386 self._futures[frame.requestid].set_result(decoded)
388 self._futures[frame.requestid].set_result(decoded)
387 del self._futures[frame.requestid]
389 del self._futures[frame.requestid]
388
390
389 def _followredirect(self, requestid, redirect):
391 def _followredirect(self, requestid, redirect):
390 """Called to initiate redirect following for a request."""
392 """Called to initiate redirect following for a request."""
391 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
393 self._ui.note(_('(following redirect to %s)\n') % redirect.url)
392
394
393 # TODO handle framed responses.
395 # TODO handle framed responses.
394 if redirect.mediatype != b'application/mercurial-cbor':
396 if redirect.mediatype != b'application/mercurial-cbor':
395 raise error.Abort(_('cannot handle redirects for the %s media type')
397 raise error.Abort(_('cannot handle redirects for the %s media type')
396 % redirect.mediatype)
398 % redirect.mediatype)
397
399
398 if redirect.fullhashes:
400 if redirect.fullhashes:
399 self._ui.warn(_('(support for validating hashes on content '
401 self._ui.warn(_('(support for validating hashes on content '
400 'redirects not supported)\n'))
402 'redirects not supported)\n'))
401
403
402 if redirect.serverdercerts or redirect.servercadercerts:
404 if redirect.serverdercerts or redirect.servercadercerts:
403 self._ui.warn(_('(support for pinning server certificates on '
405 self._ui.warn(_('(support for pinning server certificates on '
404 'content redirects not supported)\n'))
406 'content redirects not supported)\n'))
405
407
406 headers = {
408 headers = {
407 r'Accept': redirect.mediatype,
409 r'Accept': redirect.mediatype,
408 }
410 }
409
411
410 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
412 req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
411
413
412 try:
414 try:
413 res = self._opener.open(req)
415 res = self._opener.open(req)
414 except util.urlerr.httperror as e:
416 except util.urlerr.httperror as e:
415 if e.code == 401:
417 if e.code == 401:
416 raise error.Abort(_('authorization failed'))
418 raise error.Abort(_('authorization failed'))
417 raise
419 raise
418 except util.httplib.HTTPException as e:
420 except util.httplib.HTTPException as e:
419 self._ui.debug('http error requesting %s\n' % req.get_full_url())
421 self._ui.debug('http error requesting %s\n' % req.get_full_url())
420 self._ui.traceback()
422 self._ui.traceback()
421 raise IOError(None, e)
423 raise IOError(None, e)
422
424
423 urlmod.wrapresponse(res)
425 urlmod.wrapresponse(res)
424
426
425 # The existing response object is associated with frame data. Rather
427 # The existing response object is associated with frame data. Rather
426 # than try to normalize its state, just create a new object.
428 # than try to normalize its state, just create a new object.
427 oldresponse = self._responses[requestid]
429 oldresponse = self._responses[requestid]
428 self._responses[requestid] = commandresponse(requestid,
430 self._responses[requestid] = commandresponse(requestid,
429 oldresponse.command,
431 oldresponse.command,
430 fromredirect=True)
432 fromredirect=True)
431
433
432 self._redirects.append((requestid, res))
434 self._redirects.append((requestid, res))
433
435
434 def _processredirect(self, rid, res):
436 def _processredirect(self, rid, res):
435 """Called to continue processing a response from a redirect."""
437 """Called to continue processing a response from a redirect."""
436 response = self._responses[rid]
438 response = self._responses[rid]
437
439
438 try:
440 try:
439 data = res.read(32768)
441 data = res.read(32768)
440 response._onresponsedata(data)
442 response._onresponsedata(data)
441
443
442 # We're at end of stream.
444 # We're at end of stream.
443 if not data:
445 if not data:
444 response._oninputcomplete()
446 response._oninputcomplete()
445
447
446 if rid not in self._futures:
448 if rid not in self._futures:
447 return
449 return
448
450
449 if response.command not in COMMAND_DECODERS:
451 if response.command not in COMMAND_DECODERS:
450 self._futures[rid].set_result(response.objects())
452 self._futures[rid].set_result(response.objects())
451 del self._futures[rid]
453 del self._futures[rid]
452 elif response._inputcomplete:
454 elif response._inputcomplete:
453 decoded = COMMAND_DECODERS[response.command](response.objects())
455 decoded = COMMAND_DECODERS[response.command](response.objects())
454 self._futures[rid].set_result(decoded)
456 self._futures[rid].set_result(decoded)
455 del self._futures[rid]
457 del self._futures[rid]
456
458
457 return bool(data)
459 return bool(data)
458
460
459 except BaseException as e:
461 except BaseException as e:
460 self._futures[rid].set_exception(e)
462 self._futures[rid].set_exception(e)
461 del self._futures[rid]
463 del self._futures[rid]
462 response._oninputcomplete()
464 response._oninputcomplete()
463 return False
465 return False
464
466
465 def decodebranchmap(objs):
467 def decodebranchmap(objs):
466 # Response should be a single CBOR map of branch name to array of nodes.
468 # Response should be a single CBOR map of branch name to array of nodes.
467 bm = next(objs)
469 bm = next(objs)
468
470
469 return {encoding.tolocal(k): v for k, v in bm.items()}
471 return {encoding.tolocal(k): v for k, v in bm.items()}
470
472
471 def decodeheads(objs):
473 def decodeheads(objs):
472 # Array of node bytestrings.
474 # Array of node bytestrings.
473 return next(objs)
475 return next(objs)
474
476
475 def decodeknown(objs):
477 def decodeknown(objs):
476 # Bytestring where each byte is a 0 or 1.
478 # Bytestring where each byte is a 0 or 1.
477 raw = next(objs)
479 raw = next(objs)
478
480
479 return [True if c == '1' else False for c in raw]
481 return [True if c == '1' else False for c in raw]
480
482
481 def decodelistkeys(objs):
483 def decodelistkeys(objs):
482 # Map with bytestring keys and values.
484 # Map with bytestring keys and values.
483 return next(objs)
485 return next(objs)
484
486
485 def decodelookup(objs):
487 def decodelookup(objs):
486 return next(objs)
488 return next(objs)
487
489
488 def decodepushkey(objs):
490 def decodepushkey(objs):
489 return next(objs)
491 return next(objs)
490
492
491 COMMAND_DECODERS = {
493 COMMAND_DECODERS = {
492 'branchmap': decodebranchmap,
494 'branchmap': decodebranchmap,
493 'heads': decodeheads,
495 'heads': decodeheads,
494 'known': decodeknown,
496 'known': decodeknown,
495 'listkeys': decodelistkeys,
497 'listkeys': decodelistkeys,
496 'lookup': decodelookup,
498 'lookup': decodelookup,
497 'pushkey': decodepushkey,
499 'pushkey': decodepushkey,
498 }
500 }
General Comments 0
You need to be logged in to leave comments. Login now