##// END OF EJS Templates
wireprotov2: handle stream encoding settings frames...
Gregory Szorc -
r40164:57782791 default
parent child Browse files
Show More
@@ -668,6 +668,13 b' class stream(object):'
668 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 669 payload)
670 670
671 def setdecoder(self, name, extraobjs):
672 """Set the decoder for this stream.
673
674 Receives the stream profile name and any additional CBOR objects
675 decoded from the stream encoding settings frame payloads.
676 """
677
671 678 def ensureserverstream(stream):
672 679 if stream.streamid % 2:
673 680 raise error.ProgrammingError('server should only write to even '
@@ -1367,6 +1374,7 b' class clientreactor(object):'
1367 1374 self._pendingrequests = collections.deque()
1368 1375 self._activerequests = {}
1369 1376 self._incomingstreams = {}
1377 self._streamsettingsdecoders = {}
1370 1378
1371 1379 def callcommand(self, name, args, datafh=None, redirect=None):
1372 1380 """Request that a command be executed.
@@ -1484,6 +1492,9 b' class clientreactor(object):'
1484 1492 if frame.streamflags & STREAM_FLAG_END_STREAM:
1485 1493 del self._incomingstreams[frame.streamid]
1486 1494
1495 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1496 return self._onstreamsettingsframe(frame)
1497
1487 1498 if frame.requestid not in self._activerequests:
1488 1499 return 'error', {
1489 1500 'message': (_('received frame for inactive request ID: %d') %
@@ -1505,6 +1516,64 b' class clientreactor(object):'
1505 1516
1506 1517 return meth(request, frame)
1507 1518
1519 def _onstreamsettingsframe(self, frame):
1520 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1521
1522 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1523 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1524
1525 if more and eos:
1526 return 'error', {
1527 'message': (_('stream encoding settings frame cannot have both '
1528 'continuation and end of stream flags set')),
1529 }
1530
1531 if not more and not eos:
1532 return 'error', {
1533 'message': _('stream encoding settings frame must have '
1534 'continuation or end of stream flag set'),
1535 }
1536
1537 if frame.streamid not in self._streamsettingsdecoders:
1538 decoder = cborutil.bufferingdecoder()
1539 self._streamsettingsdecoders[frame.streamid] = decoder
1540
1541 decoder = self._streamsettingsdecoders[frame.streamid]
1542
1543 try:
1544 decoder.decode(frame.payload)
1545 except Exception as e:
1546 return 'error', {
1547 'message': (_('error decoding CBOR from stream encoding '
1548 'settings frame: %s') %
1549 stringutil.forcebytestr(e)),
1550 }
1551
1552 if more:
1553 return 'noop', {}
1554
1555 assert eos
1556
1557 decoded = decoder.getavailable()
1558 del self._streamsettingsdecoders[frame.streamid]
1559
1560 if not decoded:
1561 return 'error', {
1562 'message': _('stream encoding settings frame did not contain '
1563 'CBOR data'),
1564 }
1565
1566 try:
1567 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1568 decoded[1:])
1569 except Exception as e:
1570 return 'error', {
1571 'message': (_('error setting stream decoder: %s') %
1572 stringutil.forcebytestr(e)),
1573 }
1574
1575 return 'noop', {}
1576
1508 1577 def _oncommandresponseframe(self, request, frame):
1509 1578 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1510 1579 request.state = 'received'
@@ -6,6 +6,9 b' from mercurial import ('
6 6 error,
7 7 wireprotoframing as framing,
8 8 )
9 from mercurial.utils import (
10 cborutil,
11 )
9 12
10 13 ffs = framing.makeframefromhumanstring
11 14
@@ -162,6 +165,120 b' class RedirectTests(unittest.TestCase):'
162 165 b"b'redirect': {b'targets': [b'a', b'b'], "
163 166 b"b'hashes': [b'sha256']}}"))
164 167
168 class StreamSettingsTests(unittest.TestCase):
169 def testnoflags(self):
170 reactor = framing.clientreactor(buffersends=False)
171
172 request, action, meta = reactor.callcommand(b'foo', {})
173 for f in meta[b'framegen']:
174 pass
175
176 action, meta = sendframe(reactor,
177 ffs(b'1 2 stream-begin stream-settings 0 '))
178
179 self.assertEqual(action, b'error')
180 self.assertEqual(meta, {
181 b'message': b'stream encoding settings frame must have '
182 b'continuation or end of stream flag set',
183 })
184
185 def testconflictflags(self):
186 reactor = framing.clientreactor(buffersends=False)
187
188 request, action, meta = reactor.callcommand(b'foo', {})
189 for f in meta[b'framegen']:
190 pass
191
192 action, meta = sendframe(reactor,
193 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
194
195 self.assertEqual(action, b'error')
196 self.assertEqual(meta, {
197 b'message': b'stream encoding settings frame cannot have both '
198 b'continuation and end of stream flags set',
199 })
200
201 def testemptypayload(self):
202 reactor = framing.clientreactor(buffersends=False)
203
204 request, action, meta = reactor.callcommand(b'foo', {})
205 for f in meta[b'framegen']:
206 pass
207
208 action, meta = sendframe(reactor,
209 ffs(b'1 2 stream-begin stream-settings eos '))
210
211 self.assertEqual(action, b'error')
212 self.assertEqual(meta, {
213 b'message': b'stream encoding settings frame did not contain '
214 b'CBOR data'
215 })
216
217 def testbadcbor(self):
218 reactor = framing.clientreactor(buffersends=False)
219
220 request, action, meta = reactor.callcommand(b'foo', {})
221 for f in meta[b'framegen']:
222 pass
223
224 action, meta = sendframe(reactor,
225 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
226
227 self.assertEqual(action, b'error')
228
229 def testsingleobject(self):
230 reactor = framing.clientreactor(buffersends=False)
231
232 request, action, meta = reactor.callcommand(b'foo', {})
233 for f in meta[b'framegen']:
234 pass
235
236 action, meta = sendframe(reactor,
237 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
238
239 self.assertEqual(action, b'noop')
240 self.assertEqual(meta, {})
241
242 def testmultipleobjects(self):
243 reactor = framing.clientreactor(buffersends=False)
244
245 request, action, meta = reactor.callcommand(b'foo', {})
246 for f in meta[b'framegen']:
247 pass
248
249 data = b''.join([
250 b''.join(cborutil.streamencode(b'identity')),
251 b''.join(cborutil.streamencode({b'foo', b'bar'})),
252 ])
253
254 action, meta = sendframe(reactor,
255 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
256
257 self.assertEqual(action, b'noop')
258 self.assertEqual(meta, {})
259
260 def testmultipleframes(self):
261 reactor = framing.clientreactor(buffersends=False)
262
263 request, action, meta = reactor.callcommand(b'foo', {})
264 for f in meta[b'framegen']:
265 pass
266
267 data = b''.join(cborutil.streamencode(b'identity'))
268
269 action, meta = sendframe(reactor,
270 ffs(b'1 2 stream-begin stream-settings continuation %s' %
271 data[0:3]))
272
273 self.assertEqual(action, b'noop')
274 self.assertEqual(meta, {})
275
276 action, meta = sendframe(reactor,
277 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
278
279 self.assertEqual(action, b'noop')
280 self.assertEqual(meta, {})
281
165 282 if __name__ == '__main__':
166 283 import silenttestrunner
167 284 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now