Show More
@@ -3240,7 +3240,7 b' def debugwireproto(ui, repo, path=None, ' | |||
|
3240 | 3240 | res = e.callcommand(command, args).result() |
|
3241 | 3241 | |
|
3242 | 3242 | if isinstance(res, wireprotov2peer.commandresponse): |
|
3243 |
val = |
|
|
3243 | val = res.objects() | |
|
3244 | 3244 | ui.status(_('response: %s\n') % |
|
3245 | 3245 | stringutil.pprint(val, bprefix=True, indent=2)) |
|
3246 | 3246 | else: |
@@ -7,11 +7,12 b'' | |||
|
7 | 7 | |
|
8 | 8 | from __future__ import absolute_import |
|
9 | 9 | |
|
10 | import threading | |
|
11 | ||
|
10 | 12 | from .i18n import _ |
|
11 | 13 | from . import ( |
|
12 | 14 | encoding, |
|
13 | 15 | error, |
|
14 | util, | |
|
15 | 16 | wireprotoframing, |
|
16 | 17 | ) |
|
17 | 18 | from .utils import ( |
@@ -34,20 +35,101 b' def formatrichmessage(atoms):' | |||
|
34 | 35 | return b''.join(chunks) |
|
35 | 36 | |
|
36 | 37 | class commandresponse(object): |
|
37 |
"""Represents the response to a command request. |
|
|
38 | """Represents the response to a command request. | |
|
39 | ||
|
40 | Instances track the state of the command and hold its results. | |
|
41 | ||
|
42 | An external entity is required to update the state of the object when | |
|
43 | events occur. | |
|
44 | """ | |
|
38 | 45 | |
|
39 | 46 | def __init__(self, requestid, command): |
|
40 | 47 | self.requestid = requestid |
|
41 | 48 | self.command = command |
|
42 | 49 | |
|
43 | self.b = util.bytesio() | |
|
50 | # Whether all remote input related to this command has been | |
|
51 | # received. | |
|
52 | self._inputcomplete = False | |
|
53 | ||
|
54 | # We have a lock that is acquired when important object state is | |
|
55 | # mutated. This is to prevent race conditions between 1 thread | |
|
56 | # sending us new data and another consuming it. | |
|
57 | self._lock = threading.RLock() | |
|
58 | ||
|
59 | # An event is set when state of the object changes. This event | |
|
60 | # is waited on by the generator emitting objects. | |
|
61 | self._serviceable = threading.Event() | |
|
62 | ||
|
63 | self._pendingevents = [] | |
|
64 | self._decoder = cborutil.bufferingdecoder() | |
|
65 | self._seeninitial = False | |
|
66 | ||
|
67 | def _oninputcomplete(self): | |
|
68 | with self._lock: | |
|
69 | self._inputcomplete = True | |
|
70 | self._serviceable.set() | |
|
71 | ||
|
72 | def _onresponsedata(self, data): | |
|
73 | available, readcount, wanted = self._decoder.decode(data) | |
|
74 | ||
|
75 | if not available: | |
|
76 | return | |
|
77 | ||
|
78 | with self._lock: | |
|
79 | for o in self._decoder.getavailable(): | |
|
80 | if not self._seeninitial: | |
|
81 | self._handleinitial(o) | |
|
82 | continue | |
|
83 | ||
|
84 | self._pendingevents.append(o) | |
|
85 | ||
|
86 | self._serviceable.set() | |
|
44 | 87 | |
|
45 | def cborobjects(self): | |
|
46 | """Obtain decoded CBOR objects from this response.""" | |
|
47 | self.b.seek(0) | |
|
88 | def _handleinitial(self, o): | |
|
89 | self._seeninitial = True | |
|
90 | if o[b'status'] == 'ok': | |
|
91 | return | |
|
92 | ||
|
93 | atoms = [{'msg': o[b'error'][b'message']}] | |
|
94 | if b'args' in o[b'error']: | |
|
95 | atoms[0]['args'] = o[b'error'][b'args'] | |
|
96 | ||
|
97 | raise error.RepoError(formatrichmessage(atoms)) | |
|
98 | ||
|
99 | def objects(self): | |
|
100 | """Obtained decoded objects from this response. | |
|
101 | ||
|
102 | This is a generator of data structures that were decoded from the | |
|
103 | command response. | |
|
104 | ||
|
105 | Obtaining the next member of the generator may block due to waiting | |
|
106 | on external data to become available. | |
|
48 | 107 |
|
|
49 | for v in cborutil.decodeall(self.b.getvalue()): | |
|
50 | yield v | |
|
108 | If the server encountered an error in the middle of serving the data | |
|
109 | or if another error occurred, an exception may be raised when | |
|
110 | advancing the generator. | |
|
111 | """ | |
|
112 | while True: | |
|
113 | # TODO this can infinite loop if self._inputcomplete is never | |
|
114 | # set. We likely want to tie the lifetime of this object/state | |
|
115 | # to that of the background thread receiving frames and updating | |
|
116 | # our state. | |
|
117 | self._serviceable.wait(1.0) | |
|
118 | ||
|
119 | with self._lock: | |
|
120 | self._serviceable.clear() | |
|
121 | ||
|
122 | # Make copies because objects could be mutated during | |
|
123 | # iteration. | |
|
124 | stop = self._inputcomplete | |
|
125 | pending = list(self._pendingevents) | |
|
126 | self._pendingevents[:] = [] | |
|
127 | ||
|
128 | for o in pending: | |
|
129 | yield o | |
|
130 | ||
|
131 | if stop: | |
|
132 | break | |
|
51 | 133 | |
|
52 | 134 | class clienthandler(object): |
|
53 | 135 | """Object to handle higher-level client activities. |
@@ -80,6 +162,8 b' class clienthandler(object):' | |||
|
80 | 162 | rid = request.requestid |
|
81 | 163 | self._requests[rid] = request |
|
82 | 164 | self._futures[rid] = f |
|
165 | # TODO we need some kind of lifetime on response instances otherwise | |
|
166 | # objects() may deadlock. | |
|
83 | 167 | self._responses[rid] = commandresponse(rid, command) |
|
84 | 168 | |
|
85 | 169 | return iter(()) |
@@ -119,8 +203,12 b' class clienthandler(object):' | |||
|
119 | 203 | if action == 'error': |
|
120 | 204 | e = error.RepoError(meta['message']) |
|
121 | 205 | |
|
206 | if frame.requestid in self._responses: | |
|
207 | self._responses[frame.requestid]._oninputcomplete() | |
|
208 | ||
|
122 | 209 | if frame.requestid in self._futures: |
|
123 | 210 | self._futures[frame.requestid].set_exception(e) |
|
211 | del self._futures[frame.requestid] | |
|
124 | 212 | else: |
|
125 | 213 | raise e |
|
126 | 214 | |
@@ -141,39 +229,32 b' class clienthandler(object):' | |||
|
141 | 229 | self._processresponsedata(frame, meta, response) |
|
142 | 230 | except BaseException as e: |
|
143 | 231 | self._futures[frame.requestid].set_exception(e) |
|
232 | del self._futures[frame.requestid] | |
|
233 | response._oninputcomplete() | |
|
144 | 234 | else: |
|
145 | 235 | raise error.ProgrammingError( |
|
146 | 236 | 'unhandled action from clientreactor: %s' % action) |
|
147 | 237 | |
|
148 | 238 | def _processresponsedata(self, frame, meta, response): |
|
149 | # This buffers all data until end of stream is received. This | |
|
150 | # is bad for performance. | |
|
151 | # TODO make response data streamable | |
|
152 | response.b.write(meta['data']) | |
|
239 | # This can raise. The caller can handle it. | |
|
240 | response._onresponsedata(meta['data']) | |
|
153 | 241 | |
|
154 | 242 | if meta['eos']: |
|
155 | # If the command has a decoder, resolve the future to the | |
|
156 | # decoded value. Otherwise resolve to the rich response object. | |
|
157 | decoder = COMMAND_DECODERS.get(response.command) | |
|
158 | ||
|
159 | # TODO consider always resolving the overall status map. | |
|
160 | if decoder: | |
|
161 | objs = response.cborobjects() | |
|
162 | ||
|
163 | overall = next(objs) | |
|
243 | response._oninputcomplete() | |
|
244 | del self._requests[frame.requestid] | |
|
164 | 245 | |
|
165 | if overall['status'] == 'ok': | |
|
166 | self._futures[frame.requestid].set_result(decoder(objs)) | |
|
167 | else: | |
|
168 | atoms = [{'msg': overall['error']['message']}] | |
|
169 | if 'args' in overall['error']: | |
|
170 | atoms[0]['args'] = overall['error']['args'] | |
|
171 | e = error.RepoError(formatrichmessage(atoms)) | |
|
172 | self._futures[frame.requestid].set_exception(e) | |
|
173 | else: | |
|
174 | self._futures[frame.requestid].set_result(response) | |
|
246 | # If the command has a decoder, we wait until all input has been | |
|
247 | # received before resolving the future. Otherwise we resolve the | |
|
248 | # future immediately. | |
|
249 | if frame.requestid not in self._futures: | |
|
250 | return | |
|
175 | 251 | |
|
176 | del self._requests[frame.requestid] | |
|
252 | if response.command not in COMMAND_DECODERS: | |
|
253 | self._futures[frame.requestid].set_result(response.objects()) | |
|
254 | del self._futures[frame.requestid] | |
|
255 | elif response._inputcomplete: | |
|
256 | decoded = COMMAND_DECODERS[response.command](response.objects()) | |
|
257 | self._futures[frame.requestid].set_result(decoded) | |
|
177 | 258 | del self._futures[frame.requestid] |
|
178 | 259 | |
|
179 | 260 | def decodebranchmap(objs): |
@@ -225,10 +225,7 b' Request to read-only command works out o' | |||
|
225 | 225 | s> 0\r\n |
|
226 | 226 | s> \r\n |
|
227 | 227 | received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) |
|
228 | response: [ | |
|
229 | { | |
|
230 | b'status': b'ok' | |
|
231 | }, | |
|
228 | response: gen[ | |
|
232 | 229 | b'customreadonly bytes response' |
|
233 | 230 | ] |
|
234 | 231 |
@@ -349,10 +349,7 b' capabilities command returns expected in' | |||
|
349 | 349 | s> 0\r\n |
|
350 | 350 | s> \r\n |
|
351 | 351 | received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos) |
|
352 | response: [ | |
|
353 | { | |
|
354 | b'status': b'ok' | |
|
355 | }, | |
|
352 | response: gen[ | |
|
356 | 353 | { |
|
357 | 354 | b'commands': { |
|
358 | 355 | b'branchmap': { |
General Comments 0
You need to be logged in to leave comments.
Login now