Show More
@@ -668,6 +668,13 b' class stream(object):' | |||
|
668 | 668 | return makeframe(requestid, self.streamid, streamflags, typeid, flags, |
|
669 | 669 | payload) |
|
670 | 670 | |
|
671 | def setdecoder(self, name, extraobjs): | |
|
672 | """Set the decoder for this stream. | |
|
673 | ||
|
674 | Receives the stream profile name and any additional CBOR objects | |
|
675 | decoded from the stream encoding settings frame payloads. | |
|
676 | """ | |
|
677 | ||
|
671 | 678 | def ensureserverstream(stream): |
|
672 | 679 | if stream.streamid % 2: |
|
673 | 680 | raise error.ProgrammingError('server should only write to even ' |
@@ -1367,6 +1374,7 b' class clientreactor(object):' | |||
|
1367 | 1374 | self._pendingrequests = collections.deque() |
|
1368 | 1375 | self._activerequests = {} |
|
1369 | 1376 | self._incomingstreams = {} |
|
1377 | self._streamsettingsdecoders = {} | |
|
1370 | 1378 | |
|
1371 | 1379 | def callcommand(self, name, args, datafh=None, redirect=None): |
|
1372 | 1380 | """Request that a command be executed. |
@@ -1484,6 +1492,9 b' class clientreactor(object):' | |||
|
1484 | 1492 | if frame.streamflags & STREAM_FLAG_END_STREAM: |
|
1485 | 1493 | del self._incomingstreams[frame.streamid] |
|
1486 | 1494 | |
|
1495 | if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: | |
|
1496 | return self._onstreamsettingsframe(frame) | |
|
1497 | ||
|
1487 | 1498 | if frame.requestid not in self._activerequests: |
|
1488 | 1499 | return 'error', { |
|
1489 | 1500 | 'message': (_('received frame for inactive request ID: %d') % |
@@ -1505,6 +1516,64 b' class clientreactor(object):' | |||
|
1505 | 1516 | |
|
1506 | 1517 | return meth(request, frame) |
|
1507 | 1518 | |
|
1519 | def _onstreamsettingsframe(self, frame): | |
|
1520 | assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS | |
|
1521 | ||
|
1522 | more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION | |
|
1523 | eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS | |
|
1524 | ||
|
1525 | if more and eos: | |
|
1526 | return 'error', { | |
|
1527 | 'message': (_('stream encoding settings frame cannot have both ' | |
|
1528 | 'continuation and end of stream flags set')), | |
|
1529 | } | |
|
1530 | ||
|
1531 | if not more and not eos: | |
|
1532 | return 'error', { | |
|
1533 | 'message': _('stream encoding settings frame must have ' | |
|
1534 | 'continuation or end of stream flag set'), | |
|
1535 | } | |
|
1536 | ||
|
1537 | if frame.streamid not in self._streamsettingsdecoders: | |
|
1538 | decoder = cborutil.bufferingdecoder() | |
|
1539 | self._streamsettingsdecoders[frame.streamid] = decoder | |
|
1540 | ||
|
1541 | decoder = self._streamsettingsdecoders[frame.streamid] | |
|
1542 | ||
|
1543 | try: | |
|
1544 | decoder.decode(frame.payload) | |
|
1545 | except Exception as e: | |
|
1546 | return 'error', { | |
|
1547 | 'message': (_('error decoding CBOR from stream encoding ' | |
|
1548 | 'settings frame: %s') % | |
|
1549 | stringutil.forcebytestr(e)), | |
|
1550 | } | |
|
1551 | ||
|
1552 | if more: | |
|
1553 | return 'noop', {} | |
|
1554 | ||
|
1555 | assert eos | |
|
1556 | ||
|
1557 | decoded = decoder.getavailable() | |
|
1558 | del self._streamsettingsdecoders[frame.streamid] | |
|
1559 | ||
|
1560 | if not decoded: | |
|
1561 | return 'error', { | |
|
1562 | 'message': _('stream encoding settings frame did not contain ' | |
|
1563 | 'CBOR data'), | |
|
1564 | } | |
|
1565 | ||
|
1566 | try: | |
|
1567 | self._incomingstreams[frame.streamid].setdecoder(decoded[0], | |
|
1568 | decoded[1:]) | |
|
1569 | except Exception as e: | |
|
1570 | return 'error', { | |
|
1571 | 'message': (_('error setting stream decoder: %s') % | |
|
1572 | stringutil.forcebytestr(e)), | |
|
1573 | } | |
|
1574 | ||
|
1575 | return 'noop', {} | |
|
1576 | ||
|
1508 | 1577 | def _oncommandresponseframe(self, request, frame): |
|
1509 | 1578 | if frame.flags & FLAG_COMMAND_RESPONSE_EOS: |
|
1510 | 1579 | request.state = 'received' |
@@ -6,6 +6,9 b' from mercurial import (' | |||
|
6 | 6 | error, |
|
7 | 7 | wireprotoframing as framing, |
|
8 | 8 | ) |
|
9 | from mercurial.utils import ( | |
|
10 | cborutil, | |
|
11 | ) | |
|
9 | 12 | |
|
10 | 13 | ffs = framing.makeframefromhumanstring |
|
11 | 14 | |
@@ -162,6 +165,120 b' class RedirectTests(unittest.TestCase):' | |||
|
162 | 165 | b"b'redirect': {b'targets': [b'a', b'b'], " |
|
163 | 166 | b"b'hashes': [b'sha256']}}")) |
|
164 | 167 | |
|
168 | class StreamSettingsTests(unittest.TestCase): | |
|
169 | def testnoflags(self): | |
|
170 | reactor = framing.clientreactor(buffersends=False) | |
|
171 | ||
|
172 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
173 | for f in meta[b'framegen']: | |
|
174 | pass | |
|
175 | ||
|
176 | action, meta = sendframe(reactor, | |
|
177 | ffs(b'1 2 stream-begin stream-settings 0 ')) | |
|
178 | ||
|
179 | self.assertEqual(action, b'error') | |
|
180 | self.assertEqual(meta, { | |
|
181 | b'message': b'stream encoding settings frame must have ' | |
|
182 | b'continuation or end of stream flag set', | |
|
183 | }) | |
|
184 | ||
|
185 | def testconflictflags(self): | |
|
186 | reactor = framing.clientreactor(buffersends=False) | |
|
187 | ||
|
188 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
189 | for f in meta[b'framegen']: | |
|
190 | pass | |
|
191 | ||
|
192 | action, meta = sendframe(reactor, | |
|
193 | ffs(b'1 2 stream-begin stream-settings continuation|eos ')) | |
|
194 | ||
|
195 | self.assertEqual(action, b'error') | |
|
196 | self.assertEqual(meta, { | |
|
197 | b'message': b'stream encoding settings frame cannot have both ' | |
|
198 | b'continuation and end of stream flags set', | |
|
199 | }) | |
|
200 | ||
|
201 | def testemptypayload(self): | |
|
202 | reactor = framing.clientreactor(buffersends=False) | |
|
203 | ||
|
204 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
205 | for f in meta[b'framegen']: | |
|
206 | pass | |
|
207 | ||
|
208 | action, meta = sendframe(reactor, | |
|
209 | ffs(b'1 2 stream-begin stream-settings eos ')) | |
|
210 | ||
|
211 | self.assertEqual(action, b'error') | |
|
212 | self.assertEqual(meta, { | |
|
213 | b'message': b'stream encoding settings frame did not contain ' | |
|
214 | b'CBOR data' | |
|
215 | }) | |
|
216 | ||
|
217 | def testbadcbor(self): | |
|
218 | reactor = framing.clientreactor(buffersends=False) | |
|
219 | ||
|
220 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
221 | for f in meta[b'framegen']: | |
|
222 | pass | |
|
223 | ||
|
224 | action, meta = sendframe(reactor, | |
|
225 | ffs(b'1 2 stream-begin stream-settings eos badvalue')) | |
|
226 | ||
|
227 | self.assertEqual(action, b'error') | |
|
228 | ||
|
229 | def testsingleobject(self): | |
|
230 | reactor = framing.clientreactor(buffersends=False) | |
|
231 | ||
|
232 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
233 | for f in meta[b'framegen']: | |
|
234 | pass | |
|
235 | ||
|
236 | action, meta = sendframe(reactor, | |
|
237 | ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"')) | |
|
238 | ||
|
239 | self.assertEqual(action, b'noop') | |
|
240 | self.assertEqual(meta, {}) | |
|
241 | ||
|
242 | def testmultipleobjects(self): | |
|
243 | reactor = framing.clientreactor(buffersends=False) | |
|
244 | ||
|
245 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
246 | for f in meta[b'framegen']: | |
|
247 | pass | |
|
248 | ||
|
249 | data = b''.join([ | |
|
250 | b''.join(cborutil.streamencode(b'identity')), | |
|
251 | b''.join(cborutil.streamencode({b'foo', b'bar'})), | |
|
252 | ]) | |
|
253 | ||
|
254 | action, meta = sendframe(reactor, | |
|
255 | ffs(b'1 2 stream-begin stream-settings eos %s' % data)) | |
|
256 | ||
|
257 | self.assertEqual(action, b'noop') | |
|
258 | self.assertEqual(meta, {}) | |
|
259 | ||
|
260 | def testmultipleframes(self): | |
|
261 | reactor = framing.clientreactor(buffersends=False) | |
|
262 | ||
|
263 | request, action, meta = reactor.callcommand(b'foo', {}) | |
|
264 | for f in meta[b'framegen']: | |
|
265 | pass | |
|
266 | ||
|
267 | data = b''.join(cborutil.streamencode(b'identity')) | |
|
268 | ||
|
269 | action, meta = sendframe(reactor, | |
|
270 | ffs(b'1 2 stream-begin stream-settings continuation %s' % | |
|
271 | data[0:3])) | |
|
272 | ||
|
273 | self.assertEqual(action, b'noop') | |
|
274 | self.assertEqual(meta, {}) | |
|
275 | ||
|
276 | action, meta = sendframe(reactor, | |
|
277 | ffs(b'1 2 0 stream-settings eos %s' % data[3:])) | |
|
278 | ||
|
279 | self.assertEqual(action, b'noop') | |
|
280 | self.assertEqual(meta, {}) | |
|
281 | ||
|
165 | 282 | if __name__ == '__main__': |
|
166 | 283 | import silenttestrunner |
|
167 | 284 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now