##// END OF EJS Templates
wireprotov2peer: stream decoded responses...
Gregory Szorc -
r39597:d06834e0 default
parent child Browse files
Show More
@@ -3240,7 +3240,7 b' def debugwireproto(ui, repo, path=None, '
3240 res = e.callcommand(command, args).result()
3240 res = e.callcommand(command, args).result()
3241
3241
3242 if isinstance(res, wireprotov2peer.commandresponse):
3242 if isinstance(res, wireprotov2peer.commandresponse):
3243 val = list(res.cborobjects())
3243 val = res.objects()
3244 ui.status(_('response: %s\n') %
3244 ui.status(_('response: %s\n') %
3245 stringutil.pprint(val, bprefix=True, indent=2))
3245 stringutil.pprint(val, bprefix=True, indent=2))
3246 else:
3246 else:
@@ -7,11 +7,12 b''
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import threading
11
10 from .i18n import _
12 from .i18n import _
11 from . import (
13 from . import (
12 encoding,
14 encoding,
13 error,
15 error,
14 util,
15 wireprotoframing,
16 wireprotoframing,
16 )
17 )
17 from .utils import (
18 from .utils import (
@@ -34,20 +35,101 b' def formatrichmessage(atoms):'
34 return b''.join(chunks)
35 return b''.join(chunks)
35
36
36 class commandresponse(object):
37 class commandresponse(object):
37 """Represents the response to a command request."""
38 """Represents the response to a command request.
39
40 Instances track the state of the command and hold its results.
41
42 An external entity is required to update the state of the object when
43 events occur.
44 """
38
45
39 def __init__(self, requestid, command):
46 def __init__(self, requestid, command):
40 self.requestid = requestid
47 self.requestid = requestid
41 self.command = command
48 self.command = command
42
49
43 self.b = util.bytesio()
50 # Whether all remote input related to this command has been
51 # received.
52 self._inputcomplete = False
53
54 # We have a lock that is acquired when important object state is
55 # mutated. This is to prevent race conditions between 1 thread
56 # sending us new data and another consuming it.
57 self._lock = threading.RLock()
58
59 # An event is set when state of the object changes. This event
60 # is waited on by the generator emitting objects.
61 self._serviceable = threading.Event()
62
63 self._pendingevents = []
64 self._decoder = cborutil.bufferingdecoder()
65 self._seeninitial = False
66
67 def _oninputcomplete(self):
68 with self._lock:
69 self._inputcomplete = True
70 self._serviceable.set()
71
72 def _onresponsedata(self, data):
73 available, readcount, wanted = self._decoder.decode(data)
74
75 if not available:
76 return
77
78 with self._lock:
79 for o in self._decoder.getavailable():
80 if not self._seeninitial:
81 self._handleinitial(o)
82 continue
83
84 self._pendingevents.append(o)
85
86 self._serviceable.set()
44
87
45 def cborobjects(self):
88 def _handleinitial(self, o):
46 """Obtain decoded CBOR objects from this response."""
89 self._seeninitial = True
47 self.b.seek(0)
90 if o[b'status'] == 'ok':
91 return
92
93 atoms = [{'msg': o[b'error'][b'message']}]
94 if b'args' in o[b'error']:
95 atoms[0]['args'] = o[b'error'][b'args']
96
97 raise error.RepoError(formatrichmessage(atoms))
98
99 def objects(self):
100 """Obtained decoded objects from this response.
101
102 This is a generator of data structures that were decoded from the
103 command response.
104
105 Obtaining the next member of the generator may block due to waiting
106 on external data to become available.
48
107
49 for v in cborutil.decodeall(self.b.getvalue()):
108 If the server encountered an error in the middle of serving the data
50 yield v
109 or if another error occurred, an exception may be raised when
110 advancing the generator.
111 """
112 while True:
113 # TODO this can infinite loop if self._inputcomplete is never
114 # set. We likely want to tie the lifetime of this object/state
115 # to that of the background thread receiving frames and updating
116 # our state.
117 self._serviceable.wait(1.0)
118
119 with self._lock:
120 self._serviceable.clear()
121
122 # Make copies because objects could be mutated during
123 # iteration.
124 stop = self._inputcomplete
125 pending = list(self._pendingevents)
126 self._pendingevents[:] = []
127
128 for o in pending:
129 yield o
130
131 if stop:
132 break
51
133
52 class clienthandler(object):
134 class clienthandler(object):
53 """Object to handle higher-level client activities.
135 """Object to handle higher-level client activities.
@@ -80,6 +162,8 b' class clienthandler(object):'
80 rid = request.requestid
162 rid = request.requestid
81 self._requests[rid] = request
163 self._requests[rid] = request
82 self._futures[rid] = f
164 self._futures[rid] = f
165 # TODO we need some kind of lifetime on response instances otherwise
166 # objects() may deadlock.
83 self._responses[rid] = commandresponse(rid, command)
167 self._responses[rid] = commandresponse(rid, command)
84
168
85 return iter(())
169 return iter(())
@@ -119,8 +203,12 b' class clienthandler(object):'
119 if action == 'error':
203 if action == 'error':
120 e = error.RepoError(meta['message'])
204 e = error.RepoError(meta['message'])
121
205
206 if frame.requestid in self._responses:
207 self._responses[frame.requestid]._oninputcomplete()
208
122 if frame.requestid in self._futures:
209 if frame.requestid in self._futures:
123 self._futures[frame.requestid].set_exception(e)
210 self._futures[frame.requestid].set_exception(e)
211 del self._futures[frame.requestid]
124 else:
212 else:
125 raise e
213 raise e
126
214
@@ -141,39 +229,32 b' class clienthandler(object):'
141 self._processresponsedata(frame, meta, response)
229 self._processresponsedata(frame, meta, response)
142 except BaseException as e:
230 except BaseException as e:
143 self._futures[frame.requestid].set_exception(e)
231 self._futures[frame.requestid].set_exception(e)
232 del self._futures[frame.requestid]
233 response._oninputcomplete()
144 else:
234 else:
145 raise error.ProgrammingError(
235 raise error.ProgrammingError(
146 'unhandled action from clientreactor: %s' % action)
236 'unhandled action from clientreactor: %s' % action)
147
237
148 def _processresponsedata(self, frame, meta, response):
238 def _processresponsedata(self, frame, meta, response):
149 # This buffers all data until end of stream is received. This
239 # This can raise. The caller can handle it.
150 # is bad for performance.
240 response._onresponsedata(meta['data'])
151 # TODO make response data streamable
152 response.b.write(meta['data'])
153
241
154 if meta['eos']:
242 if meta['eos']:
155 # If the command has a decoder, resolve the future to the
243 response._oninputcomplete()
156 # decoded value. Otherwise resolve to the rich response object.
244 del self._requests[frame.requestid]
157 decoder = COMMAND_DECODERS.get(response.command)
158
159 # TODO consider always resolving the overall status map.
160 if decoder:
161 objs = response.cborobjects()
162
163 overall = next(objs)
164
245
165 if overall['status'] == 'ok':
246 # If the command has a decoder, we wait until all input has been
166 self._futures[frame.requestid].set_result(decoder(objs))
247 # received before resolving the future. Otherwise we resolve the
167 else:
248 # future immediately.
168 atoms = [{'msg': overall['error']['message']}]
249 if frame.requestid not in self._futures:
169 if 'args' in overall['error']:
250 return
170 atoms[0]['args'] = overall['error']['args']
171 e = error.RepoError(formatrichmessage(atoms))
172 self._futures[frame.requestid].set_exception(e)
173 else:
174 self._futures[frame.requestid].set_result(response)
175
251
176 del self._requests[frame.requestid]
252 if response.command not in COMMAND_DECODERS:
253 self._futures[frame.requestid].set_result(response.objects())
254 del self._futures[frame.requestid]
255 elif response._inputcomplete:
256 decoded = COMMAND_DECODERS[response.command](response.objects())
257 self._futures[frame.requestid].set_result(decoded)
177 del self._futures[frame.requestid]
258 del self._futures[frame.requestid]
178
259
179 def decodebranchmap(objs):
260 def decodebranchmap(objs):
@@ -225,10 +225,7 b' Request to read-only command works out o'
225 s> 0\r\n
225 s> 0\r\n
226 s> \r\n
226 s> \r\n
227 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
227 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
228 response: [
228 response: gen[
229 {
230 b'status': b'ok'
231 },
232 b'customreadonly bytes response'
229 b'customreadonly bytes response'
233 ]
230 ]
234
231
@@ -349,10 +349,7 b' capabilities command returns expected in'
349 s> 0\r\n
349 s> 0\r\n
350 s> \r\n
350 s> \r\n
351 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
351 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
352 response: [
352 response: gen[
353 {
354 b'status': b'ok'
355 },
356 {
353 {
357 b'commands': {
354 b'commands': {
358 b'branchmap': {
355 b'branchmap': {
General Comments 0
You need to be logged in to leave comments. Login now