Show More
@@ -365,75 +365,6 b' def createcommandframes(stream, requesti' | |||||
365 | if done: |
|
365 | if done: | |
366 | break |
|
366 | break | |
367 |
|
367 | |||
368 | def createcommandresponseframesfrombytes(stream, requestid, data, |
|
|||
369 | maxframesize=DEFAULT_MAX_FRAME_SIZE): |
|
|||
370 | """Create a raw frame to send a bytes response from static bytes input. |
|
|||
371 |
|
||||
372 | Returns a generator of bytearrays. |
|
|||
373 | """ |
|
|||
374 | # Automatically send the overall CBOR response map. |
|
|||
375 | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) |
|
|||
376 | if len(overall) > maxframesize: |
|
|||
377 | raise error.ProgrammingError('not yet implemented') |
|
|||
378 |
|
||||
379 | # Simple case where we can fit the full response in a single frame. |
|
|||
380 | if len(overall) + len(data) <= maxframesize: |
|
|||
381 | flags = FLAG_COMMAND_RESPONSE_EOS |
|
|||
382 | yield stream.makeframe(requestid=requestid, |
|
|||
383 | typeid=FRAME_TYPE_COMMAND_RESPONSE, |
|
|||
384 | flags=flags, |
|
|||
385 | payload=overall + data) |
|
|||
386 | return |
|
|||
387 |
|
||||
388 | # It's easier to send the overall CBOR map in its own frame than to track |
|
|||
389 | # offsets. |
|
|||
390 | yield stream.makeframe(requestid=requestid, |
|
|||
391 | typeid=FRAME_TYPE_COMMAND_RESPONSE, |
|
|||
392 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
|
|||
393 | payload=overall) |
|
|||
394 |
|
||||
395 | offset = 0 |
|
|||
396 | while True: |
|
|||
397 | chunk = data[offset:offset + maxframesize] |
|
|||
398 | offset += len(chunk) |
|
|||
399 | done = offset == len(data) |
|
|||
400 |
|
||||
401 | if done: |
|
|||
402 | flags = FLAG_COMMAND_RESPONSE_EOS |
|
|||
403 | else: |
|
|||
404 | flags = FLAG_COMMAND_RESPONSE_CONTINUATION |
|
|||
405 |
|
||||
406 | yield stream.makeframe(requestid=requestid, |
|
|||
407 | typeid=FRAME_TYPE_COMMAND_RESPONSE, |
|
|||
408 | flags=flags, |
|
|||
409 | payload=chunk) |
|
|||
410 |
|
||||
411 | if done: |
|
|||
412 | break |
|
|||
413 |
|
||||
414 | def createbytesresponseframesfromgen(stream, requestid, gen, |
|
|||
415 | maxframesize=DEFAULT_MAX_FRAME_SIZE): |
|
|||
416 | """Generator of frames from a generator of byte chunks. |
|
|||
417 |
|
||||
418 | This assumes that another frame will follow whatever this emits. i.e. |
|
|||
419 | this always emits the continuation flag and never emits the end-of-stream |
|
|||
420 | flag. |
|
|||
421 | """ |
|
|||
422 | cb = util.chunkbuffer(gen) |
|
|||
423 | flags = FLAG_COMMAND_RESPONSE_CONTINUATION |
|
|||
424 |
|
||||
425 | while True: |
|
|||
426 | chunk = cb.read(maxframesize) |
|
|||
427 | if not chunk: |
|
|||
428 | break |
|
|||
429 |
|
||||
430 | yield stream.makeframe(requestid=requestid, |
|
|||
431 | typeid=FRAME_TYPE_COMMAND_RESPONSE, |
|
|||
432 | flags=flags, |
|
|||
433 | payload=chunk) |
|
|||
434 |
|
||||
435 | flags |= FLAG_COMMAND_RESPONSE_CONTINUATION |
|
|||
436 |
|
||||
437 | def createcommandresponseokframe(stream, requestid): |
|
368 | def createcommandresponseokframe(stream, requestid): | |
438 | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) |
|
369 | overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | |
439 |
|
370 | |||
@@ -1020,30 +951,6 b' class serverreactor(object):' | |||||
1020 |
|
951 | |||
1021 | return meth(frame) |
|
952 | return meth(frame) | |
1022 |
|
953 | |||
1023 | def oncommandresponseready(self, stream, requestid, data): |
|
|||
1024 | """Signal that a bytes response is ready to be sent to the client. |
|
|||
1025 |
|
||||
1026 | The raw bytes response is passed as an argument. |
|
|||
1027 | """ |
|
|||
1028 | ensureserverstream(stream) |
|
|||
1029 |
|
||||
1030 | def sendframes(): |
|
|||
1031 | for frame in createcommandresponseframesfrombytes(stream, requestid, |
|
|||
1032 | data): |
|
|||
1033 | yield frame |
|
|||
1034 |
|
||||
1035 | self._activecommands.remove(requestid) |
|
|||
1036 |
|
||||
1037 | result = sendframes() |
|
|||
1038 |
|
||||
1039 | if self._deferoutput: |
|
|||
1040 | self._bufferedframegens.append(result) |
|
|||
1041 | return 'noop', {} |
|
|||
1042 | else: |
|
|||
1043 | return 'sendframes', { |
|
|||
1044 | 'framegen': result, |
|
|||
1045 | } |
|
|||
1046 |
|
||||
1047 | def oncommandresponsereadyobjects(self, stream, requestid, objs): |
|
954 | def oncommandresponsereadyobjects(self, stream, requestid, objs): | |
1048 | """Signal that objects are ready to be sent to the client. |
|
955 | """Signal that objects are ready to be sent to the client. | |
1049 |
|
956 | |||
@@ -1053,6 +960,10 b' class serverreactor(object):' | |||||
1053 | """ |
|
960 | """ | |
1054 | ensureserverstream(stream) |
|
961 | ensureserverstream(stream) | |
1055 |
|
962 | |||
|
963 | # A more robust solution would be to check for objs.{next,__next__}. | |||
|
964 | if isinstance(objs, list): | |||
|
965 | objs = iter(objs) | |||
|
966 | ||||
1056 | # We need to take care over exception handling. Uncaught exceptions |
|
967 | # We need to take care over exception handling. Uncaught exceptions | |
1057 | # when generating frames could lead to premature end of the frame |
|
968 | # when generating frames could lead to premature end of the frame | |
1058 | # stream and the possibility of the server or client process getting |
|
969 | # stream and the possibility of the server or client process getting |
@@ -225,19 +225,22 b' class ServerReactorTests(unittest.TestCa' | |||||
225 | results.append(self._sendsingleframe( |
|
225 | results.append(self._sendsingleframe( | |
226 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
226 | reactor, ffs(b'1 1 stream-begin command-request new ' | |
227 | b"cbor:{b'name': b'command'}"))) |
|
227 | b"cbor:{b'name': b'command'}"))) | |
228 |
result = reactor.oncommandresponseready( |
|
228 | result = reactor.oncommandresponsereadyobjects( | |
|
229 | outstream, 1, [b'response1']) | |||
229 | self.assertaction(result, b'sendframes') |
|
230 | self.assertaction(result, b'sendframes') | |
230 | list(result[1][b'framegen']) |
|
231 | list(result[1][b'framegen']) | |
231 | results.append(self._sendsingleframe( |
|
232 | results.append(self._sendsingleframe( | |
232 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
233 | reactor, ffs(b'1 1 stream-begin command-request new ' | |
233 | b"cbor:{b'name': b'command'}"))) |
|
234 | b"cbor:{b'name': b'command'}"))) | |
234 |
result = reactor.oncommandresponseready( |
|
235 | result = reactor.oncommandresponsereadyobjects( | |
|
236 | outstream, 1, [b'response2']) | |||
235 | self.assertaction(result, b'sendframes') |
|
237 | self.assertaction(result, b'sendframes') | |
236 | list(result[1][b'framegen']) |
|
238 | list(result[1][b'framegen']) | |
237 | results.append(self._sendsingleframe( |
|
239 | results.append(self._sendsingleframe( | |
238 | reactor, ffs(b'1 1 stream-begin command-request new ' |
|
240 | reactor, ffs(b'1 1 stream-begin command-request new ' | |
239 | b"cbor:{b'name': b'command'}"))) |
|
241 | b"cbor:{b'name': b'command'}"))) | |
240 |
result = reactor.oncommandresponseready( |
|
242 | result = reactor.oncommandresponsereadyobjects( | |
|
243 | outstream, 1, [b'response3']) | |||
241 | self.assertaction(result, b'sendframes') |
|
244 | self.assertaction(result, b'sendframes') | |
242 | list(result[1][b'framegen']) |
|
245 | list(result[1][b'framegen']) | |
243 |
|
246 | |||
@@ -364,10 +367,13 b' class ServerReactorTests(unittest.TestCa' | |||||
364 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
367 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | |
365 |
|
368 | |||
366 | outstream = reactor.makeoutputstream() |
|
369 | outstream = reactor.makeoutputstream() | |
367 |
result = reactor.oncommandresponseready( |
|
370 | result = reactor.oncommandresponsereadyobjects( | |
|
371 | outstream, 1, [b'response']) | |||
368 | self.assertaction(result, b'sendframes') |
|
372 | self.assertaction(result, b'sendframes') | |
369 | self.assertframesequal(result[1][b'framegen'], [ |
|
373 | self.assertframesequal(result[1][b'framegen'], [ | |
370 |
b'1 2 stream-begin command-response |
|
374 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
375 | b'1 2 0 command-response continuation cbor:b"response"', | |||
|
376 | b'1 2 0 command-response eos ', | |||
371 | ]) |
|
377 | ]) | |
372 |
|
378 | |||
373 | def testmultiframeresponse(self): |
|
379 | def testmultiframeresponse(self): | |
@@ -380,12 +386,16 b' class ServerReactorTests(unittest.TestCa' | |||||
380 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
386 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | |
381 |
|
387 | |||
382 | outstream = reactor.makeoutputstream() |
|
388 | outstream = reactor.makeoutputstream() | |
383 |
result = reactor.oncommandresponseready( |
|
389 | result = reactor.oncommandresponsereadyobjects( | |
|
390 | outstream, 1, [first + second]) | |||
384 | self.assertaction(result, b'sendframes') |
|
391 | self.assertaction(result, b'sendframes') | |
385 | self.assertframesequal(result[1][b'framegen'], [ |
|
392 | self.assertframesequal(result[1][b'framegen'], [ | |
386 | b'1 2 stream-begin command-response continuation %s' % OK, |
|
393 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
394 | b'1 2 0 command-response continuation Y\x80d', | |||
387 | b'1 2 0 command-response continuation %s' % first, |
|
395 | b'1 2 0 command-response continuation %s' % first, | |
388 |
b'1 2 0 command-response |
|
396 | b'1 2 0 command-response continuation %s' % second, | |
|
397 | b'1 2 0 command-response continuation ', | |||
|
398 | b'1 2 0 command-response eos ' | |||
389 | ]) |
|
399 | ]) | |
390 |
|
400 | |||
391 | def testservererror(self): |
|
401 | def testservererror(self): | |
@@ -412,12 +422,15 b' class ServerReactorTests(unittest.TestCa' | |||||
412 | self.assertaction(results[0], b'runcommand') |
|
422 | self.assertaction(results[0], b'runcommand') | |
413 |
|
423 | |||
414 | outstream = reactor.makeoutputstream() |
|
424 | outstream = reactor.makeoutputstream() | |
415 |
result = reactor.oncommandresponseready( |
|
425 | result = reactor.oncommandresponsereadyobjects( | |
|
426 | outstream, 1, [b'response']) | |||
416 | self.assertaction(result, b'noop') |
|
427 | self.assertaction(result, b'noop') | |
417 | result = reactor.oninputeof() |
|
428 | result = reactor.oninputeof() | |
418 | self.assertaction(result, b'sendframes') |
|
429 | self.assertaction(result, b'sendframes') | |
419 | self.assertframesequal(result[1][b'framegen'], [ |
|
430 | self.assertframesequal(result[1][b'framegen'], [ | |
420 |
b'1 2 stream-begin command-response |
|
431 | b'1 2 stream-begin command-response continuation %s' % OK, | |
|
432 | b'1 2 0 command-response continuation cbor:b"response"', | |||
|
433 | b'1 2 0 command-response eos ', | |||
421 | ]) |
|
434 | ]) | |
422 |
|
435 | |||
423 | def testmultiplecommanddeferresponse(self): |
|
436 | def testmultiplecommanddeferresponse(self): | |
@@ -427,15 +440,21 b' class ServerReactorTests(unittest.TestCa' | |||||
427 | list(sendcommandframes(reactor, instream, 3, b'command2', {})) |
|
440 | list(sendcommandframes(reactor, instream, 3, b'command2', {})) | |
428 |
|
441 | |||
429 | outstream = reactor.makeoutputstream() |
|
442 | outstream = reactor.makeoutputstream() | |
430 |
result = reactor.oncommandresponseready( |
|
443 | result = reactor.oncommandresponsereadyobjects( | |
|
444 | outstream, 1, [b'response1']) | |||
431 | self.assertaction(result, b'noop') |
|
445 | self.assertaction(result, b'noop') | |
432 |
result = reactor.oncommandresponseready( |
|
446 | result = reactor.oncommandresponsereadyobjects( | |
|
447 | outstream, 3, [b'response2']) | |||
433 | self.assertaction(result, b'noop') |
|
448 | self.assertaction(result, b'noop') | |
434 | result = reactor.oninputeof() |
|
449 | result = reactor.oninputeof() | |
435 | self.assertaction(result, b'sendframes') |
|
450 | self.assertaction(result, b'sendframes') | |
436 | self.assertframesequal(result[1][b'framegen'], [ |
|
451 | self.assertframesequal(result[1][b'framegen'], [ | |
437 |
b'1 2 stream-begin command-response |
|
452 | b'1 2 stream-begin command-response continuation %s' % OK, | |
438 |
b' |
|
453 | b'1 2 0 command-response continuation cbor:b"response1"', | |
|
454 | b'1 2 0 command-response eos ', | |||
|
455 | b'3 2 0 command-response continuation %s' % OK, | |||
|
456 | b'3 2 0 command-response continuation cbor:b"response2"', | |||
|
457 | b'3 2 0 command-response eos ', | |||
439 | ]) |
|
458 | ]) | |
440 |
|
459 | |||
441 | def testrequestidtracking(self): |
|
460 | def testrequestidtracking(self): | |
@@ -447,16 +466,22 b' class ServerReactorTests(unittest.TestCa' | |||||
447 |
|
466 | |||
448 | # Register results for commands out of order. |
|
467 | # Register results for commands out of order. | |
449 | outstream = reactor.makeoutputstream() |
|
468 | outstream = reactor.makeoutputstream() | |
450 | reactor.oncommandresponseready(outstream, 3, b'response3') |
|
469 | reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3']) | |
451 | reactor.oncommandresponseready(outstream, 1, b'response1') |
|
470 | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1']) | |
452 | reactor.oncommandresponseready(outstream, 5, b'response5') |
|
471 | reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5']) | |
453 |
|
472 | |||
454 | result = reactor.oninputeof() |
|
473 | result = reactor.oninputeof() | |
455 | self.assertaction(result, b'sendframes') |
|
474 | self.assertaction(result, b'sendframes') | |
456 | self.assertframesequal(result[1][b'framegen'], [ |
|
475 | self.assertframesequal(result[1][b'framegen'], [ | |
457 |
b'3 2 stream-begin command-response |
|
476 | b'3 2 stream-begin command-response continuation %s' % OK, | |
458 |
b' |
|
477 | b'3 2 0 command-response continuation cbor:b"response3"', | |
459 |
b' |
|
478 | b'3 2 0 command-response eos ', | |
|
479 | b'1 2 0 command-response continuation %s' % OK, | |||
|
480 | b'1 2 0 command-response continuation cbor:b"response1"', | |||
|
481 | b'1 2 0 command-response eos ', | |||
|
482 | b'5 2 0 command-response continuation %s' % OK, | |||
|
483 | b'5 2 0 command-response continuation cbor:b"response5"', | |||
|
484 | b'5 2 0 command-response eos ', | |||
460 | ]) |
|
485 | ]) | |
461 |
|
486 | |||
462 | def testduplicaterequestonactivecommand(self): |
|
487 | def testduplicaterequestonactivecommand(self): | |
@@ -477,7 +502,7 b' class ServerReactorTests(unittest.TestCa' | |||||
477 | instream = framing.stream(1) |
|
502 | instream = framing.stream(1) | |
478 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
503 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | |
479 | outstream = reactor.makeoutputstream() |
|
504 | outstream = reactor.makeoutputstream() | |
480 | reactor.oncommandresponseready(outstream, 1, b'response') |
|
505 | reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | |
481 |
|
506 | |||
482 | # We've registered the response but haven't sent it. From the |
|
507 | # We've registered the response but haven't sent it. From the | |
483 | # perspective of the reactor, the command is still active. |
|
508 | # perspective of the reactor, the command is still active. | |
@@ -494,7 +519,7 b' class ServerReactorTests(unittest.TestCa' | |||||
494 | instream = framing.stream(1) |
|
519 | instream = framing.stream(1) | |
495 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
520 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | |
496 | outstream = reactor.makeoutputstream() |
|
521 | outstream = reactor.makeoutputstream() | |
497 | res = reactor.oncommandresponseready(outstream, 1, b'response') |
|
522 | res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response']) | |
498 | list(res[1][b'framegen']) |
|
523 | list(res[1][b'framegen']) | |
499 |
|
524 | |||
500 | results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
525 | results = list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
General Comments 0
You need to be logged in to leave comments.
Login now