##// END OF EJS Templates
wireprotov2peer: stream decoded responses...
Gregory Szorc -
r39597:d06834e0 default
parent child Browse files
Show More
@@ -3240,7 +3240,7 b' def debugwireproto(ui, repo, path=None, '
3240 3240 res = e.callcommand(command, args).result()
3241 3241
3242 3242 if isinstance(res, wireprotov2peer.commandresponse):
3243 val = list(res.cborobjects())
3243 val = res.objects()
3244 3244 ui.status(_('response: %s\n') %
3245 3245 stringutil.pprint(val, bprefix=True, indent=2))
3246 3246 else:
@@ -7,11 +7,12 b''
7 7
8 8 from __future__ import absolute_import
9 9
10 import threading
11
10 12 from .i18n import _
11 13 from . import (
12 14 encoding,
13 15 error,
14 util,
15 16 wireprotoframing,
16 17 )
17 18 from .utils import (
@@ -34,20 +35,101 b' def formatrichmessage(atoms):'
34 35 return b''.join(chunks)
35 36
36 37 class commandresponse(object):
37 """Represents the response to a command request."""
38 """Represents the response to a command request.
39
40 Instances track the state of the command and hold its results.
41
42 An external entity is required to update the state of the object when
43 events occur.
44 """
38 45
39 46 def __init__(self, requestid, command):
40 47 self.requestid = requestid
41 48 self.command = command
42 49
43 self.b = util.bytesio()
50 # Whether all remote input related to this command has been
51 # received.
52 self._inputcomplete = False
53
54 # We have a lock that is acquired when important object state is
55 # mutated. This is to prevent race conditions between 1 thread
56 # sending us new data and another consuming it.
57 self._lock = threading.RLock()
58
59 # An event is set when state of the object changes. This event
60 # is waited on by the generator emitting objects.
61 self._serviceable = threading.Event()
62
63 self._pendingevents = []
64 self._decoder = cborutil.bufferingdecoder()
65 self._seeninitial = False
66
67 def _oninputcomplete(self):
68 with self._lock:
69 self._inputcomplete = True
70 self._serviceable.set()
71
72 def _onresponsedata(self, data):
73 available, readcount, wanted = self._decoder.decode(data)
74
75 if not available:
76 return
77
78 with self._lock:
79 for o in self._decoder.getavailable():
80 if not self._seeninitial:
81 self._handleinitial(o)
82 continue
83
84 self._pendingevents.append(o)
85
86 self._serviceable.set()
44 87
45 def cborobjects(self):
46 """Obtain decoded CBOR objects from this response."""
47 self.b.seek(0)
88 def _handleinitial(self, o):
89 self._seeninitial = True
90 if o[b'status'] == 'ok':
91 return
92
93 atoms = [{'msg': o[b'error'][b'message']}]
94 if b'args' in o[b'error']:
95 atoms[0]['args'] = o[b'error'][b'args']
96
97 raise error.RepoError(formatrichmessage(atoms))
98
99 def objects(self):
100 """Obtained decoded objects from this response.
101
102 This is a generator of data structures that were decoded from the
103 command response.
104
105 Obtaining the next member of the generator may block due to waiting
106 on external data to become available.
48 107
49 for v in cborutil.decodeall(self.b.getvalue()):
50 yield v
108 If the server encountered an error in the middle of serving the data
109 or if another error occurred, an exception may be raised when
110 advancing the generator.
111 """
112 while True:
113 # TODO this can infinite loop if self._inputcomplete is never
114 # set. We likely want to tie the lifetime of this object/state
115 # to that of the background thread receiving frames and updating
116 # our state.
117 self._serviceable.wait(1.0)
118
119 with self._lock:
120 self._serviceable.clear()
121
122 # Make copies because objects could be mutated during
123 # iteration.
124 stop = self._inputcomplete
125 pending = list(self._pendingevents)
126 self._pendingevents[:] = []
127
128 for o in pending:
129 yield o
130
131 if stop:
132 break
51 133
52 134 class clienthandler(object):
53 135 """Object to handle higher-level client activities.
@@ -80,6 +162,8 b' class clienthandler(object):'
80 162 rid = request.requestid
81 163 self._requests[rid] = request
82 164 self._futures[rid] = f
165 # TODO we need some kind of lifetime on response instances otherwise
166 # objects() may deadlock.
83 167 self._responses[rid] = commandresponse(rid, command)
84 168
85 169 return iter(())
@@ -119,8 +203,12 b' class clienthandler(object):'
119 203 if action == 'error':
120 204 e = error.RepoError(meta['message'])
121 205
206 if frame.requestid in self._responses:
207 self._responses[frame.requestid]._oninputcomplete()
208
122 209 if frame.requestid in self._futures:
123 210 self._futures[frame.requestid].set_exception(e)
211 del self._futures[frame.requestid]
124 212 else:
125 213 raise e
126 214
@@ -141,39 +229,32 b' class clienthandler(object):'
141 229 self._processresponsedata(frame, meta, response)
142 230 except BaseException as e:
143 231 self._futures[frame.requestid].set_exception(e)
232 del self._futures[frame.requestid]
233 response._oninputcomplete()
144 234 else:
145 235 raise error.ProgrammingError(
146 236 'unhandled action from clientreactor: %s' % action)
147 237
148 238 def _processresponsedata(self, frame, meta, response):
149 # This buffers all data until end of stream is received. This
150 # is bad for performance.
151 # TODO make response data streamable
152 response.b.write(meta['data'])
239 # This can raise. The caller can handle it.
240 response._onresponsedata(meta['data'])
153 241
154 242 if meta['eos']:
155 # If the command has a decoder, resolve the future to the
156 # decoded value. Otherwise resolve to the rich response object.
157 decoder = COMMAND_DECODERS.get(response.command)
158
159 # TODO consider always resolving the overall status map.
160 if decoder:
161 objs = response.cborobjects()
162
163 overall = next(objs)
243 response._oninputcomplete()
244 del self._requests[frame.requestid]
164 245
165 if overall['status'] == 'ok':
166 self._futures[frame.requestid].set_result(decoder(objs))
167 else:
168 atoms = [{'msg': overall['error']['message']}]
169 if 'args' in overall['error']:
170 atoms[0]['args'] = overall['error']['args']
171 e = error.RepoError(formatrichmessage(atoms))
172 self._futures[frame.requestid].set_exception(e)
173 else:
174 self._futures[frame.requestid].set_result(response)
246 # If the command has a decoder, we wait until all input has been
247 # received before resolving the future. Otherwise we resolve the
248 # future immediately.
249 if frame.requestid not in self._futures:
250 return
175 251
176 del self._requests[frame.requestid]
252 if response.command not in COMMAND_DECODERS:
253 self._futures[frame.requestid].set_result(response.objects())
254 del self._futures[frame.requestid]
255 elif response._inputcomplete:
256 decoded = COMMAND_DECODERS[response.command](response.objects())
257 self._futures[frame.requestid].set_result(decoded)
177 258 del self._futures[frame.requestid]
178 259
179 260 def decodebranchmap(objs):
@@ -225,10 +225,7 b' Request to read-only command works out o'
225 225 s> 0\r\n
226 226 s> \r\n
227 227 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
228 response: [
229 {
230 b'status': b'ok'
231 },
228 response: gen[
232 229 b'customreadonly bytes response'
233 230 ]
234 231
@@ -349,10 +349,7 b' capabilities command returns expected in'
349 349 s> 0\r\n
350 350 s> \r\n
351 351 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
352 response: [
353 {
354 b'status': b'ok'
355 },
352 response: gen[
356 353 {
357 354 b'commands': {
358 355 b'branchmap': {
General Comments 0
You need to be logged in to leave comments. Login now