##// END OF EJS Templates
wireprotov2: remove functions for creating response frames from bytes...
Gregory Szorc -
r40170:966b5f7f default
parent child Browse files
Show More
@@ -365,75 +365,6 b' def createcommandframes(stream, requesti'
365 365 if done:
366 366 break
367 367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
371
372 Returns a generator of bytearrays.
373 """
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
378
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
385 payload=overall + data)
386 return
387
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
394
395 offset = 0
396 while True:
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
399 done = offset == len(data)
400
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
409 payload=chunk)
410
411 if done:
412 break
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
417
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
421 """
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
425 while True:
426 chunk = cb.read(maxframesize)
427 if not chunk:
428 break
429
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
433 payload=chunk)
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
437 368 def createcommandresponseokframe(stream, requestid):
438 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 370
@@ -1020,30 +951,6 b' class serverreactor(object):'
1020 951
1021 952 return meth(frame)
1022 953
1023 def oncommandresponseready(self, stream, requestid, data):
1024 """Signal that a bytes response is ready to be sent to the client.
1025
1026 The raw bytes response is passed as an argument.
1027 """
1028 ensureserverstream(stream)
1029
1030 def sendframes():
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
1032 data):
1033 yield frame
1034
1035 self._activecommands.remove(requestid)
1036
1037 result = sendframes()
1038
1039 if self._deferoutput:
1040 self._bufferedframegens.append(result)
1041 return 'noop', {}
1042 else:
1043 return 'sendframes', {
1044 'framegen': result,
1045 }
1046
1047 954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1048 955 """Signal that objects are ready to be sent to the client.
1049 956
@@ -1053,6 +960,10 b' class serverreactor(object):'
1053 960 """
1054 961 ensureserverstream(stream)
1055 962
963 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
965 objs = iter(objs)
966
1056 967 # We need to take care over exception handling. Uncaught exceptions
1057 968 # when generating frames could lead to premature end of the frame
1058 969 # stream and the possibility of the server or client process getting
@@ -225,19 +225,22 b' class ServerReactorTests(unittest.TestCa'
225 225 results.append(self._sendsingleframe(
226 226 reactor, ffs(b'1 1 stream-begin command-request new '
227 227 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponseready(outstream, 1, b'response1')
228 result = reactor.oncommandresponsereadyobjects(
229 outstream, 1, [b'response1'])
229 230 self.assertaction(result, b'sendframes')
230 231 list(result[1][b'framegen'])
231 232 results.append(self._sendsingleframe(
232 233 reactor, ffs(b'1 1 stream-begin command-request new '
233 234 b"cbor:{b'name': b'command'}")))
234 result = reactor.oncommandresponseready(outstream, 1, b'response2')
235 result = reactor.oncommandresponsereadyobjects(
236 outstream, 1, [b'response2'])
235 237 self.assertaction(result, b'sendframes')
236 238 list(result[1][b'framegen'])
237 239 results.append(self._sendsingleframe(
238 240 reactor, ffs(b'1 1 stream-begin command-request new '
239 241 b"cbor:{b'name': b'command'}")))
240 result = reactor.oncommandresponseready(outstream, 1, b'response3')
242 result = reactor.oncommandresponsereadyobjects(
243 outstream, 1, [b'response3'])
241 244 self.assertaction(result, b'sendframes')
242 245 list(result[1][b'framegen'])
243 246
@@ -364,10 +367,13 b' class ServerReactorTests(unittest.TestCa'
364 367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
365 368
366 369 outstream = reactor.makeoutputstream()
367 result = reactor.oncommandresponseready(outstream, 1, b'response')
370 result = reactor.oncommandresponsereadyobjects(
371 outstream, 1, [b'response'])
368 372 self.assertaction(result, b'sendframes')
369 373 self.assertframesequal(result[1][b'framegen'], [
370 b'1 2 stream-begin command-response eos %sresponse' % OK,
374 b'1 2 stream-begin command-response continuation %s' % OK,
375 b'1 2 0 command-response continuation cbor:b"response"',
376 b'1 2 0 command-response eos ',
371 377 ])
372 378
373 379 def testmultiframeresponse(self):
@@ -380,12 +386,16 b' class ServerReactorTests(unittest.TestCa'
380 386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
381 387
382 388 outstream = reactor.makeoutputstream()
383 result = reactor.oncommandresponseready(outstream, 1, first + second)
389 result = reactor.oncommandresponsereadyobjects(
390 outstream, 1, [first + second])
384 391 self.assertaction(result, b'sendframes')
385 392 self.assertframesequal(result[1][b'framegen'], [
386 393 b'1 2 stream-begin command-response continuation %s' % OK,
394 b'1 2 0 command-response continuation Y\x80d',
387 395 b'1 2 0 command-response continuation %s' % first,
388 b'1 2 0 command-response eos %s' % second,
396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 b'1 2 0 command-response eos '
389 399 ])
390 400
391 401 def testservererror(self):
@@ -412,12 +422,15 b' class ServerReactorTests(unittest.TestCa'
412 422 self.assertaction(results[0], b'runcommand')
413 423
414 424 outstream = reactor.makeoutputstream()
415 result = reactor.oncommandresponseready(outstream, 1, b'response')
425 result = reactor.oncommandresponsereadyobjects(
426 outstream, 1, [b'response'])
416 427 self.assertaction(result, b'noop')
417 428 result = reactor.oninputeof()
418 429 self.assertaction(result, b'sendframes')
419 430 self.assertframesequal(result[1][b'framegen'], [
420 b'1 2 stream-begin command-response eos %sresponse' % OK,
431 b'1 2 stream-begin command-response continuation %s' % OK,
432 b'1 2 0 command-response continuation cbor:b"response"',
433 b'1 2 0 command-response eos ',
421 434 ])
422 435
423 436 def testmultiplecommanddeferresponse(self):
@@ -427,15 +440,21 b' class ServerReactorTests(unittest.TestCa'
427 440 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
428 441
429 442 outstream = reactor.makeoutputstream()
430 result = reactor.oncommandresponseready(outstream, 1, b'response1')
443 result = reactor.oncommandresponsereadyobjects(
444 outstream, 1, [b'response1'])
431 445 self.assertaction(result, b'noop')
432 result = reactor.oncommandresponseready(outstream, 3, b'response2')
446 result = reactor.oncommandresponsereadyobjects(
447 outstream, 3, [b'response2'])
433 448 self.assertaction(result, b'noop')
434 449 result = reactor.oninputeof()
435 450 self.assertaction(result, b'sendframes')
436 451 self.assertframesequal(result[1][b'framegen'], [
437 b'1 2 stream-begin command-response eos %sresponse1' % OK,
438 b'3 2 0 command-response eos %sresponse2' % OK,
452 b'1 2 stream-begin command-response continuation %s' % OK,
453 b'1 2 0 command-response continuation cbor:b"response1"',
454 b'1 2 0 command-response eos ',
455 b'3 2 0 command-response continuation %s' % OK,
456 b'3 2 0 command-response continuation cbor:b"response2"',
457 b'3 2 0 command-response eos ',
439 458 ])
440 459
441 460 def testrequestidtracking(self):
@@ -447,16 +466,22 b' class ServerReactorTests(unittest.TestCa'
447 466
448 467 # Register results for commands out of order.
449 468 outstream = reactor.makeoutputstream()
450 reactor.oncommandresponseready(outstream, 3, b'response3')
451 reactor.oncommandresponseready(outstream, 1, b'response1')
452 reactor.oncommandresponseready(outstream, 5, b'response5')
469 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
470 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
471 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
453 472
454 473 result = reactor.oninputeof()
455 474 self.assertaction(result, b'sendframes')
456 475 self.assertframesequal(result[1][b'framegen'], [
457 b'3 2 stream-begin command-response eos %sresponse3' % OK,
458 b'1 2 0 command-response eos %sresponse1' % OK,
459 b'5 2 0 command-response eos %sresponse5' % OK,
476 b'3 2 stream-begin command-response continuation %s' % OK,
477 b'3 2 0 command-response continuation cbor:b"response3"',
478 b'3 2 0 command-response eos ',
479 b'1 2 0 command-response continuation %s' % OK,
480 b'1 2 0 command-response continuation cbor:b"response1"',
481 b'1 2 0 command-response eos ',
482 b'5 2 0 command-response continuation %s' % OK,
483 b'5 2 0 command-response continuation cbor:b"response5"',
484 b'5 2 0 command-response eos ',
460 485 ])
461 486
462 487 def testduplicaterequestonactivecommand(self):
@@ -477,7 +502,7 b' class ServerReactorTests(unittest.TestCa'
477 502 instream = framing.stream(1)
478 503 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
479 504 outstream = reactor.makeoutputstream()
480 reactor.oncommandresponseready(outstream, 1, b'response')
505 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
481 506
482 507 # We've registered the response but haven't sent it. From the
483 508 # perspective of the reactor, the command is still active.
@@ -494,7 +519,7 b' class ServerReactorTests(unittest.TestCa'
494 519 instream = framing.stream(1)
495 520 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
496 521 outstream = reactor.makeoutputstream()
497 res = reactor.oncommandresponseready(outstream, 1, b'response')
522 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
498 523 list(res[1][b'framegen'])
499 524
500 525 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
General Comments 0
You need to be logged in to leave comments. Login now