Show More
@@ -365,75 +365,6 b' def createcommandframes(stream, requesti' | |||
|
365 | 365 | if done: |
|
366 | 366 | break |
|
367 | 367 | |
|
368 | def createcommandresponseframesfrombytes(stream, requestid, data, | |
|
369 | maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
|
370 | """Create a raw frame to send a bytes response from static bytes input. | |
|
371 | ||
|
372 | Returns a generator of bytearrays. | |
|
373 | """ | |
|
374 | # Automatically send the overall CBOR response map. | |
|
375 | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | |
|
376 | if len(overall) > maxframesize: | |
|
377 | raise error.ProgrammingError('not yet implemented') | |
|
378 | ||
|
379 | # Simple case where we can fit the full response in a single frame. | |
|
380 | if len(overall) + len(data) <= maxframesize: | |
|
381 | flags = FLAG_COMMAND_RESPONSE_EOS | |
|
382 | yield stream.makeframe(requestid=requestid, | |
|
383 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
384 | flags=flags, | |
|
385 | payload=overall + data) | |
|
386 | return | |
|
387 | ||
|
388 | # It's easier to send the overall CBOR map in its own frame than to track | |
|
389 | # offsets. | |
|
390 | yield stream.makeframe(requestid=requestid, | |
|
391 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
392 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |
|
393 | payload=overall) | |
|
394 | ||
|
395 | offset = 0 | |
|
396 | while True: | |
|
397 | chunk = data[offset:offset + maxframesize] | |
|
398 | offset += len(chunk) | |
|
399 | done = offset == len(data) | |
|
400 | ||
|
401 | if done: | |
|
402 | flags = FLAG_COMMAND_RESPONSE_EOS | |
|
403 | else: | |
|
404 | flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |
|
405 | ||
|
406 | yield stream.makeframe(requestid=requestid, | |
|
407 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
408 | flags=flags, | |
|
409 | payload=chunk) | |
|
410 | ||
|
411 | if done: | |
|
412 | break | |
|
413 | ||
|
414 | def createbytesresponseframesfromgen(stream, requestid, gen, | |
|
415 | maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
|
416 | """Generator of frames from a generator of byte chunks. | |
|
417 | ||
|
418 | This assumes that another frame will follow whatever this emits. i.e. | |
|
419 | this always emits the continuation flag and never emits the end-of-stream | |
|
420 | flag. | |
|
421 | """ | |
|
422 | cb = util.chunkbuffer(gen) | |
|
423 | flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |
|
424 | ||
|
425 | while True: | |
|
426 | chunk = cb.read(maxframesize) | |
|
427 | if not chunk: | |
|
428 | break | |
|
429 | ||
|
430 | yield stream.makeframe(requestid=requestid, | |
|
431 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
432 | flags=flags, | |
|
433 | payload=chunk) | |
|
434 | ||
|
435 | flags |= FLAG_COMMAND_RESPONSE_CONTINUATION | |
|
436 | ||
|
437 | 368 | def createcommandresponseokframe(stream, requestid): |
|
438 | 369 | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) |
|
439 | 370 | |
@@ -1020,30 +951,6 b' class serverreactor(object):' | |||
|
1020 | 951 | |
|
1021 | 952 | return meth(frame) |
|
1022 | 953 | |
|
1023 | def oncommandresponseready(self, stream, requestid, data): | |
|
1024 | """Signal that a bytes response is ready to be sent to the client. | |
|
1025 | ||
|
1026 | The raw bytes response is passed as an argument. | |
|
1027 | """ | |
|
1028 | ensureserverstream(stream) | |
|
1029 | ||
|
1030 | def sendframes(): | |
|
1031 | for frame in createcommandresponseframesfrombytes(stream, requestid, | |
|
1032 | data): | |
|
1033 | yield frame | |
|
1034 | ||
|
1035 | self._activecommands.remove(requestid) | |
|
1036 | ||
|
1037 | result = sendframes() | |
|
1038 | ||
|
1039 | if self._deferoutput: | |
|
1040 | self._bufferedframegens.append(result) | |
|
1041 | return 'noop', {} | |
|
1042 | else: | |
|
1043 | return 'sendframes', { | |
|
1044 | 'framegen': result, | |
|
1045 | } | |
|
1046 | ||
|
1047 | 954 | def oncommandresponsereadyobjects(self, stream, requestid, objs): |
|
1048 | 955 | """Signal that objects are ready to be sent to the client. |
|
1049 | 956 | |
@@ -1053,6 +960,10 b' class serverreactor(object):' | |||
|
1053 | 960 | """ |
|
1054 | 961 | ensureserverstream(stream) |
|
1055 | 962 | |
|
963 | # A more robust solution would be to check for objs.{next,__next__}. | |
|
964 | if isinstance(objs, list): | |
|
965 | objs = iter(objs) | |
|
966 | ||
|
1056 | 967 | # We need to take care over exception handling. Uncaught exceptions |
|
1057 | 968 | # when generating frames could lead to premature end of the frame |
|
1058 | 969 | # stream and the possibility of the server or client process getting |
@@ -225,19 +225,22 b' class ServerReactorTests(unittest.TestCa' | |||
|
225 | 225 | results.append(self._sendsingleframe( |
|
226 | 226 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
227 | 227 | b"cbor:{b'name': b'command'}"))) |
|
228 |
result = reactor.oncommandresponseready( |
|
|
228 | result = reactor.oncommandresponsereadyobjects( | |
|
229 | outstream, 1, [b'response1']) | |
|
229 | 230 | self.assertaction(result, b'sendframes') |
|
230 | 231 | list(result[1][b'framegen']) |
|
231 | 232 | results.append(self._sendsingleframe( |
|
232 | 233 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
233 | 234 | b"cbor:{b'name': b'command'}"))) |
|
234 |
result = reactor.oncommandresponseready( |
|
|
235 | result = reactor.oncommandresponsereadyobjects( | |
|
236 | outstream, 1, [b'response2']) | |
|
235 | 237 | self.assertaction(result, b'sendframes') |
|
236 | 238 | list(result[1][b'framegen']) |
|
237 | 239 | results.append(self._sendsingleframe( |
|
238 | 240 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
239 | 241 | b"cbor:{b'name': b'command'}"))) |
|
240 |
result = reactor.oncommandresponseready( |
|
|
242 | result = reactor.oncommandresponsereadyobjects( | |
|
243 | outstream, 1, [b'response3']) | |
|
241 | 244 | self.assertaction(result, b'sendframes') |
|
242 | 245 | list(result[1][b'framegen']) |
|
243 | 246 | |
@@ -364,10 +367,13 b' class ServerReactorTests(unittest.TestCa' | |||
|
364 | 367 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
365 | 368 | |
|
366 | 369 | outstream = reactor.makeoutputstream() |
|
367 |
result = reactor.oncommandresponseready( |
|
|
370 | result = reactor.oncommandresponsereadyobjects( | |
|
371 | outstream, 1, [b'response']) | |
|
368 | 372 | self.assertaction(result, b'sendframes') |
|
369 | 373 | self.assertframesequal(result[1][b'framegen'], [ |
|
370 |
b'1 2 stream-begin command-response |
|
|
374 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
375 | b'1 2 0 command-response continuation cbor:b"response"', | |
|
376 | b'1 2 0 command-response eos ', | |
|
371 | 377 | ]) |
|
372 | 378 | |
|
373 | 379 | def testmultiframeresponse(self): |
@@ -380,12 +386,16 b' class ServerReactorTests(unittest.TestCa' | |||
|
380 | 386 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
381 | 387 | |
|
382 | 388 | outstream = reactor.makeoutputstream() |
|
383 |
result = reactor.oncommandresponseready( |
|
|
389 | result = reactor.oncommandresponsereadyobjects( | |
|
390 | outstream, 1, [first + second]) | |
|
384 | 391 | self.assertaction(result, b'sendframes') |
|
385 | 392 | self.assertframesequal(result[1][b'framegen'], [ |
|
386 | 393 | b'1 2 stream-begin command-response continuation %s' % OK, |
|
394 | b'1 2 0 command-response continuation Y\x80d', | |
|
387 | 395 | b'1 2 0 command-response continuation %s' % first, |
|
388 |
b'1 2 0 command-response |
|
|
396 | b'1 2 0 command-response continuation %s' % second, | |
|
397 | b'1 2 0 command-response continuation ', | |
|
398 | b'1 2 0 command-response eos ' | |
|
389 | 399 | ]) |
|
390 | 400 | |
|
391 | 401 | def testservererror(self): |
@@ -412,12 +422,15 b' class ServerReactorTests(unittest.TestCa' | |||
|
412 | 422 | self.assertaction(results[0], b'runcommand') |
|
413 | 423 | |
|
414 | 424 | outstream = reactor.makeoutputstream() |
|
415 |
result = reactor.oncommandresponseready( |
|
|
425 | result = reactor.oncommandresponsereadyobjects( | |
|
426 | outstream, 1, [b'response']) | |
|
416 | 427 | self.assertaction(result, b'noop') |
|
417 | 428 | result = reactor.oninputeof() |
|
418 | 429 | self.assertaction(result, b'sendframes') |
|
419 | 430 | self.assertframesequal(result[1][b'framegen'], [ |
|
420 |
b'1 2 stream-begin command-response |
|
|
431 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
432 | b'1 2 0 command-response continuation cbor:b"response"', | |
|
433 | b'1 2 0 command-response eos ', | |
|
421 | 434 | ]) |
|
422 | 435 | |
|
423 | 436 | def testmultiplecommanddeferresponse(self): |
@@ -427,15 +440,21 b' class ServerReactorTests(unittest.TestCa' | |||
|
427 | 440 | list(sendcommandframes(reactor, instream, 3, b'command2', {})) |
|
428 | 441 | |
|
429 | 442 | outstream = reactor.makeoutputstream() |
|
430 |
result = reactor.oncommandresponseready( |
|
|
443 | result = reactor.oncommandresponsereadyobjects( | |
|
444 | outstream, 1, [b'response1']) | |
|
431 | 445 | self.assertaction(result, b'noop') |
|
432 |
result = reactor.oncommandresponseready( |
|
|
446 | result = reactor.oncommandresponsereadyobjects( | |
|
447 | outstream, 3, [b'response2']) | |
|
433 | 448 | self.assertaction(result, b'noop') |
|
434 | 449 | result = reactor.oninputeof() |
|
435 | 450 | self.assertaction(result, b'sendframes') |
|
436 | 451 | self.assertframesequal(result[1][b'framegen'], [ |
|
437 |
b'1 2 stream-begin command-response |
|
|
438 |
b' |
|
|
452 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
453 | b'1 2 0 command-response continuation cbor:b"response1"', | |
|
454 | b'1 2 0 command-response eos ', | |
|
455 | b'3 2 0 command-response continuation %s' % OK, | |
|
456 | b'3 2 0 command-response continuation cbor:b"response2"', | |
|
457 | b'3 2 0 command-response eos ', | |
|
439 | 458 | ]) |
|
440 | 459 | |
|
441 | 460 | def testrequestidtracking(self): |
@@ -447,16 +466,22 b' class ServerReactorTests(unittest.TestCa' | |||
|
447 | 466 | |
|
448 | 467 | # Register results for commands out of order. |
|
449 | 468 | outstream = reactor.makeoutputstream() |
|
450 | reactor.oncommandresponseready(outstream, 3, b'response3') | |
|
451 | reactor.oncommandresponseready(outstream, 1, b'response1') | |
|
452 | reactor.oncommandresponseready(outstream, 5, b'response5') | |
|
469 | reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3']) | |
|
470 | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1']) | |
|
471 | reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5']) | |
|
453 | 472 | |
|
454 | 473 | result = reactor.oninputeof() |
|
455 | 474 | self.assertaction(result, b'sendframes') |
|
456 | 475 | self.assertframesequal(result[1][b'framegen'], [ |
|
457 |
b'3 2 stream-begin command-response |
|
|
458 |
b' |
|
|
459 |
b' |
|
|
476 | b'3 2 stream-begin command-response continuation %s' % OK, | |
|
477 | b'3 2 0 command-response continuation cbor:b"response3"', | |
|
478 | b'3 2 0 command-response eos ', | |
|
479 | b'1 2 0 command-response continuation %s' % OK, | |
|
480 | b'1 2 0 command-response continuation cbor:b"response1"', | |
|
481 | b'1 2 0 command-response eos ', | |
|
482 | b'5 2 0 command-response continuation %s' % OK, | |
|
483 | b'5 2 0 command-response continuation cbor:b"response5"', | |
|
484 | b'5 2 0 command-response eos ', | |
|
460 | 485 | ]) |
|
461 | 486 | |
|
462 | 487 | def testduplicaterequestonactivecommand(self): |
@@ -477,7 +502,7 b' class ServerReactorTests(unittest.TestCa' | |||
|
477 | 502 | instream = framing.stream(1) |
|
478 | 503 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
479 | 504 | outstream = reactor.makeoutputstream() |
|
480 | reactor.oncommandresponseready(outstream, 1, b'response') | |
|
505 | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | |
|
481 | 506 | |
|
482 | 507 | # We've registered the response but haven't sent it. From the |
|
483 | 508 | # perspective of the reactor, the command is still active. |
@@ -494,7 +519,7 b' class ServerReactorTests(unittest.TestCa' | |||
|
494 | 519 | instream = framing.stream(1) |
|
495 | 520 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
496 | 521 | outstream = reactor.makeoutputstream() |
|
497 | res = reactor.oncommandresponseready(outstream, 1, b'response') | |
|
522 | res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | |
|
498 | 523 | list(res[1][b'framegen']) |
|
499 | 524 | |
|
500 | 525 | results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
General Comments 0
You need to be logged in to leave comments.
Login now