Show More
@@ -1,197 +1,194 b'' | |||
|
1 | 1 | from __future__ import absolute_import, print_function |
|
2 | 2 | |
|
3 | 3 | import unittest |
|
4 | 4 | |
|
5 | 5 | from mercurial import ( |
|
6 | 6 | util, |
|
7 | 7 | wireprotoframing as framing, |
|
8 | 8 | ) |
|
9 | 9 | |
|
10 | 10 | ffs = framing.makeframefromhumanstring |
|
11 | 11 | |
|
12 | 12 | class FrameHumanStringTests(unittest.TestCase): |
|
13 | 13 | def testbasic(self): |
|
14 | 14 | self.assertEqual(ffs(b'1 1 0 1 0 '), |
|
15 | 15 | b'\x00\x00\x00\x01\x00\x01\x00\x10') |
|
16 | 16 | |
|
17 | 17 | self.assertEqual(ffs(b'2 4 0 1 0 '), |
|
18 | 18 | b'\x00\x00\x00\x02\x00\x04\x00\x10') |
|
19 | 19 | |
|
20 | 20 | self.assertEqual(ffs(b'2 4 0 1 0 foo'), |
|
21 | 21 | b'\x03\x00\x00\x02\x00\x04\x00\x10foo') |
|
22 | 22 | |
|
23 | 23 | def testcborint(self): |
|
24 | 24 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'), |
|
25 | 25 | b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f') |
|
26 | 26 | |
|
27 | 27 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'), |
|
28 | 28 | b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*') |
|
29 | 29 | |
|
30 | 30 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'), |
|
31 | 31 | b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' |
|
32 | 32 | b'\x00\x10\x00\x00') |
|
33 | 33 | |
|
34 | 34 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'), |
|
35 | 35 | b'\x01\x00\x00\x01\x00\x01\x00\x10\x00') |
|
36 | 36 | |
|
37 | 37 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'), |
|
38 | 38 | b'\x01\x00\x00\x01\x00\x01\x00\x10 ') |
|
39 | 39 | |
|
40 | 40 | self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'), |
|
41 | 41 | b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r') |
|
42 | 42 | |
|
43 | 43 | def testcborstrings(self): |
|
44 | 44 | self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"), |
|
45 | 45 | b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo') |
|
46 | 46 | |
|
47 | self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"), | |
|
48 | b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo') | |
|
49 | ||
|
50 | 47 | def testcborlists(self): |
|
51 | 48 | self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"), |
|
52 | 49 | b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' |
|
53 | 50 | b'\x18*Cfoo') |
|
54 | 51 | |
|
55 | 52 | def testcbordicts(self): |
|
56 | 53 | self.assertEqual(ffs(b"1 1 0 1 0 " |
|
57 | 54 | b"cbor:{b'foo': b'val1', b'bar': b'val2'}"), |
|
58 | 55 | b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' |
|
59 | 56 | b'CbarDval2CfooDval1') |
|
60 | 57 | |
|
61 | 58 | class FrameTests(unittest.TestCase): |
|
62 | 59 | def testdataexactframesize(self): |
|
63 | 60 | data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE) |
|
64 | 61 | |
|
65 | 62 | stream = framing.stream(1) |
|
66 | 63 | frames = list(framing.createcommandframes(stream, 1, b'command', |
|
67 | 64 | {}, data)) |
|
68 | 65 | self.assertEqual(frames, [ |
|
69 | 66 | ffs(b'1 1 stream-begin command-request new|have-data ' |
|
70 | 67 | b"cbor:{b'name': b'command'}"), |
|
71 | 68 | ffs(b'1 1 0 command-data continuation %s' % data.getvalue()), |
|
72 | 69 | ffs(b'1 1 0 command-data eos ') |
|
73 | 70 | ]) |
|
74 | 71 | |
|
75 | 72 | def testdatamultipleframes(self): |
|
76 | 73 | data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1)) |
|
77 | 74 | |
|
78 | 75 | stream = framing.stream(1) |
|
79 | 76 | frames = list(framing.createcommandframes(stream, 1, b'command', {}, |
|
80 | 77 | data)) |
|
81 | 78 | self.assertEqual(frames, [ |
|
82 | 79 | ffs(b'1 1 stream-begin command-request new|have-data ' |
|
83 | 80 | b"cbor:{b'name': b'command'}"), |
|
84 | 81 | ffs(b'1 1 0 command-data continuation %s' % ( |
|
85 | 82 | b'x' * framing.DEFAULT_MAX_FRAME_SIZE)), |
|
86 | 83 | ffs(b'1 1 0 command-data eos x'), |
|
87 | 84 | ]) |
|
88 | 85 | |
|
89 | 86 | def testargsanddata(self): |
|
90 | 87 | data = util.bytesio(b'x' * 100) |
|
91 | 88 | |
|
92 | 89 | stream = framing.stream(1) |
|
93 | 90 | frames = list(framing.createcommandframes(stream, 1, b'command', { |
|
94 | 91 | b'key1': b'key1value', |
|
95 | 92 | b'key2': b'key2value', |
|
96 | 93 | b'key3': b'key3value', |
|
97 | 94 | }, data)) |
|
98 | 95 | |
|
99 | 96 | self.assertEqual(frames, [ |
|
100 | 97 | ffs(b'1 1 stream-begin command-request new|have-data ' |
|
101 | 98 | b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', " |
|
102 | 99 | b"b'key2': b'key2value', b'key3': b'key3value'}}"), |
|
103 | 100 | ffs(b'1 1 0 command-data eos %s' % data.getvalue()), |
|
104 | 101 | ]) |
|
105 | 102 | |
|
106 | 103 | if not getattr(unittest.TestCase, 'assertRaisesRegex', False): |
|
107 | 104 | # Python 3.7 deprecates the regex*p* version, but 2.7 lacks |
|
108 | 105 | # the regex version. |
|
109 | 106 | assertRaisesRegex = (# camelcase-required |
|
110 | 107 | unittest.TestCase.assertRaisesRegexp) |
|
111 | 108 | |
|
112 | 109 | def testtextoutputformattingstringtype(self): |
|
113 | 110 | """Formatting string must be bytes.""" |
|
114 | 111 | with self.assertRaisesRegex(ValueError, 'must use bytes formatting '): |
|
115 | 112 | list(framing.createtextoutputframe(None, 1, [ |
|
116 | 113 | (b'foo'.decode('ascii'), [], [])])) |
|
117 | 114 | |
|
118 | 115 | def testtextoutputargumentbytes(self): |
|
119 | 116 | with self.assertRaisesRegex(ValueError, 'must use bytes for argument'): |
|
120 | 117 | list(framing.createtextoutputframe(None, 1, [ |
|
121 | 118 | (b'foo', [b'foo'.decode('ascii')], [])])) |
|
122 | 119 | |
|
123 | 120 | def testtextoutputlabelbytes(self): |
|
124 | 121 | with self.assertRaisesRegex(ValueError, 'must use bytes for labels'): |
|
125 | 122 | list(framing.createtextoutputframe(None, 1, [ |
|
126 | 123 | (b'foo', [], [b'foo'.decode('ascii')])])) |
|
127 | 124 | |
|
128 | 125 | def testtextoutput1simpleatom(self): |
|
129 | 126 | stream = framing.stream(1) |
|
130 | 127 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
131 | 128 | (b'foo', [], [])])) |
|
132 | 129 | |
|
133 | 130 | self.assertEqual(val, [ |
|
134 | 131 | ffs(b'1 1 stream-begin text-output 0 ' |
|
135 | 132 | b"cbor:[{b'msg': b'foo'}]"), |
|
136 | 133 | ]) |
|
137 | 134 | |
|
138 | 135 | def testtextoutput2simpleatoms(self): |
|
139 | 136 | stream = framing.stream(1) |
|
140 | 137 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
141 | 138 | (b'foo', [], []), |
|
142 | 139 | (b'bar', [], []), |
|
143 | 140 | ])) |
|
144 | 141 | |
|
145 | 142 | self.assertEqual(val, [ |
|
146 | 143 | ffs(b'1 1 stream-begin text-output 0 ' |
|
147 | 144 | b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]") |
|
148 | 145 | ]) |
|
149 | 146 | |
|
150 | 147 | def testtextoutput1arg(self): |
|
151 | 148 | stream = framing.stream(1) |
|
152 | 149 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
153 | 150 | (b'foo %s', [b'val1'], []), |
|
154 | 151 | ])) |
|
155 | 152 | |
|
156 | 153 | self.assertEqual(val, [ |
|
157 | 154 | ffs(b'1 1 stream-begin text-output 0 ' |
|
158 | 155 | b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]") |
|
159 | 156 | ]) |
|
160 | 157 | |
|
161 | 158 | def testtextoutput2arg(self): |
|
162 | 159 | stream = framing.stream(1) |
|
163 | 160 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
164 | 161 | (b'foo %s %s', [b'val', b'value'], []), |
|
165 | 162 | ])) |
|
166 | 163 | |
|
167 | 164 | self.assertEqual(val, [ |
|
168 | 165 | ffs(b'1 1 stream-begin text-output 0 ' |
|
169 | 166 | b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]") |
|
170 | 167 | ]) |
|
171 | 168 | |
|
172 | 169 | def testtextoutput1label(self): |
|
173 | 170 | stream = framing.stream(1) |
|
174 | 171 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
175 | 172 | (b'foo', [], [b'label']), |
|
176 | 173 | ])) |
|
177 | 174 | |
|
178 | 175 | self.assertEqual(val, [ |
|
179 | 176 | ffs(b'1 1 stream-begin text-output 0 ' |
|
180 | 177 | b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]") |
|
181 | 178 | ]) |
|
182 | 179 | |
|
183 | 180 | def testargandlabel(self): |
|
184 | 181 | stream = framing.stream(1) |
|
185 | 182 | val = list(framing.createtextoutputframe(stream, 1, [ |
|
186 | 183 | (b'foo %s', [b'arg'], [b'label']), |
|
187 | 184 | ])) |
|
188 | 185 | |
|
189 | 186 | self.assertEqual(val, [ |
|
190 | 187 | ffs(b'1 1 stream-begin text-output 0 ' |
|
191 | 188 | b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], " |
|
192 | 189 | b"b'labels': [b'label']}]") |
|
193 | 190 | ]) |
|
194 | 191 | |
|
195 | 192 | if __name__ == '__main__': |
|
196 | 193 | import silenttestrunner |
|
197 | 194 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now