Show More
@@ -668,6 +668,13 b' class stream(object):' | |||||
668 | return makeframe(requestid, self.streamid, streamflags, typeid, flags, |
|
668 | return makeframe(requestid, self.streamid, streamflags, typeid, flags, | |
669 | payload) |
|
669 | payload) | |
670 |
|
670 | |||
|
671 | def setdecoder(self, name, extraobjs): | |||
|
672 | """Set the decoder for this stream. | |||
|
673 | ||||
|
674 | Receives the stream profile name and any additional CBOR objects | |||
|
675 | decoded from the stream encoding settings frame payloads. | |||
|
676 | """ | |||
|
677 | ||||
671 | def ensureserverstream(stream): |
|
678 | def ensureserverstream(stream): | |
672 | if stream.streamid % 2: |
|
679 | if stream.streamid % 2: | |
673 | raise error.ProgrammingError('server should only write to even ' |
|
680 | raise error.ProgrammingError('server should only write to even ' | |
@@ -1367,6 +1374,7 b' class clientreactor(object):' | |||||
1367 | self._pendingrequests = collections.deque() |
|
1374 | self._pendingrequests = collections.deque() | |
1368 | self._activerequests = {} |
|
1375 | self._activerequests = {} | |
1369 | self._incomingstreams = {} |
|
1376 | self._incomingstreams = {} | |
|
1377 | self._streamsettingsdecoders = {} | |||
1370 |
|
1378 | |||
1371 | def callcommand(self, name, args, datafh=None, redirect=None): |
|
1379 | def callcommand(self, name, args, datafh=None, redirect=None): | |
1372 | """Request that a command be executed. |
|
1380 | """Request that a command be executed. | |
@@ -1484,6 +1492,9 b' class clientreactor(object):' | |||||
1484 | if frame.streamflags & STREAM_FLAG_END_STREAM: |
|
1492 | if frame.streamflags & STREAM_FLAG_END_STREAM: | |
1485 | del self._incomingstreams[frame.streamid] |
|
1493 | del self._incomingstreams[frame.streamid] | |
1486 |
|
1494 | |||
|
1495 | if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: | |||
|
1496 | return self._onstreamsettingsframe(frame) | |||
|
1497 | ||||
1487 | if frame.requestid not in self._activerequests: |
|
1498 | if frame.requestid not in self._activerequests: | |
1488 | return 'error', { |
|
1499 | return 'error', { | |
1489 | 'message': (_('received frame for inactive request ID: %d') % |
|
1500 | 'message': (_('received frame for inactive request ID: %d') % | |
@@ -1505,6 +1516,64 b' class clientreactor(object):' | |||||
1505 |
|
1516 | |||
1506 | return meth(request, frame) |
|
1517 | return meth(request, frame) | |
1507 |
|
1518 | |||
|
1519 | def _onstreamsettingsframe(self, frame): | |||
|
1520 | assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS | |||
|
1521 | ||||
|
1522 | more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION | |||
|
1523 | eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS | |||
|
1524 | ||||
|
1525 | if more and eos: | |||
|
1526 | return 'error', { | |||
|
1527 | 'message': (_('stream encoding settings frame cannot have both ' | |||
|
1528 | 'continuation and end of stream flags set')), | |||
|
1529 | } | |||
|
1530 | ||||
|
1531 | if not more and not eos: | |||
|
1532 | return 'error', { | |||
|
1533 | 'message': _('stream encoding settings frame must have ' | |||
|
1534 | 'continuation or end of stream flag set'), | |||
|
1535 | } | |||
|
1536 | ||||
|
1537 | if frame.streamid not in self._streamsettingsdecoders: | |||
|
1538 | decoder = cborutil.bufferingdecoder() | |||
|
1539 | self._streamsettingsdecoders[frame.streamid] = decoder | |||
|
1540 | ||||
|
1541 | decoder = self._streamsettingsdecoders[frame.streamid] | |||
|
1542 | ||||
|
1543 | try: | |||
|
1544 | decoder.decode(frame.payload) | |||
|
1545 | except Exception as e: | |||
|
1546 | return 'error', { | |||
|
1547 | 'message': (_('error decoding CBOR from stream encoding ' | |||
|
1548 | 'settings frame: %s') % | |||
|
1549 | stringutil.forcebytestr(e)), | |||
|
1550 | } | |||
|
1551 | ||||
|
1552 | if more: | |||
|
1553 | return 'noop', {} | |||
|
1554 | ||||
|
1555 | assert eos | |||
|
1556 | ||||
|
1557 | decoded = decoder.getavailable() | |||
|
1558 | del self._streamsettingsdecoders[frame.streamid] | |||
|
1559 | ||||
|
1560 | if not decoded: | |||
|
1561 | return 'error', { | |||
|
1562 | 'message': _('stream encoding settings frame did not contain ' | |||
|
1563 | 'CBOR data'), | |||
|
1564 | } | |||
|
1565 | ||||
|
1566 | try: | |||
|
1567 | self._incomingstreams[frame.streamid].setdecoder(decoded[0], | |||
|
1568 | decoded[1:]) | |||
|
1569 | except Exception as e: | |||
|
1570 | return 'error', { | |||
|
1571 | 'message': (_('error setting stream decoder: %s') % | |||
|
1572 | stringutil.forcebytestr(e)), | |||
|
1573 | } | |||
|
1574 | ||||
|
1575 | return 'noop', {} | |||
|
1576 | ||||
1508 | def _oncommandresponseframe(self, request, frame): |
|
1577 | def _oncommandresponseframe(self, request, frame): | |
1509 | if frame.flags & FLAG_COMMAND_RESPONSE_EOS: |
|
1578 | if frame.flags & FLAG_COMMAND_RESPONSE_EOS: | |
1510 | request.state = 'received' |
|
1579 | request.state = 'received' |
@@ -6,6 +6,9 b' from mercurial import (' | |||||
6 | error, |
|
6 | error, | |
7 | wireprotoframing as framing, |
|
7 | wireprotoframing as framing, | |
8 | ) |
|
8 | ) | |
|
9 | from mercurial.utils import ( | |||
|
10 | cborutil, | |||
|
11 | ) | |||
9 |
|
12 | |||
10 | ffs = framing.makeframefromhumanstring |
|
13 | ffs = framing.makeframefromhumanstring | |
11 |
|
14 | |||
@@ -162,6 +165,120 b' class RedirectTests(unittest.TestCase):' | |||||
162 | b"b'redirect': {b'targets': [b'a', b'b'], " |
|
165 | b"b'redirect': {b'targets': [b'a', b'b'], " | |
163 | b"b'hashes': [b'sha256']}}")) |
|
166 | b"b'hashes': [b'sha256']}}")) | |
164 |
|
167 | |||
|
168 | class StreamSettingsTests(unittest.TestCase): | |||
|
169 | def testnoflags(self): | |||
|
170 | reactor = framing.clientreactor(buffersends=False) | |||
|
171 | ||||
|
172 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
173 | for f in meta[b'framegen']: | |||
|
174 | pass | |||
|
175 | ||||
|
176 | action, meta = sendframe(reactor, | |||
|
177 | ffs(b'1 2 stream-begin stream-settings 0 ')) | |||
|
178 | ||||
|
179 | self.assertEqual(action, b'error') | |||
|
180 | self.assertEqual(meta, { | |||
|
181 | b'message': b'stream encoding settings frame must have ' | |||
|
182 | b'continuation or end of stream flag set', | |||
|
183 | }) | |||
|
184 | ||||
|
185 | def testconflictflags(self): | |||
|
186 | reactor = framing.clientreactor(buffersends=False) | |||
|
187 | ||||
|
188 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
189 | for f in meta[b'framegen']: | |||
|
190 | pass | |||
|
191 | ||||
|
192 | action, meta = sendframe(reactor, | |||
|
193 | ffs(b'1 2 stream-begin stream-settings continuation|eos ')) | |||
|
194 | ||||
|
195 | self.assertEqual(action, b'error') | |||
|
196 | self.assertEqual(meta, { | |||
|
197 | b'message': b'stream encoding settings frame cannot have both ' | |||
|
198 | b'continuation and end of stream flags set', | |||
|
199 | }) | |||
|
200 | ||||
|
201 | def testemptypayload(self): | |||
|
202 | reactor = framing.clientreactor(buffersends=False) | |||
|
203 | ||||
|
204 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
205 | for f in meta[b'framegen']: | |||
|
206 | pass | |||
|
207 | ||||
|
208 | action, meta = sendframe(reactor, | |||
|
209 | ffs(b'1 2 stream-begin stream-settings eos ')) | |||
|
210 | ||||
|
211 | self.assertEqual(action, b'error') | |||
|
212 | self.assertEqual(meta, { | |||
|
213 | b'message': b'stream encoding settings frame did not contain ' | |||
|
214 | b'CBOR data' | |||
|
215 | }) | |||
|
216 | ||||
|
217 | def testbadcbor(self): | |||
|
218 | reactor = framing.clientreactor(buffersends=False) | |||
|
219 | ||||
|
220 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
221 | for f in meta[b'framegen']: | |||
|
222 | pass | |||
|
223 | ||||
|
224 | action, meta = sendframe(reactor, | |||
|
225 | ffs(b'1 2 stream-begin stream-settings eos badvalue')) | |||
|
226 | ||||
|
227 | self.assertEqual(action, b'error') | |||
|
228 | ||||
|
229 | def testsingleobject(self): | |||
|
230 | reactor = framing.clientreactor(buffersends=False) | |||
|
231 | ||||
|
232 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
233 | for f in meta[b'framegen']: | |||
|
234 | pass | |||
|
235 | ||||
|
236 | action, meta = sendframe(reactor, | |||
|
237 | ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"')) | |||
|
238 | ||||
|
239 | self.assertEqual(action, b'noop') | |||
|
240 | self.assertEqual(meta, {}) | |||
|
241 | ||||
|
242 | def testmultipleobjects(self): | |||
|
243 | reactor = framing.clientreactor(buffersends=False) | |||
|
244 | ||||
|
245 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
246 | for f in meta[b'framegen']: | |||
|
247 | pass | |||
|
248 | ||||
|
249 | data = b''.join([ | |||
|
250 | b''.join(cborutil.streamencode(b'identity')), | |||
|
251 | b''.join(cborutil.streamencode({b'foo', b'bar'})), | |||
|
252 | ]) | |||
|
253 | ||||
|
254 | action, meta = sendframe(reactor, | |||
|
255 | ffs(b'1 2 stream-begin stream-settings eos %s' % data)) | |||
|
256 | ||||
|
257 | self.assertEqual(action, b'noop') | |||
|
258 | self.assertEqual(meta, {}) | |||
|
259 | ||||
|
260 | def testmultipleframes(self): | |||
|
261 | reactor = framing.clientreactor(buffersends=False) | |||
|
262 | ||||
|
263 | request, action, meta = reactor.callcommand(b'foo', {}) | |||
|
264 | for f in meta[b'framegen']: | |||
|
265 | pass | |||
|
266 | ||||
|
267 | data = b''.join(cborutil.streamencode(b'identity')) | |||
|
268 | ||||
|
269 | action, meta = sendframe(reactor, | |||
|
270 | ffs(b'1 2 stream-begin stream-settings continuation %s' % | |||
|
271 | data[0:3])) | |||
|
272 | ||||
|
273 | self.assertEqual(action, b'noop') | |||
|
274 | self.assertEqual(meta, {}) | |||
|
275 | ||||
|
276 | action, meta = sendframe(reactor, | |||
|
277 | ffs(b'1 2 0 stream-settings eos %s' % data[3:])) | |||
|
278 | ||||
|
279 | self.assertEqual(action, b'noop') | |||
|
280 | self.assertEqual(meta, {}) | |||
|
281 | ||||
165 | if __name__ == '__main__': |
|
282 | if __name__ == '__main__': | |
166 | import silenttestrunner |
|
283 | import silenttestrunner | |
167 | silenttestrunner.main(__name__) |
|
284 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now