##// END OF EJS Templates
wireproto: support for receiving multiple requests...
Gregory Szorc -
r37076:c5e9c3b4 default
parent child Browse files
Show More
@@ -327,6 +327,23 b' class serverreactor(object):'
327
327
328 noop
328 noop
329 Indicates no additional action is required.
329 Indicates no additional action is required.
330
331 Known Issues
332 ------------
333
334 There are no limits to the number of partially received commands or their
335 size. A malicious client could stream command request data and exhaust the
336 server's memory.
337
338 Partially received commands are not acted upon when end of input is
339 reached. Should the server error if it receives a partial request?
340 Should the client send a message to abort a partially transmitted request
341 to facilitate graceful shutdown?
342
343 Active requests that haven't been responded to aren't tracked. This means
344 that if we receive a command and instruct its dispatch, another command
345 with its request ID can come in over the wire and there will be a race
346 between who responds to what.
330 """
347 """
331
348
332 def __init__(self, deferoutput=False):
349 def __init__(self, deferoutput=False):
@@ -342,14 +359,8 b' class serverreactor(object):'
342 self._deferoutput = deferoutput
359 self._deferoutput = deferoutput
343 self._state = 'idle'
360 self._state = 'idle'
344 self._bufferedframegens = []
361 self._bufferedframegens = []
345 self._activerequestid = None
362 # request id -> dict of commands that are actively being received.
346 self._activecommand = None
363 self._receivingcommands = {}
347 self._activeargs = None
348 self._activedata = None
349 self._expectingargs = None
350 self._expectingdata = None
351 self._activeargname = None
352 self._activeargchunks = None
353
364
354 def onframerecv(self, requestid, frametype, frameflags, payload):
365 def onframerecv(self, requestid, frametype, frameflags, payload):
355 """Process a frame that has been received off the wire.
366 """Process a frame that has been received off the wire.
@@ -359,8 +370,7 b' class serverreactor(object):'
359 """
370 """
360 handlers = {
371 handlers = {
361 'idle': self._onframeidle,
372 'idle': self._onframeidle,
362 'command-receiving-args': self._onframereceivingargs,
373 'command-receiving': self._onframecommandreceiving,
363 'command-receiving-data': self._onframereceivingdata,
364 'errored': self._onframeerrored,
374 'errored': self._onframeerrored,
365 }
375 }
366
376
@@ -391,6 +401,8 b' class serverreactor(object):'
391 No more frames will be received. All pending activity should be
401 No more frames will be received. All pending activity should be
392 completed.
402 completed.
393 """
403 """
404 # TODO should we do anything about in-flight commands?
405
394 if not self._deferoutput or not self._bufferedframegens:
406 if not self._deferoutput or not self._bufferedframegens:
395 return 'noop', {}
407 return 'noop', {}
396
408
@@ -414,12 +426,20 b' class serverreactor(object):'
414 'message': msg,
426 'message': msg,
415 }
427 }
416
428
417 def _makeruncommandresult(self):
429 def _makeruncommandresult(self, requestid):
430 entry = self._receivingcommands[requestid]
431 del self._receivingcommands[requestid]
432
433 if self._receivingcommands:
434 self._state = 'command-receiving'
435 else:
436 self._state = 'idle'
437
418 return 'runcommand', {
438 return 'runcommand', {
419 'requestid': self._activerequestid,
439 'requestid': requestid,
420 'command': self._activecommand,
440 'command': entry['command'],
421 'args': self._activeargs,
441 'args': entry['args'],
422 'data': self._activedata.getvalue() if self._activedata else None,
442 'data': entry['data'].getvalue() if entry['data'] else None,
423 }
443 }
424
444
425 def _makewantframeresult(self):
445 def _makewantframeresult(self):
@@ -435,34 +455,76 b' class serverreactor(object):'
435 return self._makeerrorresult(
455 return self._makeerrorresult(
436 _('expected command frame; got %d') % frametype)
456 _('expected command frame; got %d') % frametype)
437
457
438 self._activerequestid = requestid
458 if requestid in self._receivingcommands:
439 self._activecommand = payload
459 self._state = 'errored'
440 self._activeargs = {}
460 return self._makeerrorresult(
441 self._activedata = None
461 _('request with ID %d already received') % requestid)
462
463 expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
464 expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
465
466 self._receivingcommands[requestid] = {
467 'command': payload,
468 'args': {},
469 'data': None,
470 'expectingargs': expectingargs,
471 'expectingdata': expectingdata,
472 }
442
473
443 if frameflags & FLAG_COMMAND_NAME_EOS:
474 if frameflags & FLAG_COMMAND_NAME_EOS:
444 return self._makeruncommandresult()
475 return self._makeruncommandresult(requestid)
445
446 self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
447 self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
448
476
449 if self._expectingargs:
477 if expectingargs or expectingdata:
450 self._state = 'command-receiving-args'
478 self._state = 'command-receiving'
451 return self._makewantframeresult()
452 elif self._expectingdata:
453 self._activedata = util.bytesio()
454 self._state = 'command-receiving-data'
455 return self._makewantframeresult()
479 return self._makewantframeresult()
456 else:
480 else:
457 self._state = 'errored'
481 self._state = 'errored'
458 return self._makeerrorresult(_('missing frame flags on '
482 return self._makeerrorresult(_('missing frame flags on '
459 'command frame'))
483 'command frame'))
460
484
461 def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
485 def _onframecommandreceiving(self, requestid, frametype, frameflags,
462 if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
486 payload):
487 # It could be a new command request. Process it as such.
488 if frametype == FRAME_TYPE_COMMAND_NAME:
489 return self._onframeidle(requestid, frametype, frameflags, payload)
490
491 # All other frames should be related to a command that is currently
492 # receiving.
493 if requestid not in self._receivingcommands:
463 self._state = 'errored'
494 self._state = 'errored'
464 return self._makeerrorresult(_('expected command argument '
495 return self._makeerrorresult(
465 'frame; got %d') % frametype)
496 _('received frame for request that is not receiving: %d') %
497 requestid)
498
499 entry = self._receivingcommands[requestid]
500
501 if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
502 if not entry['expectingargs']:
503 self._state = 'errored'
504 return self._makeerrorresult(_(
505 'received command argument frame for request that is not '
506 'expecting arguments: %d') % requestid)
507
508 return self._handlecommandargsframe(requestid, entry, frametype,
509 frameflags, payload)
510
511 elif frametype == FRAME_TYPE_COMMAND_DATA:
512 if not entry['expectingdata']:
513 self._state = 'errored'
514 return self._makeerrorresult(_(
515 'received command data frame for request that is not '
516 'expecting data: %d') % requestid)
517
518 if entry['data'] is None:
519 entry['data'] = util.bytesio()
520
521 return self._handlecommanddataframe(requestid, entry, frametype,
522 frameflags, payload)
523
524 def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
525 payload):
526 # The frame and state of command should have already been validated.
527 assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
466
528
467 offset = 0
529 offset = 0
468 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
530 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
@@ -483,10 +545,6 b' class serverreactor(object):'
483 # and wait for the next frame.
545 # and wait for the next frame.
484 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
546 if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
485 raise error.ProgrammingError('not yet implemented')
547 raise error.ProgrammingError('not yet implemented')
486 self._activeargname = argname
487 self._activeargchunks = [argvalue]
488 self._state = 'command-arg-continuation'
489 return self._makewantframeresult()
490
548
491 # Common case: the argument value is completely contained in this
549 # Common case: the argument value is completely contained in this
492 # frame.
550 # frame.
@@ -496,36 +554,30 b' class serverreactor(object):'
496 return self._makeerrorresult(_('malformed argument frame: '
554 return self._makeerrorresult(_('malformed argument frame: '
497 'partial argument value'))
555 'partial argument value'))
498
556
499 self._activeargs[argname] = argvalue
557 entry['args'][argname] = argvalue
500
558
501 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
559 if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
502 if self._expectingdata:
560 if entry['expectingdata']:
503 self._state = 'command-receiving-data'
504 self._activedata = util.bytesio()
505 # TODO signal request to run a command once we don't
561 # TODO signal request to run a command once we don't
506 # buffer data frames.
562 # buffer data frames.
507 return self._makewantframeresult()
563 return self._makewantframeresult()
508 else:
564 else:
509 self._state = 'waiting'
565 return self._makeruncommandresult(requestid)
510 return self._makeruncommandresult()
511 else:
566 else:
512 return self._makewantframeresult()
567 return self._makewantframeresult()
513
568
514 def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
569 def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
515 if frametype != FRAME_TYPE_COMMAND_DATA:
570 payload):
516 self._state = 'errored'
571 assert frametype == FRAME_TYPE_COMMAND_DATA
517 return self._makeerrorresult(_('expected command data frame; '
518 'got %d') % frametype)
519
572
520 # TODO support streaming data instead of buffering it.
573 # TODO support streaming data instead of buffering it.
521 self._activedata.write(payload)
574 entry['data'].write(payload)
522
575
523 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
576 if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
524 return self._makewantframeresult()
577 return self._makewantframeresult()
525 elif frameflags & FLAG_COMMAND_DATA_EOS:
578 elif frameflags & FLAG_COMMAND_DATA_EOS:
526 self._activedata.seek(0)
579 entry['data'].seek(0)
527 self._state = 'idle'
580 return self._makeruncommandresult(requestid)
528 return self._makeruncommandresult()
529 else:
581 else:
530 self._state = 'errored'
582 self._state = 'errored'
531 return self._makeerrorresult(_('command data frame without '
583 return self._makeerrorresult(_('command data frame without '
@@ -401,12 +401,12 b' Command frames can be reflected via debu'
401 s> Server: testing stub value\r\n
401 s> Server: testing stub value\r\n
402 s> Date: $HTTP_DATE$\r\n
402 s> Date: $HTTP_DATE$\r\n
403 s> Content-Type: text/plain\r\n
403 s> Content-Type: text/plain\r\n
404 s> Content-Length: 332\r\n
404 s> Content-Length: 322\r\n
405 s> \r\n
405 s> \r\n
406 s> received: 1 2 1 command1\n
406 s> received: 1 2 1 command1\n
407 s> ["wantframe", {"state": "command-receiving-args"}]\n
407 s> ["wantframe", {"state": "command-receiving"}]\n
408 s> received: 2 0 1 \x03\x00\x04\x00fooval1\n
408 s> received: 2 0 1 \x03\x00\x04\x00fooval1\n
409 s> ["wantframe", {"state": "command-receiving-args"}]\n
409 s> ["wantframe", {"state": "command-receiving"}]\n
410 s> received: 2 2 1 \x04\x00\x03\x00bar1val\n
410 s> received: 2 2 1 \x04\x00\x03\x00bar1val\n
411 s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "requestid": 1}]\n
411 s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "requestid": 1}]\n
412 s> received: <no frame>\n
412 s> received: <no frame>\n
@@ -196,6 +196,19 b' class ServerReactorTests(unittest.TestCa'
196 'message': b'expected command frame; got 2',
196 'message': b'expected command frame; got 2',
197 })
197 })
198
198
199 def testunexpectedcommandargumentreceiving(self):
200 """Same as above but the command is receiving."""
201 results = list(sendframes(makereactor(), [
202 ffs(b'1 command-name have-data command'),
203 ffs(b'1 command-argument eoa ignored'),
204 ]))
205
206 self.assertaction(results[1], 'error')
207 self.assertEqual(results[1][1], {
208 'message': b'received command argument frame for request that is '
209 b'not expecting arguments: 1',
210 })
211
199 def testunexpectedcommanddata(self):
212 def testunexpectedcommanddata(self):
200 """Command argument frame when not running a command is an error."""
213 """Command argument frame when not running a command is an error."""
201 result = self._sendsingleframe(makereactor(),
214 result = self._sendsingleframe(makereactor(),
@@ -205,6 +218,19 b' class ServerReactorTests(unittest.TestCa'
205 'message': b'expected command frame; got 3',
218 'message': b'expected command frame; got 3',
206 })
219 })
207
220
221 def testunexpectedcommanddatareceiving(self):
222 """Same as above except the command is receiving."""
223 results = list(sendframes(makereactor(), [
224 ffs(b'1 command-name have-args command'),
225 ffs(b'1 command-data eos ignored'),
226 ]))
227
228 self.assertaction(results[1], 'error')
229 self.assertEqual(results[1][1], {
230 'message': b'received command data frame for request that is not '
231 b'expecting data: 1',
232 })
233
208 def testmissingcommandframeflags(self):
234 def testmissingcommandframeflags(self):
209 """Command name frame must have flags set."""
235 """Command name frame must have flags set."""
210 result = self._sendsingleframe(makereactor(),
236 result = self._sendsingleframe(makereactor(),
@@ -214,19 +240,77 b' class ServerReactorTests(unittest.TestCa'
214 'message': b'missing frame flags on command frame',
240 'message': b'missing frame flags on command frame',
215 })
241 })
216
242
243 def testconflictingrequestid(self):
244 """Multiple fully serviced commands with same request ID is allowed."""
245 results = list(sendframes(makereactor(), [
246 ffs(b'1 command-name eos command'),
247 ffs(b'1 command-name eos command'),
248 ffs(b'1 command-name eos command'),
249 ]))
250 for i in range(3):
251 self.assertaction(results[i], 'runcommand')
252 self.assertEqual(results[i][1], {
253 'requestid': 1,
254 'command': b'command',
255 'args': {},
256 'data': None,
257 })
258
259 def testconflictingrequestid(self):
260 """Request ID for new command matching in-flight command is illegal."""
261 results = list(sendframes(makereactor(), [
262 ffs(b'1 command-name have-args command'),
263 ffs(b'1 command-name eos command'),
264 ]))
265
266 self.assertaction(results[0], 'wantframe')
267 self.assertaction(results[1], 'error')
268 self.assertEqual(results[1][1], {
269 'message': b'request with ID 1 already received',
270 })
271
272 def testinterleavedcommands(self):
273 results = list(sendframes(makereactor(), [
274 ffs(b'1 command-name have-args command1'),
275 ffs(b'3 command-name have-args command3'),
276 ffs(br'1 command-argument 0 \x03\x00\x03\x00foobar'),
277 ffs(br'3 command-argument 0 \x03\x00\x03\x00bizbaz'),
278 ffs(br'3 command-argument eoa \x03\x00\x03\x00keyval'),
279 ffs(br'1 command-argument eoa \x04\x00\x03\x00key1val'),
280 ]))
281
282 self.assertEqual([t[0] for t in results], [
283 'wantframe',
284 'wantframe',
285 'wantframe',
286 'wantframe',
287 'runcommand',
288 'runcommand',
289 ])
290
291 self.assertEqual(results[4][1], {
292 'requestid': 3,
293 'command': 'command3',
294 'args': {b'biz': b'baz', b'key': b'val'},
295 'data': None,
296 })
297 self.assertEqual(results[5][1], {
298 'requestid': 1,
299 'command': 'command1',
300 'args': {b'foo': b'bar', b'key1': b'val'},
301 'data': None,
302 })
303
217 def testmissingargumentframe(self):
304 def testmissingargumentframe(self):
305 # This test attempts to test behavior when reactor has an incomplete
306 # command request waiting on argument data. But it doesn't handle that
307 # scenario yet. So this test does nothing of value.
218 frames = [
308 frames = [
219 ffs(b'1 command-name have-args command'),
309 ffs(b'1 command-name have-args command'),
220 ffs(b'1 command-name 0 ignored'),
221 ]
310 ]
222
311
223 results = list(sendframes(makereactor(), frames))
312 results = list(sendframes(makereactor(), frames))
224 self.assertEqual(len(results), 2)
225 self.assertaction(results[0], 'wantframe')
313 self.assertaction(results[0], 'wantframe')
226 self.assertaction(results[1], 'error')
227 self.assertEqual(results[1][1], {
228 'message': b'expected command argument frame; got 1',
229 })
230
314
231 def testincompleteargumentname(self):
315 def testincompleteargumentname(self):
232 """Argument frame with incomplete name."""
316 """Argument frame with incomplete name."""
@@ -259,17 +343,16 b' class ServerReactorTests(unittest.TestCa'
259 })
343 })
260
344
261 def testmissingcommanddataframe(self):
345 def testmissingcommanddataframe(self):
346 # The reactor doesn't currently handle partially received commands.
347 # So this test is failing to do anything with request 1.
262 frames = [
348 frames = [
263 ffs(b'1 command-name have-data command1'),
349 ffs(b'1 command-name have-data command1'),
264 ffs(b'1 command-name eos command2'),
350 ffs(b'3 command-name eos command2'),
265 ]
351 ]
266 results = list(sendframes(makereactor(), frames))
352 results = list(sendframes(makereactor(), frames))
267 self.assertEqual(len(results), 2)
353 self.assertEqual(len(results), 2)
268 self.assertaction(results[0], 'wantframe')
354 self.assertaction(results[0], 'wantframe')
269 self.assertaction(results[1], 'error')
355 self.assertaction(results[1], 'runcommand')
270 self.assertEqual(results[1][1], {
271 'message': b'expected command data frame; got 1',
272 })
273
356
274 def testmissingcommanddataframeflags(self):
357 def testmissingcommanddataframeflags(self):
275 frames = [
358 frames = [
@@ -284,6 +367,18 b' class ServerReactorTests(unittest.TestCa'
284 'message': b'command data frame without flags',
367 'message': b'command data frame without flags',
285 })
368 })
286
369
370 def testframefornonreceivingrequest(self):
371 """Receiving a frame for a command that is not receiving is illegal."""
372 results = list(sendframes(makereactor(), [
373 ffs(b'1 command-name eos command1'),
374 ffs(b'3 command-name have-data command3'),
375 ffs(b'1 command-argument eoa ignored'),
376 ]))
377 self.assertaction(results[2], 'error')
378 self.assertEqual(results[2][1], {
379 'message': b'received frame for request that is not receiving: 1',
380 })
381
287 def testsimpleresponse(self):
382 def testsimpleresponse(self):
288 """Bytes response to command sends result frames."""
383 """Bytes response to command sends result frames."""
289 reactor = makereactor()
384 reactor = makereactor()
General Comments 0
You need to be logged in to leave comments. Login now