Show More
@@ -308,10 +308,24 class serverreactor(object): | |||||
308 | wantframe |
|
308 | wantframe | |
309 | Indicates that nothing of interest happened and the server is waiting on |
|
309 | Indicates that nothing of interest happened and the server is waiting on | |
310 | more frames from the client before anything interesting can be done. |
|
310 | more frames from the client before anything interesting can be done. | |
|
311 | ||||
|
312 | noop | |||
|
313 | Indicates no additional action is required. | |||
311 | """ |
|
314 | """ | |
312 |
|
315 | |||
313 | def __init__(self): |
|
316 | def __init__(self, deferoutput=False): | |
|
317 | """Construct a new server reactor. | |||
|
318 | ||||
|
319 | ``deferoutput`` can be used to indicate that no output frames should be | |||
|
320 | instructed to be sent until input has been exhausted. In this mode, | |||
|
321 | events that would normally generate output frames (such as a command | |||
|
322 | response being ready) will instead defer instructing the consumer to | |||
|
323 | send those frames. This is useful for half-duplex transports where the | |||
|
324 | sender cannot receive until all data has been transmitted. | |||
|
325 | """ | |||
|
326 | self._deferoutput = deferoutput | |||
314 | self._state = 'idle' |
|
327 | self._state = 'idle' | |
|
328 | self._bufferedframegens = [] | |||
315 | self._activecommand = None |
|
329 | self._activecommand = None | |
316 | self._activeargs = None |
|
330 | self._activeargs = None | |
317 | self._activedata = None |
|
331 | self._activedata = None | |
@@ -344,8 +358,33 class serverreactor(object): | |||||
344 |
|
358 | |||
345 | The raw bytes response is passed as an argument. |
|
359 | The raw bytes response is passed as an argument. | |
346 | """ |
|
360 | """ | |
|
361 | framegen = createbytesresponseframesfrombytes(data) | |||
|
362 | ||||
|
363 | if self._deferoutput: | |||
|
364 | self._bufferedframegens.append(framegen) | |||
|
365 | return 'noop', {} | |||
|
366 | else: | |||
|
367 | return 'sendframes', { | |||
|
368 | 'framegen': framegen, | |||
|
369 | } | |||
|
370 | ||||
|
371 | def oninputeof(self): | |||
|
372 | """Signals that end of input has been received. | |||
|
373 | ||||
|
374 | No more frames will be received. All pending activity should be | |||
|
375 | completed. | |||
|
376 | """ | |||
|
377 | if not self._deferoutput or not self._bufferedframegens: | |||
|
378 | return 'noop', {} | |||
|
379 | ||||
|
380 | # If we buffered all our responses, emit those. | |||
|
381 | def makegen(): | |||
|
382 | for gen in self._bufferedframegens: | |||
|
383 | for frame in gen: | |||
|
384 | yield frame | |||
|
385 | ||||
347 | return 'sendframes', { |
|
386 | return 'sendframes', { | |
348 |
'framegen': |
|
387 | 'framegen': makegen(), | |
349 | } |
|
388 | } | |
350 |
|
389 | |||
351 | def onapplicationerror(self, msg): |
|
390 | def onapplicationerror(self, msg): |
@@ -401,6 +401,10 def _processhttpv2reflectrequest(ui, rep | |||||
401 | states.append(json.dumps((action, meta), sort_keys=True, |
|
401 | states.append(json.dumps((action, meta), sort_keys=True, | |
402 | separators=(', ', ': '))) |
|
402 | separators=(', ', ': '))) | |
403 |
|
403 | |||
|
404 | action, meta = reactor.oninputeof() | |||
|
405 | meta['action'] = action | |||
|
406 | states.append(json.dumps(meta, sort_keys=True, separators=(', ',': '))) | |||
|
407 | ||||
404 | res.status = b'200 OK' |
|
408 | res.status = b'200 OK' | |
405 | res.headers[b'Content-Type'] = b'text/plain' |
|
409 | res.headers[b'Content-Type'] = b'text/plain' | |
406 | res.setbodybytes(b'\n'.join(states)) |
|
410 | res.setbodybytes(b'\n'.join(states)) | |
@@ -411,7 +415,10 def _processhttpv2request(ui, repo, req, | |||||
411 | Called when the HTTP request contains unified frame-based protocol |
|
415 | Called when the HTTP request contains unified frame-based protocol | |
412 | frames for evaluation. |
|
416 | frames for evaluation. | |
413 | """ |
|
417 | """ | |
414 | reactor = wireprotoframing.serverreactor() |
|
418 | # TODO Some HTTP clients are full duplex and can receive data before | |
|
419 | # the entire request is transmitted. Figure out a way to indicate support | |||
|
420 | # for that so we can opt into full duplex mode. | |||
|
421 | reactor = wireprotoframing.serverreactor(deferoutput=True) | |||
415 | seencommand = False |
|
422 | seencommand = False | |
416 |
|
423 | |||
417 | while True: |
|
424 | while True: | |
@@ -448,6 +455,19 def _processhttpv2request(ui, repo, req, | |||||
448 | raise error.ProgrammingError( |
|
455 | raise error.ProgrammingError( | |
449 | 'unhandled action from frame processor: %s' % action) |
|
456 | 'unhandled action from frame processor: %s' % action) | |
450 |
|
457 | |||
|
458 | action, meta = reactor.oninputeof() | |||
|
459 | if action == 'sendframes': | |||
|
460 | # We assume we haven't started sending the response yet. If we're | |||
|
461 | # wrong, the response type will raise an exception. | |||
|
462 | res.status = b'200 OK' | |||
|
463 | res.headers[b'Content-Type'] = FRAMINGTYPE | |||
|
464 | res.setbodygen(meta['framegen']) | |||
|
465 | elif action == 'noop': | |||
|
466 | pass | |||
|
467 | else: | |||
|
468 | raise error.ProgrammingError('unhandled action from frame processor: %s' | |||
|
469 | % action) | |||
|
470 | ||||
451 | def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, |
|
471 | def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, | |
452 | command): |
|
472 | command): | |
453 | """Dispatch a wire protocol command made from HTTPv2 requests. |
|
473 | """Dispatch a wire protocol command made from HTTPv2 requests. | |
@@ -504,6 +524,8 def _httpv2runcommand(ui, repo, req, res | |||||
504 |
|
524 | |||
505 | if action == 'sendframes': |
|
525 | if action == 'sendframes': | |
506 | res.setbodygen(meta['framegen']) |
|
526 | res.setbodygen(meta['framegen']) | |
|
527 | elif action == 'noop': | |||
|
528 | pass | |||
507 | else: |
|
529 | else: | |
508 | raise error.ProgrammingError('unhandled event from reactor: %s' % |
|
530 | raise error.ProgrammingError('unhandled event from reactor: %s' % | |
509 | action) |
|
531 | action) |
@@ -401,7 +401,7 Command frames can be reflected via debu | |||||
401 | s> Server: testing stub value\r\n |
|
401 | s> Server: testing stub value\r\n | |
402 | s> Date: $HTTP_DATE$\r\n |
|
402 | s> Date: $HTTP_DATE$\r\n | |
403 | s> Content-Type: text/plain\r\n |
|
403 | s> Content-Type: text/plain\r\n | |
404 |
s> Content-Length: |
|
404 | s> Content-Length: 310\r\n | |
405 | s> \r\n |
|
405 | s> \r\n | |
406 | s> received: 1 2 command1\n |
|
406 | s> received: 1 2 command1\n | |
407 | s> ["wantframe", {"state": "command-receiving-args"}]\n |
|
407 | s> ["wantframe", {"state": "command-receiving-args"}]\n | |
@@ -409,6 +409,7 Command frames can be reflected via debu | |||||
409 | s> ["wantframe", {"state": "command-receiving-args"}]\n |
|
409 | s> ["wantframe", {"state": "command-receiving-args"}]\n | |
410 | s> received: 2 2 \x04\x00\x03\x00bar1val\n |
|
410 | s> received: 2 2 \x04\x00\x03\x00bar1val\n | |
411 | s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n |
|
411 | s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n | |
412 | s> received: <no frame> |
|
412 | s> received: <no frame>\n | |
|
413 | s> {"action": "noop"} | |||
413 |
|
414 | |||
414 | $ cat error.log |
|
415 | $ cat error.log |
@@ -9,8 +9,8 from mercurial import ( | |||||
9 |
|
9 | |||
10 | ffs = framing.makeframefromhumanstring |
|
10 | ffs = framing.makeframefromhumanstring | |
11 |
|
11 | |||
12 | def makereactor(): |
|
12 | def makereactor(deferoutput=False): | |
13 | return framing.serverreactor() |
|
13 | return framing.serverreactor(deferoutput=deferoutput) | |
14 |
|
14 | |||
15 | def sendframes(reactor, gen): |
|
15 | def sendframes(reactor, gen): | |
16 | """Send a generator of frame bytearray to a reactor. |
|
16 | """Send a generator of frame bytearray to a reactor. | |
@@ -95,6 +95,9 class ServerReactorTests(unittest.TestCa | |||||
95 | 'data': None, |
|
95 | 'data': None, | |
96 | }) |
|
96 | }) | |
97 |
|
97 | |||
|
98 | result = reactor.oninputeof() | |||
|
99 | self.assertaction(result, 'noop') | |||
|
100 | ||||
98 | def test1argument(self): |
|
101 | def test1argument(self): | |
99 | reactor = makereactor() |
|
102 | reactor = makereactor() | |
100 | results = list(sendcommandframes(reactor, b'mycommand', |
|
103 | results = list(sendcommandframes(reactor, b'mycommand', | |
@@ -310,6 +313,37 class ServerReactorTests(unittest.TestCa | |||||
310 | b'error-response application some message', |
|
313 | b'error-response application some message', | |
311 | ]) |
|
314 | ]) | |
312 |
|
315 | |||
|
316 | def test1commanddeferresponse(self): | |||
|
317 | """Responses when in deferred output mode are delayed until EOF.""" | |||
|
318 | reactor = makereactor(deferoutput=True) | |||
|
319 | results = list(sendcommandframes(reactor, b'mycommand', {})) | |||
|
320 | self.assertEqual(len(results), 1) | |||
|
321 | self.assertaction(results[0], 'runcommand') | |||
|
322 | ||||
|
323 | result = reactor.onbytesresponseready(b'response') | |||
|
324 | self.assertaction(result, 'noop') | |||
|
325 | result = reactor.oninputeof() | |||
|
326 | self.assertaction(result, 'sendframes') | |||
|
327 | self.assertframesequal(result[1]['framegen'], [ | |||
|
328 | b'bytes-response eos response', | |||
|
329 | ]) | |||
|
330 | ||||
|
331 | def testmultiplecommanddeferresponse(self): | |||
|
332 | reactor = makereactor(deferoutput=True) | |||
|
333 | list(sendcommandframes(reactor, b'command1', {})) | |||
|
334 | list(sendcommandframes(reactor, b'command2', {})) | |||
|
335 | ||||
|
336 | result = reactor.onbytesresponseready(b'response1') | |||
|
337 | self.assertaction(result, 'noop') | |||
|
338 | result = reactor.onbytesresponseready(b'response2') | |||
|
339 | self.assertaction(result, 'noop') | |||
|
340 | result = reactor.oninputeof() | |||
|
341 | self.assertaction(result, 'sendframes') | |||
|
342 | self.assertframesequal(result[1]['framegen'], [ | |||
|
343 | b'bytes-response eos response1', | |||
|
344 | b'bytes-response eos response2' | |||
|
345 | ]) | |||
|
346 | ||||
313 | if __name__ == '__main__': |
|
347 | if __name__ == '__main__': | |
314 | import silenttestrunner |
|
348 | import silenttestrunner | |
315 | silenttestrunner.main(__name__) |
|
349 | silenttestrunner.main(__name__) |
General Comments 0
You need to be logged in to leave comments.
Login now