##// END OF EJS Templates
wireprotov2: remove functions for creating response frames from bytes...
Gregory Szorc -
r40170:966b5f7f default
parent child Browse files
Show More
@@ -365,75 +365,6 b' def createcommandframes(stream, requesti'
365 if done:
365 if done:
366 break
366 break
367
367
368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 """Create a raw frame to send a bytes response from static bytes input.
371
372 Returns a generator of bytearrays.
373 """
374 # Automatically send the overall CBOR response map.
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 if len(overall) > maxframesize:
377 raise error.ProgrammingError('not yet implemented')
378
379 # Simple case where we can fit the full response in a single frame.
380 if len(overall) + len(data) <= maxframesize:
381 flags = FLAG_COMMAND_RESPONSE_EOS
382 yield stream.makeframe(requestid=requestid,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 flags=flags,
385 payload=overall + data)
386 return
387
388 # It's easier to send the overall CBOR map in its own frame than to track
389 # offsets.
390 yield stream.makeframe(requestid=requestid,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 payload=overall)
394
395 offset = 0
396 while True:
397 chunk = data[offset:offset + maxframesize]
398 offset += len(chunk)
399 done = offset == len(data)
400
401 if done:
402 flags = FLAG_COMMAND_RESPONSE_EOS
403 else:
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
406 yield stream.makeframe(requestid=requestid,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 flags=flags,
409 payload=chunk)
410
411 if done:
412 break
413
414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 """Generator of frames from a generator of byte chunks.
417
418 This assumes that another frame will follow whatever this emits. i.e.
419 this always emits the continuation flag and never emits the end-of-stream
420 flag.
421 """
422 cb = util.chunkbuffer(gen)
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424
425 while True:
426 chunk = cb.read(maxframesize)
427 if not chunk:
428 break
429
430 yield stream.makeframe(requestid=requestid,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 flags=flags,
433 payload=chunk)
434
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436
437 def createcommandresponseokframe(stream, requestid):
368 def createcommandresponseokframe(stream, requestid):
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439
370
@@ -1020,30 +951,6 b' class serverreactor(object):'
1020
951
1021 return meth(frame)
952 return meth(frame)
1022
953
1023 def oncommandresponseready(self, stream, requestid, data):
1024 """Signal that a bytes response is ready to be sent to the client.
1025
1026 The raw bytes response is passed as an argument.
1027 """
1028 ensureserverstream(stream)
1029
1030 def sendframes():
1031 for frame in createcommandresponseframesfrombytes(stream, requestid,
1032 data):
1033 yield frame
1034
1035 self._activecommands.remove(requestid)
1036
1037 result = sendframes()
1038
1039 if self._deferoutput:
1040 self._bufferedframegens.append(result)
1041 return 'noop', {}
1042 else:
1043 return 'sendframes', {
1044 'framegen': result,
1045 }
1046
1047 def oncommandresponsereadyobjects(self, stream, requestid, objs):
954 def oncommandresponsereadyobjects(self, stream, requestid, objs):
1048 """Signal that objects are ready to be sent to the client.
955 """Signal that objects are ready to be sent to the client.
1049
956
@@ -1053,6 +960,10 b' class serverreactor(object):'
1053 """
960 """
1054 ensureserverstream(stream)
961 ensureserverstream(stream)
1055
962
963 # A more robust solution would be to check for objs.{next,__next__}.
964 if isinstance(objs, list):
965 objs = iter(objs)
966
1056 # We need to take care over exception handling. Uncaught exceptions
967 # We need to take care over exception handling. Uncaught exceptions
1057 # when generating frames could lead to premature end of the frame
968 # when generating frames could lead to premature end of the frame
1058 # stream and the possibility of the server or client process getting
969 # stream and the possibility of the server or client process getting
@@ -225,19 +225,22 b' class ServerReactorTests(unittest.TestCa'
225 results.append(self._sendsingleframe(
225 results.append(self._sendsingleframe(
226 reactor, ffs(b'1 1 stream-begin command-request new '
226 reactor, ffs(b'1 1 stream-begin command-request new '
227 b"cbor:{b'name': b'command'}")))
227 b"cbor:{b'name': b'command'}")))
228 result = reactor.oncommandresponseready(outstream, 1, b'response1')
228 result = reactor.oncommandresponsereadyobjects(
229 outstream, 1, [b'response1'])
229 self.assertaction(result, b'sendframes')
230 self.assertaction(result, b'sendframes')
230 list(result[1][b'framegen'])
231 list(result[1][b'framegen'])
231 results.append(self._sendsingleframe(
232 results.append(self._sendsingleframe(
232 reactor, ffs(b'1 1 stream-begin command-request new '
233 reactor, ffs(b'1 1 stream-begin command-request new '
233 b"cbor:{b'name': b'command'}")))
234 b"cbor:{b'name': b'command'}")))
234 result = reactor.oncommandresponseready(outstream, 1, b'response2')
235 result = reactor.oncommandresponsereadyobjects(
236 outstream, 1, [b'response2'])
235 self.assertaction(result, b'sendframes')
237 self.assertaction(result, b'sendframes')
236 list(result[1][b'framegen'])
238 list(result[1][b'framegen'])
237 results.append(self._sendsingleframe(
239 results.append(self._sendsingleframe(
238 reactor, ffs(b'1 1 stream-begin command-request new '
240 reactor, ffs(b'1 1 stream-begin command-request new '
239 b"cbor:{b'name': b'command'}")))
241 b"cbor:{b'name': b'command'}")))
240 result = reactor.oncommandresponseready(outstream, 1, b'response3')
242 result = reactor.oncommandresponsereadyobjects(
243 outstream, 1, [b'response3'])
241 self.assertaction(result, b'sendframes')
244 self.assertaction(result, b'sendframes')
242 list(result[1][b'framegen'])
245 list(result[1][b'framegen'])
243
246
@@ -364,10 +367,13 b' class ServerReactorTests(unittest.TestCa'
364 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
367 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
365
368
366 outstream = reactor.makeoutputstream()
369 outstream = reactor.makeoutputstream()
367 result = reactor.oncommandresponseready(outstream, 1, b'response')
370 result = reactor.oncommandresponsereadyobjects(
371 outstream, 1, [b'response'])
368 self.assertaction(result, b'sendframes')
372 self.assertaction(result, b'sendframes')
369 self.assertframesequal(result[1][b'framegen'], [
373 self.assertframesequal(result[1][b'framegen'], [
370 b'1 2 stream-begin command-response eos %sresponse' % OK,
374 b'1 2 stream-begin command-response continuation %s' % OK,
375 b'1 2 0 command-response continuation cbor:b"response"',
376 b'1 2 0 command-response eos ',
371 ])
377 ])
372
378
373 def testmultiframeresponse(self):
379 def testmultiframeresponse(self):
@@ -380,12 +386,16 b' class ServerReactorTests(unittest.TestCa'
380 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
386 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
381
387
382 outstream = reactor.makeoutputstream()
388 outstream = reactor.makeoutputstream()
383 result = reactor.oncommandresponseready(outstream, 1, first + second)
389 result = reactor.oncommandresponsereadyobjects(
390 outstream, 1, [first + second])
384 self.assertaction(result, b'sendframes')
391 self.assertaction(result, b'sendframes')
385 self.assertframesequal(result[1][b'framegen'], [
392 self.assertframesequal(result[1][b'framegen'], [
386 b'1 2 stream-begin command-response continuation %s' % OK,
393 b'1 2 stream-begin command-response continuation %s' % OK,
394 b'1 2 0 command-response continuation Y\x80d',
387 b'1 2 0 command-response continuation %s' % first,
395 b'1 2 0 command-response continuation %s' % first,
388 b'1 2 0 command-response eos %s' % second,
396 b'1 2 0 command-response continuation %s' % second,
397 b'1 2 0 command-response continuation ',
398 b'1 2 0 command-response eos '
389 ])
399 ])
390
400
391 def testservererror(self):
401 def testservererror(self):
@@ -412,12 +422,15 b' class ServerReactorTests(unittest.TestCa'
412 self.assertaction(results[0], b'runcommand')
422 self.assertaction(results[0], b'runcommand')
413
423
414 outstream = reactor.makeoutputstream()
424 outstream = reactor.makeoutputstream()
415 result = reactor.oncommandresponseready(outstream, 1, b'response')
425 result = reactor.oncommandresponsereadyobjects(
426 outstream, 1, [b'response'])
416 self.assertaction(result, b'noop')
427 self.assertaction(result, b'noop')
417 result = reactor.oninputeof()
428 result = reactor.oninputeof()
418 self.assertaction(result, b'sendframes')
429 self.assertaction(result, b'sendframes')
419 self.assertframesequal(result[1][b'framegen'], [
430 self.assertframesequal(result[1][b'framegen'], [
420 b'1 2 stream-begin command-response eos %sresponse' % OK,
431 b'1 2 stream-begin command-response continuation %s' % OK,
432 b'1 2 0 command-response continuation cbor:b"response"',
433 b'1 2 0 command-response eos ',
421 ])
434 ])
422
435
423 def testmultiplecommanddeferresponse(self):
436 def testmultiplecommanddeferresponse(self):
@@ -427,15 +440,21 b' class ServerReactorTests(unittest.TestCa'
427 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
440 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
428
441
429 outstream = reactor.makeoutputstream()
442 outstream = reactor.makeoutputstream()
430 result = reactor.oncommandresponseready(outstream, 1, b'response1')
443 result = reactor.oncommandresponsereadyobjects(
444 outstream, 1, [b'response1'])
431 self.assertaction(result, b'noop')
445 self.assertaction(result, b'noop')
432 result = reactor.oncommandresponseready(outstream, 3, b'response2')
446 result = reactor.oncommandresponsereadyobjects(
447 outstream, 3, [b'response2'])
433 self.assertaction(result, b'noop')
448 self.assertaction(result, b'noop')
434 result = reactor.oninputeof()
449 result = reactor.oninputeof()
435 self.assertaction(result, b'sendframes')
450 self.assertaction(result, b'sendframes')
436 self.assertframesequal(result[1][b'framegen'], [
451 self.assertframesequal(result[1][b'framegen'], [
437 b'1 2 stream-begin command-response eos %sresponse1' % OK,
452 b'1 2 stream-begin command-response continuation %s' % OK,
438 b'3 2 0 command-response eos %sresponse2' % OK,
453 b'1 2 0 command-response continuation cbor:b"response1"',
454 b'1 2 0 command-response eos ',
455 b'3 2 0 command-response continuation %s' % OK,
456 b'3 2 0 command-response continuation cbor:b"response2"',
457 b'3 2 0 command-response eos ',
439 ])
458 ])
440
459
441 def testrequestidtracking(self):
460 def testrequestidtracking(self):
@@ -447,16 +466,22 b' class ServerReactorTests(unittest.TestCa'
447
466
448 # Register results for commands out of order.
467 # Register results for commands out of order.
449 outstream = reactor.makeoutputstream()
468 outstream = reactor.makeoutputstream()
450 reactor.oncommandresponseready(outstream, 3, b'response3')
469 reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
451 reactor.oncommandresponseready(outstream, 1, b'response1')
470 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
452 reactor.oncommandresponseready(outstream, 5, b'response5')
471 reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
453
472
454 result = reactor.oninputeof()
473 result = reactor.oninputeof()
455 self.assertaction(result, b'sendframes')
474 self.assertaction(result, b'sendframes')
456 self.assertframesequal(result[1][b'framegen'], [
475 self.assertframesequal(result[1][b'framegen'], [
457 b'3 2 stream-begin command-response eos %sresponse3' % OK,
476 b'3 2 stream-begin command-response continuation %s' % OK,
458 b'1 2 0 command-response eos %sresponse1' % OK,
477 b'3 2 0 command-response continuation cbor:b"response3"',
459 b'5 2 0 command-response eos %sresponse5' % OK,
478 b'3 2 0 command-response eos ',
479 b'1 2 0 command-response continuation %s' % OK,
480 b'1 2 0 command-response continuation cbor:b"response1"',
481 b'1 2 0 command-response eos ',
482 b'5 2 0 command-response continuation %s' % OK,
483 b'5 2 0 command-response continuation cbor:b"response5"',
484 b'5 2 0 command-response eos ',
460 ])
485 ])
461
486
462 def testduplicaterequestonactivecommand(self):
487 def testduplicaterequestonactivecommand(self):
@@ -477,7 +502,7 b' class ServerReactorTests(unittest.TestCa'
477 instream = framing.stream(1)
502 instream = framing.stream(1)
478 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
503 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
479 outstream = reactor.makeoutputstream()
504 outstream = reactor.makeoutputstream()
480 reactor.oncommandresponseready(outstream, 1, b'response')
505 reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
481
506
482 # We've registered the response but haven't sent it. From the
507 # We've registered the response but haven't sent it. From the
483 # perspective of the reactor, the command is still active.
508 # perspective of the reactor, the command is still active.
@@ -494,7 +519,7 b' class ServerReactorTests(unittest.TestCa'
494 instream = framing.stream(1)
519 instream = framing.stream(1)
495 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
520 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
496 outstream = reactor.makeoutputstream()
521 outstream = reactor.makeoutputstream()
497 res = reactor.oncommandresponseready(outstream, 1, b'response')
522 res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
498 list(res[1][b'framegen'])
523 list(res[1][b'framegen'])
499
524
500 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
525 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
General Comments 0
You need to be logged in to leave comments. Login now