##// END OF EJS Templates
wireproto: buffer output frames when in half duplex mode...
Gregory Szorc -
r37074:861e9d37 default
parent child Browse files
Show More
@@ -308,10 +308,24 class serverreactor(object):
308 wantframe
308 wantframe
309 Indicates that nothing of interest happened and the server is waiting on
309 Indicates that nothing of interest happened and the server is waiting on
310 more frames from the client before anything interesting can be done.
310 more frames from the client before anything interesting can be done.
311
312 noop
313 Indicates no additional action is required.
311 """
314 """
312
315
313 def __init__(self):
316 def __init__(self, deferoutput=False):
317 """Construct a new server reactor.
318
319 ``deferoutput`` can be used to indicate that no output frames should be
320 instructed to be sent until input has been exhausted. In this mode,
321 events that would normally generate output frames (such as a command
322 response being ready) will instead defer instructing the consumer to
323 send those frames. This is useful for half-duplex transports where the
324 sender cannot receive until all data has been transmitted.
325 """
326 self._deferoutput = deferoutput
314 self._state = 'idle'
327 self._state = 'idle'
328 self._bufferedframegens = []
315 self._activecommand = None
329 self._activecommand = None
316 self._activeargs = None
330 self._activeargs = None
317 self._activedata = None
331 self._activedata = None
@@ -344,8 +358,33 class serverreactor(object):
344
358
345 The raw bytes response is passed as an argument.
359 The raw bytes response is passed as an argument.
346 """
360 """
361 framegen = createbytesresponseframesfrombytes(data)
362
363 if self._deferoutput:
364 self._bufferedframegens.append(framegen)
365 return 'noop', {}
366 else:
367 return 'sendframes', {
368 'framegen': framegen,
369 }
370
371 def oninputeof(self):
372 """Signals that end of input has been received.
373
374 No more frames will be received. All pending activity should be
375 completed.
376 """
377 if not self._deferoutput or not self._bufferedframegens:
378 return 'noop', {}
379
380 # If we buffered all our responses, emit those.
381 def makegen():
382 for gen in self._bufferedframegens:
383 for frame in gen:
384 yield frame
385
347 return 'sendframes', {
386 return 'sendframes', {
348 'framegen': createbytesresponseframesfrombytes(data),
387 'framegen': makegen(),
349 }
388 }
350
389
351 def onapplicationerror(self, msg):
390 def onapplicationerror(self, msg):
@@ -401,6 +401,10 def _processhttpv2reflectrequest(ui, rep
401 states.append(json.dumps((action, meta), sort_keys=True,
401 states.append(json.dumps((action, meta), sort_keys=True,
402 separators=(', ', ': ')))
402 separators=(', ', ': ')))
403
403
404 action, meta = reactor.oninputeof()
405 meta['action'] = action
406 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
407
404 res.status = b'200 OK'
408 res.status = b'200 OK'
405 res.headers[b'Content-Type'] = b'text/plain'
409 res.headers[b'Content-Type'] = b'text/plain'
406 res.setbodybytes(b'\n'.join(states))
410 res.setbodybytes(b'\n'.join(states))
@@ -411,7 +415,10 def _processhttpv2request(ui, repo, req,
411 Called when the HTTP request contains unified frame-based protocol
415 Called when the HTTP request contains unified frame-based protocol
412 frames for evaluation.
416 frames for evaluation.
413 """
417 """
414 reactor = wireprotoframing.serverreactor()
418 # TODO Some HTTP clients are full duplex and can receive data before
419 # the entire request is transmitted. Figure out a way to indicate support
420 # for that so we can opt into full duplex mode.
421 reactor = wireprotoframing.serverreactor(deferoutput=True)
415 seencommand = False
422 seencommand = False
416
423
417 while True:
424 while True:
@@ -448,6 +455,19 def _processhttpv2request(ui, repo, req,
448 raise error.ProgrammingError(
455 raise error.ProgrammingError(
449 'unhandled action from frame processor: %s' % action)
456 'unhandled action from frame processor: %s' % action)
450
457
458 action, meta = reactor.oninputeof()
459 if action == 'sendframes':
460 # We assume we haven't started sending the response yet. If we're
461 # wrong, the response type will raise an exception.
462 res.status = b'200 OK'
463 res.headers[b'Content-Type'] = FRAMINGTYPE
464 res.setbodygen(meta['framegen'])
465 elif action == 'noop':
466 pass
467 else:
468 raise error.ProgrammingError('unhandled action from frame processor: %s'
469 % action)
470
451 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
471 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
452 command):
472 command):
453 """Dispatch a wire protocol command made from HTTPv2 requests.
473 """Dispatch a wire protocol command made from HTTPv2 requests.
@@ -504,6 +524,8 def _httpv2runcommand(ui, repo, req, res
504
524
505 if action == 'sendframes':
525 if action == 'sendframes':
506 res.setbodygen(meta['framegen'])
526 res.setbodygen(meta['framegen'])
527 elif action == 'noop':
528 pass
507 else:
529 else:
508 raise error.ProgrammingError('unhandled event from reactor: %s' %
530 raise error.ProgrammingError('unhandled event from reactor: %s' %
509 action)
531 action)
@@ -401,7 +401,7 Command frames can be reflected via debu
401 s> Server: testing stub value\r\n
401 s> Server: testing stub value\r\n
402 s> Date: $HTTP_DATE$\r\n
402 s> Date: $HTTP_DATE$\r\n
403 s> Content-Type: text/plain\r\n
403 s> Content-Type: text/plain\r\n
404 s> Content-Length: 291\r\n
404 s> Content-Length: 310\r\n
405 s> \r\n
405 s> \r\n
406 s> received: 1 2 command1\n
406 s> received: 1 2 command1\n
407 s> ["wantframe", {"state": "command-receiving-args"}]\n
407 s> ["wantframe", {"state": "command-receiving-args"}]\n
@@ -409,6 +409,7 Command frames can be reflected via debu
409 s> ["wantframe", {"state": "command-receiving-args"}]\n
409 s> ["wantframe", {"state": "command-receiving-args"}]\n
410 s> received: 2 2 \x04\x00\x03\x00bar1val\n
410 s> received: 2 2 \x04\x00\x03\x00bar1val\n
411 s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
411 s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null}]\n
412 s> received: <no frame>
412 s> received: <no frame>\n
413 s> {"action": "noop"}
413
414
414 $ cat error.log
415 $ cat error.log
@@ -9,8 +9,8 from mercurial import (
9
9
10 ffs = framing.makeframefromhumanstring
10 ffs = framing.makeframefromhumanstring
11
11
12 def makereactor():
12 def makereactor(deferoutput=False):
13 return framing.serverreactor()
13 return framing.serverreactor(deferoutput=deferoutput)
14
14
15 def sendframes(reactor, gen):
15 def sendframes(reactor, gen):
16 """Send a generator of frame bytearray to a reactor.
16 """Send a generator of frame bytearray to a reactor.
@@ -95,6 +95,9 class ServerReactorTests(unittest.TestCa
95 'data': None,
95 'data': None,
96 })
96 })
97
97
98 result = reactor.oninputeof()
99 self.assertaction(result, 'noop')
100
98 def test1argument(self):
101 def test1argument(self):
99 reactor = makereactor()
102 reactor = makereactor()
100 results = list(sendcommandframes(reactor, b'mycommand',
103 results = list(sendcommandframes(reactor, b'mycommand',
@@ -310,6 +313,37 class ServerReactorTests(unittest.TestCa
310 b'error-response application some message',
313 b'error-response application some message',
311 ])
314 ])
312
315
316 def test1commanddeferresponse(self):
317 """Responses when in deferred output mode are delayed until EOF."""
318 reactor = makereactor(deferoutput=True)
319 results = list(sendcommandframes(reactor, b'mycommand', {}))
320 self.assertEqual(len(results), 1)
321 self.assertaction(results[0], 'runcommand')
322
323 result = reactor.onbytesresponseready(b'response')
324 self.assertaction(result, 'noop')
325 result = reactor.oninputeof()
326 self.assertaction(result, 'sendframes')
327 self.assertframesequal(result[1]['framegen'], [
328 b'bytes-response eos response',
329 ])
330
331 def testmultiplecommanddeferresponse(self):
332 reactor = makereactor(deferoutput=True)
333 list(sendcommandframes(reactor, b'command1', {}))
334 list(sendcommandframes(reactor, b'command2', {}))
335
336 result = reactor.onbytesresponseready(b'response1')
337 self.assertaction(result, 'noop')
338 result = reactor.onbytesresponseready(b'response2')
339 self.assertaction(result, 'noop')
340 result = reactor.oninputeof()
341 self.assertaction(result, 'sendframes')
342 self.assertframesequal(result[1]['framegen'], [
343 b'bytes-response eos response1',
344 b'bytes-response eos response2'
345 ])
346
313 if __name__ == '__main__':
347 if __name__ == '__main__':
314 import silenttestrunner
348 import silenttestrunner
315 silenttestrunner.main(__name__)
349 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now