##// END OF EJS Templates
wireproto: start to associate frame generation with a stream...
Gregory Szorc -
r37303:3ed34454 default
parent child Browse files
Show More
@@ -218,7 +218,7 b' def readframe(fh):'
218
218
219 return frame(h.requestid, h.typeid, h.flags, payload)
219 return frame(h.requestid, h.typeid, h.flags, payload)
220
220
221 def createcommandframes(requestid, cmd, args, datafh=None):
221 def createcommandframes(stream, requestid, cmd, args, datafh=None):
222 """Create frames necessary to transmit a request to run a command.
222 """Create frames necessary to transmit a request to run a command.
223
223
224 This is a generator of bytearrays. Each item represents a frame
224 This is a generator of bytearrays. Each item represents a frame
@@ -233,8 +233,8 b' def createcommandframes(requestid, cmd, '
233 if not flags:
233 if not flags:
234 flags |= FLAG_COMMAND_NAME_EOS
234 flags |= FLAG_COMMAND_NAME_EOS
235
235
236 yield makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
236 yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
237 flags=flags, payload=cmd)
237 flags=flags, payload=cmd)
238
238
239 for i, k in enumerate(sorted(args)):
239 for i, k in enumerate(sorted(args)):
240 v = args[k]
240 v = args[k]
@@ -250,10 +250,10 b' def createcommandframes(requestid, cmd, '
250 payload[offset:offset + len(v)] = v
250 payload[offset:offset + len(v)] = v
251
251
252 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
252 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
253 yield makeframe(requestid=requestid,
253 yield stream.makeframe(requestid=requestid,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
254 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
255 flags=flags,
255 flags=flags,
256 payload=payload)
256 payload=payload)
257
257
258 if datafh:
258 if datafh:
259 while True:
259 while True:
@@ -267,15 +267,15 b' def createcommandframes(requestid, cmd, '
267 assert datafh.read(1) == b''
267 assert datafh.read(1) == b''
268 done = True
268 done = True
269
269
270 yield makeframe(requestid=requestid,
270 yield stream.makeframe(requestid=requestid,
271 typeid=FRAME_TYPE_COMMAND_DATA,
271 typeid=FRAME_TYPE_COMMAND_DATA,
272 flags=flags,
272 flags=flags,
273 payload=data)
273 payload=data)
274
274
275 if done:
275 if done:
276 break
276 break
277
277
278 def createbytesresponseframesfrombytes(requestid, data,
278 def createbytesresponseframesfrombytes(stream, requestid, data,
279 maxframesize=DEFAULT_MAX_FRAME_SIZE):
279 maxframesize=DEFAULT_MAX_FRAME_SIZE):
280 """Create a raw frame to send a bytes response from static bytes input.
280 """Create a raw frame to send a bytes response from static bytes input.
281
281
@@ -284,10 +284,10 b' def createbytesresponseframesfrombytes(r'
284
284
285 # Simple case of a single frame.
285 # Simple case of a single frame.
286 if len(data) <= maxframesize:
286 if len(data) <= maxframesize:
287 yield makeframe(requestid=requestid,
287 yield stream.makeframe(requestid=requestid,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
288 typeid=FRAME_TYPE_BYTES_RESPONSE,
289 flags=FLAG_BYTES_RESPONSE_EOS,
289 flags=FLAG_BYTES_RESPONSE_EOS,
290 payload=data)
290 payload=data)
291 return
291 return
292
292
293 offset = 0
293 offset = 0
@@ -301,15 +301,15 b' def createbytesresponseframesfrombytes(r'
301 else:
301 else:
302 flags = FLAG_BYTES_RESPONSE_CONTINUATION
302 flags = FLAG_BYTES_RESPONSE_CONTINUATION
303
303
304 yield makeframe(requestid=requestid,
304 yield stream.makeframe(requestid=requestid,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
305 typeid=FRAME_TYPE_BYTES_RESPONSE,
306 flags=flags,
306 flags=flags,
307 payload=chunk)
307 payload=chunk)
308
308
309 if done:
309 if done:
310 break
310 break
311
311
312 def createerrorframe(requestid, msg, protocol=False, application=False):
312 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
313 # TODO properly handle frame size limits.
313 # TODO properly handle frame size limits.
314 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
314 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
315
315
@@ -319,12 +319,12 b' def createerrorframe(requestid, msg, pro'
319 if application:
319 if application:
320 flags |= FLAG_ERROR_RESPONSE_APPLICATION
320 flags |= FLAG_ERROR_RESPONSE_APPLICATION
321
321
322 yield makeframe(requestid=requestid,
322 yield stream.makeframe(requestid=requestid,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
323 typeid=FRAME_TYPE_ERROR_RESPONSE,
324 flags=flags,
324 flags=flags,
325 payload=msg)
325 payload=msg)
326
326
327 def createtextoutputframe(requestid, atoms):
327 def createtextoutputframe(stream, requestid, atoms):
328 """Create a text output frame to render text to people.
328 """Create a text output frame to render text to people.
329
329
330 ``atoms`` is a 3-tuple of (formatting string, args, labels).
330 ``atoms`` is a 3-tuple of (formatting string, args, labels).
@@ -390,10 +390,20 b' def createtextoutputframe(requestid, ato'
390 if bytesleft < 0:
390 if bytesleft < 0:
391 raise ValueError('cannot encode data in a single frame')
391 raise ValueError('cannot encode data in a single frame')
392
392
393 yield makeframe(requestid=requestid,
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
394 typeid=FRAME_TYPE_TEXT_OUTPUT,
395 flags=0,
395 flags=0,
396 payload=b''.join(atomchunks))
396 payload=b''.join(atomchunks))
397
398 class stream(object):
399 """Represents a logical unidirectional series of frames."""
400
401 def makeframe(self, requestid, typeid, flags, payload):
402 """Create a frame to be sent out over this stream.
403
404 Only returns the frame instance. Does not actually send it.
405 """
406 return makeframe(requestid, typeid, flags, payload)
397
407
398 class serverreactor(object):
408 class serverreactor(object):
399 """Holds state of a server handling frame-based protocol requests.
409 """Holds state of a server handling frame-based protocol requests.
@@ -498,13 +508,14 b' class serverreactor(object):'
498
508
499 return meth(frame)
509 return meth(frame)
500
510
501 def onbytesresponseready(self, requestid, data):
511 def onbytesresponseready(self, stream, requestid, data):
502 """Signal that a bytes response is ready to be sent to the client.
512 """Signal that a bytes response is ready to be sent to the client.
503
513
504 The raw bytes response is passed as an argument.
514 The raw bytes response is passed as an argument.
505 """
515 """
506 def sendframes():
516 def sendframes():
507 for frame in createbytesresponseframesfrombytes(requestid, data):
517 for frame in createbytesresponseframesfrombytes(stream, requestid,
518 data):
508 yield frame
519 yield frame
509
520
510 self._activecommands.remove(requestid)
521 self._activecommands.remove(requestid)
@@ -540,9 +551,10 b' class serverreactor(object):'
540 'framegen': makegen(),
551 'framegen': makegen(),
541 }
552 }
542
553
543 def onapplicationerror(self, requestid, msg):
554 def onapplicationerror(self, stream, requestid, msg):
544 return 'sendframes', {
555 return 'sendframes', {
545 'framegen': createerrorframe(requestid, msg, application=True),
556 'framegen': createerrorframe(stream, requestid, msg,
557 application=True),
546 }
558 }
547
559
548 def _makeerrorresult(self, msg):
560 def _makeerrorresult(self, msg):
@@ -546,9 +546,11 b' def _httpv2runcommand(ui, repo, req, res'
546
546
547 res.status = b'200 OK'
547 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = FRAMINGTYPE
548 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream()
549
550
550 if isinstance(rsp, wireprototypes.bytesresponse):
551 if isinstance(rsp, wireprototypes.bytesresponse):
551 action, meta = reactor.onbytesresponseready(command['requestid'],
552 action, meta = reactor.onbytesresponseready(stream,
553 command['requestid'],
552 rsp.data)
554 rsp.data)
553 else:
555 else:
554 action, meta = reactor.onapplicationerror(
556 action, meta = reactor.onapplicationerror(
@@ -27,16 +27,19 b' def sendframes(reactor, gen):'
27 header.flags,
27 header.flags,
28 payload))
28 payload))
29
29
30 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
30 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
31 """Generate frames to run a command and send them to a reactor."""
31 """Generate frames to run a command and send them to a reactor."""
32 return sendframes(reactor,
32 return sendframes(reactor,
33 framing.createcommandframes(rid, cmd, args, datafh))
33 framing.createcommandframes(stream, rid, cmd, args,
34 datafh))
34
35
35 class FrameTests(unittest.TestCase):
36 class FrameTests(unittest.TestCase):
36 def testdataexactframesize(self):
37 def testdataexactframesize(self):
37 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
38 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
38
39
39 frames = list(framing.createcommandframes(1, b'command', {}, data))
40 stream = framing.stream()
41 frames = list(framing.createcommandframes(stream, 1, b'command',
42 {}, data))
40 self.assertEqual(frames, [
43 self.assertEqual(frames, [
41 ffs(b'1 command-name have-data command'),
44 ffs(b'1 command-name have-data command'),
42 ffs(b'1 command-data continuation %s' % data.getvalue()),
45 ffs(b'1 command-data continuation %s' % data.getvalue()),
@@ -45,7 +48,10 b' class FrameTests(unittest.TestCase):'
45
48
46 def testdatamultipleframes(self):
49 def testdatamultipleframes(self):
47 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
50 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
48 frames = list(framing.createcommandframes(1, b'command', {}, data))
51
52 stream = framing.stream()
53 frames = list(framing.createcommandframes(stream, 1, b'command', {},
54 data))
49 self.assertEqual(frames, [
55 self.assertEqual(frames, [
50 ffs(b'1 command-name have-data command'),
56 ffs(b'1 command-name have-data command'),
51 ffs(b'1 command-data continuation %s' % (
57 ffs(b'1 command-data continuation %s' % (
@@ -56,7 +62,8 b' class FrameTests(unittest.TestCase):'
56 def testargsanddata(self):
62 def testargsanddata(self):
57 data = util.bytesio(b'x' * 100)
63 data = util.bytesio(b'x' * 100)
58
64
59 frames = list(framing.createcommandframes(1, b'command', {
65 stream = framing.stream()
66 frames = list(framing.createcommandframes(stream, 1, b'command', {
60 b'key1': b'key1value',
67 b'key1': b'key1value',
61 b'key2': b'key2value',
68 b'key2': b'key2value',
62 b'key3': b'key3value',
69 b'key3': b'key3value',
@@ -75,51 +82,54 b' class FrameTests(unittest.TestCase):'
75 with self.assertRaisesRegexp(ValueError,
82 with self.assertRaisesRegexp(ValueError,
76 'cannot use more than 255 formatting'):
83 'cannot use more than 255 formatting'):
77 args = [b'x' for i in range(256)]
84 args = [b'x' for i in range(256)]
78 list(framing.createtextoutputframe(1, [(b'bleh', args, [])]))
85 list(framing.createtextoutputframe(None, 1,
86 [(b'bleh', args, [])]))
79
87
80 def testtextoutputexcessivelabels(self):
88 def testtextoutputexcessivelabels(self):
81 """At most 255 labels are allowed."""
89 """At most 255 labels are allowed."""
82 with self.assertRaisesRegexp(ValueError,
90 with self.assertRaisesRegexp(ValueError,
83 'cannot use more than 255 labels'):
91 'cannot use more than 255 labels'):
84 labels = [b'l' for i in range(256)]
92 labels = [b'l' for i in range(256)]
85 list(framing.createtextoutputframe(1, [(b'bleh', [], labels)]))
93 list(framing.createtextoutputframe(None, 1,
94 [(b'bleh', [], labels)]))
86
95
87 def testtextoutputformattingstringtype(self):
96 def testtextoutputformattingstringtype(self):
88 """Formatting string must be bytes."""
97 """Formatting string must be bytes."""
89 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
98 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
90 list(framing.createtextoutputframe(1, [
99 list(framing.createtextoutputframe(None, 1, [
91 (b'foo'.decode('ascii'), [], [])]))
100 (b'foo'.decode('ascii'), [], [])]))
92
101
93 def testtextoutputargumentbytes(self):
102 def testtextoutputargumentbytes(self):
94 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
103 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
95 list(framing.createtextoutputframe(1, [
104 list(framing.createtextoutputframe(None, 1, [
96 (b'foo', [b'foo'.decode('ascii')], [])]))
105 (b'foo', [b'foo'.decode('ascii')], [])]))
97
106
98 def testtextoutputlabelbytes(self):
107 def testtextoutputlabelbytes(self):
99 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
108 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
100 list(framing.createtextoutputframe(1, [
109 list(framing.createtextoutputframe(None, 1, [
101 (b'foo', [], [b'foo'.decode('ascii')])]))
110 (b'foo', [], [b'foo'.decode('ascii')])]))
102
111
103 def testtextoutputtoolongformatstring(self):
112 def testtextoutputtoolongformatstring(self):
104 with self.assertRaisesRegexp(ValueError,
113 with self.assertRaisesRegexp(ValueError,
105 'formatting string cannot be longer than'):
114 'formatting string cannot be longer than'):
106 list(framing.createtextoutputframe(1, [
115 list(framing.createtextoutputframe(None, 1, [
107 (b'x' * 65536, [], [])]))
116 (b'x' * 65536, [], [])]))
108
117
109 def testtextoutputtoolongargumentstring(self):
118 def testtextoutputtoolongargumentstring(self):
110 with self.assertRaisesRegexp(ValueError,
119 with self.assertRaisesRegexp(ValueError,
111 'argument string cannot be longer than'):
120 'argument string cannot be longer than'):
112 list(framing.createtextoutputframe(1, [
121 list(framing.createtextoutputframe(None, 1, [
113 (b'bleh', [b'x' * 65536], [])]))
122 (b'bleh', [b'x' * 65536], [])]))
114
123
115 def testtextoutputtoolonglabelstring(self):
124 def testtextoutputtoolonglabelstring(self):
116 with self.assertRaisesRegexp(ValueError,
125 with self.assertRaisesRegexp(ValueError,
117 'label string cannot be longer than'):
126 'label string cannot be longer than'):
118 list(framing.createtextoutputframe(1, [
127 list(framing.createtextoutputframe(None, 1, [
119 (b'bleh', [], [b'x' * 65536])]))
128 (b'bleh', [], [b'x' * 65536])]))
120
129
121 def testtextoutput1simpleatom(self):
130 def testtextoutput1simpleatom(self):
122 val = list(framing.createtextoutputframe(1, [
131 stream = framing.stream()
132 val = list(framing.createtextoutputframe(stream, 1, [
123 (b'foo', [], [])]))
133 (b'foo', [], [])]))
124
134
125 self.assertEqual(val, [
135 self.assertEqual(val, [
@@ -127,7 +137,8 b' class FrameTests(unittest.TestCase):'
127 ])
137 ])
128
138
129 def testtextoutput2simpleatoms(self):
139 def testtextoutput2simpleatoms(self):
130 val = list(framing.createtextoutputframe(1, [
140 stream = framing.stream()
141 val = list(framing.createtextoutputframe(stream, 1, [
131 (b'foo', [], []),
142 (b'foo', [], []),
132 (b'bar', [], []),
143 (b'bar', [], []),
133 ]))
144 ]))
@@ -137,7 +148,8 b' class FrameTests(unittest.TestCase):'
137 ])
148 ])
138
149
139 def testtextoutput1arg(self):
150 def testtextoutput1arg(self):
140 val = list(framing.createtextoutputframe(1, [
151 stream = framing.stream()
152 val = list(framing.createtextoutputframe(stream, 1, [
141 (b'foo %s', [b'val1'], []),
153 (b'foo %s', [b'val1'], []),
142 ]))
154 ]))
143
155
@@ -146,7 +158,8 b' class FrameTests(unittest.TestCase):'
146 ])
158 ])
147
159
148 def testtextoutput2arg(self):
160 def testtextoutput2arg(self):
149 val = list(framing.createtextoutputframe(1, [
161 stream = framing.stream()
162 val = list(framing.createtextoutputframe(stream, 1, [
150 (b'foo %s %s', [b'val', b'value'], []),
163 (b'foo %s %s', [b'val', b'value'], []),
151 ]))
164 ]))
152
165
@@ -156,7 +169,8 b' class FrameTests(unittest.TestCase):'
156 ])
169 ])
157
170
158 def testtextoutput1label(self):
171 def testtextoutput1label(self):
159 val = list(framing.createtextoutputframe(1, [
172 stream = framing.stream()
173 val = list(framing.createtextoutputframe(stream, 1, [
160 (b'foo', [], [b'label']),
174 (b'foo', [], [b'label']),
161 ]))
175 ]))
162
176
@@ -165,7 +179,8 b' class FrameTests(unittest.TestCase):'
165 ])
179 ])
166
180
167 def testargandlabel(self):
181 def testargandlabel(self):
168 val = list(framing.createtextoutputframe(1, [
182 stream = framing.stream()
183 val = list(framing.createtextoutputframe(stream, 1, [
169 (b'foo %s', [b'arg'], [b'label']),
184 (b'foo %s', [b'arg'], [b'label']),
170 ]))
185 ]))
171
186
@@ -193,7 +208,8 b' class ServerReactorTests(unittest.TestCa'
193 def test1framecommand(self):
208 def test1framecommand(self):
194 """Receiving a command in a single frame yields request to run it."""
209 """Receiving a command in a single frame yields request to run it."""
195 reactor = makereactor()
210 reactor = makereactor()
196 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
211 stream = framing.stream()
212 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
197 self.assertEqual(len(results), 1)
213 self.assertEqual(len(results), 1)
198 self.assertaction(results[0], 'runcommand')
214 self.assertaction(results[0], 'runcommand')
199 self.assertEqual(results[0][1], {
215 self.assertEqual(results[0][1], {
@@ -208,7 +224,8 b' class ServerReactorTests(unittest.TestCa'
208
224
209 def test1argument(self):
225 def test1argument(self):
210 reactor = makereactor()
226 reactor = makereactor()
211 results = list(sendcommandframes(reactor, 41, b'mycommand',
227 stream = framing.stream()
228 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
212 {b'foo': b'bar'}))
229 {b'foo': b'bar'}))
213 self.assertEqual(len(results), 2)
230 self.assertEqual(len(results), 2)
214 self.assertaction(results[0], 'wantframe')
231 self.assertaction(results[0], 'wantframe')
@@ -222,7 +239,8 b' class ServerReactorTests(unittest.TestCa'
222
239
223 def testmultiarguments(self):
240 def testmultiarguments(self):
224 reactor = makereactor()
241 reactor = makereactor()
225 results = list(sendcommandframes(reactor, 1, b'mycommand',
242 stream = framing.stream()
243 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
226 {b'foo': b'bar', b'biz': b'baz'}))
244 {b'foo': b'bar', b'biz': b'baz'}))
227 self.assertEqual(len(results), 3)
245 self.assertEqual(len(results), 3)
228 self.assertaction(results[0], 'wantframe')
246 self.assertaction(results[0], 'wantframe')
@@ -237,7 +255,8 b' class ServerReactorTests(unittest.TestCa'
237
255
238 def testsimplecommanddata(self):
256 def testsimplecommanddata(self):
239 reactor = makereactor()
257 reactor = makereactor()
240 results = list(sendcommandframes(reactor, 1, b'mycommand', {},
258 stream = framing.stream()
259 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
241 util.bytesio(b'data!')))
260 util.bytesio(b'data!')))
242 self.assertEqual(len(results), 2)
261 self.assertEqual(len(results), 2)
243 self.assertaction(results[0], 'wantframe')
262 self.assertaction(results[0], 'wantframe')
@@ -350,19 +369,20 b' class ServerReactorTests(unittest.TestCa'
350 """Multiple fully serviced commands with same request ID is allowed."""
369 """Multiple fully serviced commands with same request ID is allowed."""
351 reactor = makereactor()
370 reactor = makereactor()
352 results = []
371 results = []
372 outstream = framing.stream()
353 results.append(self._sendsingleframe(
373 results.append(self._sendsingleframe(
354 reactor, ffs(b'1 command-name eos command')))
374 reactor, ffs(b'1 command-name eos command')))
355 result = reactor.onbytesresponseready(1, b'response1')
375 result = reactor.onbytesresponseready(outstream, 1, b'response1')
356 self.assertaction(result, 'sendframes')
376 self.assertaction(result, 'sendframes')
357 list(result[1]['framegen'])
377 list(result[1]['framegen'])
358 results.append(self._sendsingleframe(
378 results.append(self._sendsingleframe(
359 reactor, ffs(b'1 command-name eos command')))
379 reactor, ffs(b'1 command-name eos command')))
360 result = reactor.onbytesresponseready(1, b'response2')
380 result = reactor.onbytesresponseready(outstream, 1, b'response2')
361 self.assertaction(result, 'sendframes')
381 self.assertaction(result, 'sendframes')
362 list(result[1]['framegen'])
382 list(result[1]['framegen'])
363 results.append(self._sendsingleframe(
383 results.append(self._sendsingleframe(
364 reactor, ffs(b'1 command-name eos command')))
384 reactor, ffs(b'1 command-name eos command')))
365 result = reactor.onbytesresponseready(1, b'response3')
385 result = reactor.onbytesresponseready(outstream, 1, b'response3')
366 self.assertaction(result, 'sendframes')
386 self.assertaction(result, 'sendframes')
367 list(result[1]['framegen'])
387 list(result[1]['framegen'])
368
388
@@ -501,9 +521,11 b' class ServerReactorTests(unittest.TestCa'
501 def testsimpleresponse(self):
521 def testsimpleresponse(self):
502 """Bytes response to command sends result frames."""
522 """Bytes response to command sends result frames."""
503 reactor = makereactor()
523 reactor = makereactor()
504 list(sendcommandframes(reactor, 1, b'mycommand', {}))
524 instream = framing.stream()
525 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
505
526
506 result = reactor.onbytesresponseready(1, b'response')
527 outstream = framing.stream()
528 result = reactor.onbytesresponseready(outstream, 1, b'response')
507 self.assertaction(result, 'sendframes')
529 self.assertaction(result, 'sendframes')
508 self.assertframesequal(result[1]['framegen'], [
530 self.assertframesequal(result[1]['framegen'], [
509 b'1 bytes-response eos response',
531 b'1 bytes-response eos response',
@@ -515,9 +537,11 b' class ServerReactorTests(unittest.TestCa'
515 second = b'y' * 100
537 second = b'y' * 100
516
538
517 reactor = makereactor()
539 reactor = makereactor()
518 list(sendcommandframes(reactor, 1, b'mycommand', {}))
540 instream = framing.stream()
541 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
519
542
520 result = reactor.onbytesresponseready(1, first + second)
543 outstream = framing.stream()
544 result = reactor.onbytesresponseready(outstream, 1, first + second)
521 self.assertaction(result, 'sendframes')
545 self.assertaction(result, 'sendframes')
522 self.assertframesequal(result[1]['framegen'], [
546 self.assertframesequal(result[1]['framegen'], [
523 b'1 bytes-response continuation %s' % first,
547 b'1 bytes-response continuation %s' % first,
@@ -526,9 +550,11 b' class ServerReactorTests(unittest.TestCa'
526
550
527 def testapplicationerror(self):
551 def testapplicationerror(self):
528 reactor = makereactor()
552 reactor = makereactor()
529 list(sendcommandframes(reactor, 1, b'mycommand', {}))
553 instream = framing.stream()
554 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
530
555
531 result = reactor.onapplicationerror(1, b'some message')
556 outstream = framing.stream()
557 result = reactor.onapplicationerror(outstream, 1, b'some message')
532 self.assertaction(result, 'sendframes')
558 self.assertaction(result, 'sendframes')
533 self.assertframesequal(result[1]['framegen'], [
559 self.assertframesequal(result[1]['framegen'], [
534 b'1 error-response application some message',
560 b'1 error-response application some message',
@@ -537,11 +563,14 b' class ServerReactorTests(unittest.TestCa'
537 def test1commanddeferresponse(self):
563 def test1commanddeferresponse(self):
538 """Responses when in deferred output mode are delayed until EOF."""
564 """Responses when in deferred output mode are delayed until EOF."""
539 reactor = makereactor(deferoutput=True)
565 reactor = makereactor(deferoutput=True)
540 results = list(sendcommandframes(reactor, 1, b'mycommand', {}))
566 instream = framing.stream()
567 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
568 {}))
541 self.assertEqual(len(results), 1)
569 self.assertEqual(len(results), 1)
542 self.assertaction(results[0], 'runcommand')
570 self.assertaction(results[0], 'runcommand')
543
571
544 result = reactor.onbytesresponseready(1, b'response')
572 outstream = framing.stream()
573 result = reactor.onbytesresponseready(outstream, 1, b'response')
545 self.assertaction(result, 'noop')
574 self.assertaction(result, 'noop')
546 result = reactor.oninputeof()
575 result = reactor.oninputeof()
547 self.assertaction(result, 'sendframes')
576 self.assertaction(result, 'sendframes')
@@ -551,12 +580,14 b' class ServerReactorTests(unittest.TestCa'
551
580
552 def testmultiplecommanddeferresponse(self):
581 def testmultiplecommanddeferresponse(self):
553 reactor = makereactor(deferoutput=True)
582 reactor = makereactor(deferoutput=True)
554 list(sendcommandframes(reactor, 1, b'command1', {}))
583 instream = framing.stream()
555 list(sendcommandframes(reactor, 3, b'command2', {}))
584 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
585 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
556
586
557 result = reactor.onbytesresponseready(1, b'response1')
587 outstream = framing.stream()
588 result = reactor.onbytesresponseready(outstream, 1, b'response1')
558 self.assertaction(result, 'noop')
589 self.assertaction(result, 'noop')
559 result = reactor.onbytesresponseready(3, b'response2')
590 result = reactor.onbytesresponseready(outstream, 3, b'response2')
560 self.assertaction(result, 'noop')
591 self.assertaction(result, 'noop')
561 result = reactor.oninputeof()
592 result = reactor.oninputeof()
562 self.assertaction(result, 'sendframes')
593 self.assertaction(result, 'sendframes')
@@ -567,14 +598,16 b' class ServerReactorTests(unittest.TestCa'
567
598
568 def testrequestidtracking(self):
599 def testrequestidtracking(self):
569 reactor = makereactor(deferoutput=True)
600 reactor = makereactor(deferoutput=True)
570 list(sendcommandframes(reactor, 1, b'command1', {}))
601 instream = framing.stream()
571 list(sendcommandframes(reactor, 3, b'command2', {}))
602 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
572 list(sendcommandframes(reactor, 5, b'command3', {}))
603 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
604 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
573
605
574 # Register results for commands out of order.
606 # Register results for commands out of order.
575 reactor.onbytesresponseready(3, b'response3')
607 outstream = framing.stream()
576 reactor.onbytesresponseready(1, b'response1')
608 reactor.onbytesresponseready(outstream, 3, b'response3')
577 reactor.onbytesresponseready(5, b'response5')
609 reactor.onbytesresponseready(outstream, 1, b'response1')
610 reactor.onbytesresponseready(outstream, 5, b'response5')
578
611
579 result = reactor.oninputeof()
612 result = reactor.oninputeof()
580 self.assertaction(result, 'sendframes')
613 self.assertaction(result, 'sendframes')
@@ -587,8 +620,9 b' class ServerReactorTests(unittest.TestCa'
587 def testduplicaterequestonactivecommand(self):
620 def testduplicaterequestonactivecommand(self):
588 """Receiving a request ID that matches a request that isn't finished."""
621 """Receiving a request ID that matches a request that isn't finished."""
589 reactor = makereactor()
622 reactor = makereactor()
590 list(sendcommandframes(reactor, 1, b'command1', {}))
623 stream = framing.stream()
591 results = list(sendcommandframes(reactor, 1, b'command1', {}))
624 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
625 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
592
626
593 self.assertaction(results[0], 'error')
627 self.assertaction(results[0], 'error')
594 self.assertEqual(results[0][1], {
628 self.assertEqual(results[0][1], {
@@ -598,13 +632,15 b' class ServerReactorTests(unittest.TestCa'
598 def testduplicaterequestonactivecommandnosend(self):
632 def testduplicaterequestonactivecommandnosend(self):
599 """Same as above but we've registered a response but haven't sent it."""
633 """Same as above but we've registered a response but haven't sent it."""
600 reactor = makereactor()
634 reactor = makereactor()
601 list(sendcommandframes(reactor, 1, b'command1', {}))
635 instream = framing.stream()
602 reactor.onbytesresponseready(1, b'response')
636 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
637 outstream = framing.stream()
638 reactor.onbytesresponseready(outstream, 1, b'response')
603
639
604 # We've registered the response but haven't sent it. From the
640 # We've registered the response but haven't sent it. From the
605 # perspective of the reactor, the command is still active.
641 # perspective of the reactor, the command is still active.
606
642
607 results = list(sendcommandframes(reactor, 1, b'command1', {}))
643 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
608 self.assertaction(results[0], 'error')
644 self.assertaction(results[0], 'error')
609 self.assertEqual(results[0][1], {
645 self.assertEqual(results[0][1], {
610 'message': b'request with ID 1 is already active',
646 'message': b'request with ID 1 is already active',
@@ -613,7 +649,8 b' class ServerReactorTests(unittest.TestCa'
613 def testduplicaterequestargumentframe(self):
649 def testduplicaterequestargumentframe(self):
614 """Variant on above except we sent an argument frame instead of name."""
650 """Variant on above except we sent an argument frame instead of name."""
615 reactor = makereactor()
651 reactor = makereactor()
616 list(sendcommandframes(reactor, 1, b'command', {}))
652 stream = framing.stream()
653 list(sendcommandframes(reactor, stream, 1, b'command', {}))
617 results = list(sendframes(reactor, [
654 results = list(sendframes(reactor, [
618 ffs(b'3 command-name have-args command'),
655 ffs(b'3 command-name have-args command'),
619 ffs(b'1 command-argument 0 ignored'),
656 ffs(b'1 command-argument 0 ignored'),
@@ -627,11 +664,13 b' class ServerReactorTests(unittest.TestCa'
627 def testduplicaterequestaftersend(self):
664 def testduplicaterequestaftersend(self):
628 """We can use a duplicate request ID after we've sent the response."""
665 """We can use a duplicate request ID after we've sent the response."""
629 reactor = makereactor()
666 reactor = makereactor()
630 list(sendcommandframes(reactor, 1, b'command1', {}))
667 instream = framing.stream()
631 res = reactor.onbytesresponseready(1, b'response')
668 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
669 outstream = framing.stream()
670 res = reactor.onbytesresponseready(outstream, 1, b'response')
632 list(res[1]['framegen'])
671 list(res[1]['framegen'])
633
672
634 results = list(sendcommandframes(reactor, 1, b'command1', {}))
673 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
635 self.assertaction(results[0], 'runcommand')
674 self.assertaction(results[0], 'runcommand')
636
675
637 if __name__ == '__main__':
676 if __name__ == '__main__':
General Comments 0
You need to be logged in to leave comments. Login now