##// END OF EJS Templates
wireprotoframing: record when new stream is encountered...
Gregory Szorc -
r37674:e6870bca default
parent child Browse files
Show More
@@ -1,1070 +1,1072 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 cbor,
21 21 )
22 22 from . import (
23 23 encoding,
24 24 error,
25 25 util,
26 26 )
27 27 from .utils import (
28 28 stringutil,
29 29 )
30 30
31 31 FRAME_HEADER_SIZE = 8
32 32 DEFAULT_MAX_FRAME_SIZE = 32768
33 33
34 34 STREAM_FLAG_BEGIN_STREAM = 0x01
35 35 STREAM_FLAG_END_STREAM = 0x02
36 36 STREAM_FLAG_ENCODING_APPLIED = 0x04
37 37
38 38 STREAM_FLAGS = {
39 39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
40 40 b'stream-end': STREAM_FLAG_END_STREAM,
41 41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
42 42 }
43 43
44 44 FRAME_TYPE_COMMAND_REQUEST = 0x01
45 45 FRAME_TYPE_COMMAND_DATA = 0x03
46 46 FRAME_TYPE_BYTES_RESPONSE = 0x04
47 47 FRAME_TYPE_ERROR_RESPONSE = 0x05
48 48 FRAME_TYPE_TEXT_OUTPUT = 0x06
49 49 FRAME_TYPE_PROGRESS = 0x07
50 50 FRAME_TYPE_STREAM_SETTINGS = 0x08
51 51
52 52 FRAME_TYPES = {
53 53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
54 54 b'command-data': FRAME_TYPE_COMMAND_DATA,
55 55 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
56 56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
57 57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
58 58 b'progress': FRAME_TYPE_PROGRESS,
59 59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
60 60 }
61 61
62 62 FLAG_COMMAND_REQUEST_NEW = 0x01
63 63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
64 64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
65 65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
66 66
67 67 FLAGS_COMMAND_REQUEST = {
68 68 b'new': FLAG_COMMAND_REQUEST_NEW,
69 69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
70 70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
71 71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
72 72 }
73 73
74 74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
75 75 FLAG_COMMAND_DATA_EOS = 0x02
76 76
77 77 FLAGS_COMMAND_DATA = {
78 78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
79 79 b'eos': FLAG_COMMAND_DATA_EOS,
80 80 }
81 81
82 82 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
83 83 FLAG_BYTES_RESPONSE_EOS = 0x02
84 84 FLAG_BYTES_RESPONSE_CBOR = 0x04
85 85
86 86 FLAGS_BYTES_RESPONSE = {
87 87 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
88 88 b'eos': FLAG_BYTES_RESPONSE_EOS,
89 89 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
90 90 }
91 91
92 92 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
93 93 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
94 94
95 95 FLAGS_ERROR_RESPONSE = {
96 96 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
97 97 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
98 98 }
99 99
100 100 # Maps frame types to their available flags.
101 101 FRAME_TYPE_FLAGS = {
102 102 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
103 103 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
104 104 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
105 105 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
106 106 FRAME_TYPE_TEXT_OUTPUT: {},
107 107 FRAME_TYPE_PROGRESS: {},
108 108 FRAME_TYPE_STREAM_SETTINGS: {},
109 109 }
110 110
111 111 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
112 112
113 113 def humanflags(mapping, value):
114 114 """Convert a numeric flags value to a human value, using a mapping table."""
115 115 namemap = {v: k for k, v in mapping.iteritems()}
116 116 flags = []
117 117 val = 1
118 118 while value >= val:
119 119 if value & val:
120 120 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
121 121 val <<= 1
122 122
123 123 return b'|'.join(flags)
124 124
125 125 @attr.s(slots=True)
126 126 class frameheader(object):
127 127 """Represents the data in a frame header."""
128 128
129 129 length = attr.ib()
130 130 requestid = attr.ib()
131 131 streamid = attr.ib()
132 132 streamflags = attr.ib()
133 133 typeid = attr.ib()
134 134 flags = attr.ib()
135 135
136 136 @attr.s(slots=True, repr=False)
137 137 class frame(object):
138 138 """Represents a parsed frame."""
139 139
140 140 requestid = attr.ib()
141 141 streamid = attr.ib()
142 142 streamflags = attr.ib()
143 143 typeid = attr.ib()
144 144 flags = attr.ib()
145 145 payload = attr.ib()
146 146
147 147 @encoding.strmethod
148 148 def __repr__(self):
149 149 typename = '<unknown 0x%02x>' % self.typeid
150 150 for name, value in FRAME_TYPES.iteritems():
151 151 if value == self.typeid:
152 152 typename = name
153 153 break
154 154
155 155 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
156 156 'type=%s; flags=%s)' % (
157 157 len(self.payload), self.requestid, self.streamid,
158 158 humanflags(STREAM_FLAGS, self.streamflags), typename,
159 159 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
160 160
161 161 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
162 162 """Assemble a frame into a byte array."""
163 163 # TODO assert size of payload.
164 164 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
165 165
166 166 # 24 bits length
167 167 # 16 bits request id
168 168 # 8 bits stream id
169 169 # 8 bits stream flags
170 170 # 4 bits type
171 171 # 4 bits flags
172 172
173 173 l = struct.pack(r'<I', len(payload))
174 174 frame[0:3] = l[0:3]
175 175 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
176 176 frame[7] = (typeid << 4) | flags
177 177 frame[8:] = payload
178 178
179 179 return frame
180 180
181 181 def makeframefromhumanstring(s):
182 182 """Create a frame from a human readable string
183 183
184 184 Strings have the form:
185 185
186 186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
187 187
188 188 This can be used by user-facing applications and tests for creating
189 189 frames easily without having to type out a bunch of constants.
190 190
191 191 Request ID and stream IDs are integers.
192 192
193 193 Stream flags, frame type, and flags can be specified by integer or
194 194 named constant.
195 195
196 196 Flags can be delimited by `|` to bitwise OR them together.
197 197
198 198 If the payload begins with ``cbor:``, the following string will be
199 199 evaluated as Python literal and the resulting object will be fed into
200 200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
201 201 byte string literal.
202 202 """
203 203 fields = s.split(b' ', 5)
204 204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
205 205
206 206 requestid = int(requestid)
207 207 streamid = int(streamid)
208 208
209 209 finalstreamflags = 0
210 210 for flag in streamflags.split(b'|'):
211 211 if flag in STREAM_FLAGS:
212 212 finalstreamflags |= STREAM_FLAGS[flag]
213 213 else:
214 214 finalstreamflags |= int(flag)
215 215
216 216 if frametype in FRAME_TYPES:
217 217 frametype = FRAME_TYPES[frametype]
218 218 else:
219 219 frametype = int(frametype)
220 220
221 221 finalflags = 0
222 222 validflags = FRAME_TYPE_FLAGS[frametype]
223 223 for flag in frameflags.split(b'|'):
224 224 if flag in validflags:
225 225 finalflags |= validflags[flag]
226 226 else:
227 227 finalflags |= int(flag)
228 228
229 229 if payload.startswith(b'cbor:'):
230 230 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
231 231 canonical=True)
232 232
233 233 else:
234 234 payload = stringutil.unescapestr(payload)
235 235
236 236 return makeframe(requestid=requestid, streamid=streamid,
237 237 streamflags=finalstreamflags, typeid=frametype,
238 238 flags=finalflags, payload=payload)
239 239
240 240 def parseheader(data):
241 241 """Parse a unified framing protocol frame header from a buffer.
242 242
243 243 The header is expected to be in the buffer at offset 0 and the
244 244 buffer is expected to be large enough to hold a full header.
245 245 """
246 246 # 24 bits payload length (little endian)
247 247 # 16 bits request ID
248 248 # 8 bits stream ID
249 249 # 8 bits stream flags
250 250 # 4 bits frame type
251 251 # 4 bits frame flags
252 252 # ... payload
253 253 framelength = data[0] + 256 * data[1] + 16384 * data[2]
254 254 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
255 255 typeflags = data[7]
256 256
257 257 frametype = (typeflags & 0xf0) >> 4
258 258 frameflags = typeflags & 0x0f
259 259
260 260 return frameheader(framelength, requestid, streamid, streamflags,
261 261 frametype, frameflags)
262 262
263 263 def readframe(fh):
264 264 """Read a unified framing protocol frame from a file object.
265 265
266 266 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
267 267 None if no frame is available. May raise if a malformed frame is
268 268 seen.
269 269 """
270 270 header = bytearray(FRAME_HEADER_SIZE)
271 271
272 272 readcount = fh.readinto(header)
273 273
274 274 if readcount == 0:
275 275 return None
276 276
277 277 if readcount != FRAME_HEADER_SIZE:
278 278 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
279 279 (readcount, header))
280 280
281 281 h = parseheader(header)
282 282
283 283 payload = fh.read(h.length)
284 284 if len(payload) != h.length:
285 285 raise error.Abort(_('frame length error: expected %d; got %d') %
286 286 (h.length, len(payload)))
287 287
288 288 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
289 289 payload)
290 290
291 291 def createcommandframes(stream, requestid, cmd, args, datafh=None,
292 292 maxframesize=DEFAULT_MAX_FRAME_SIZE):
293 293 """Create frames necessary to transmit a request to run a command.
294 294
295 295 This is a generator of bytearrays. Each item represents a frame
296 296 ready to be sent over the wire to a peer.
297 297 """
298 298 data = {b'name': cmd}
299 299 if args:
300 300 data[b'args'] = args
301 301
302 302 data = cbor.dumps(data, canonical=True)
303 303
304 304 offset = 0
305 305
306 306 while True:
307 307 flags = 0
308 308
309 309 # Must set new or continuation flag.
310 310 if not offset:
311 311 flags |= FLAG_COMMAND_REQUEST_NEW
312 312 else:
313 313 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
314 314
315 315 # Data frames is set on all frames.
316 316 if datafh:
317 317 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
318 318
319 319 payload = data[offset:offset + maxframesize]
320 320 offset += len(payload)
321 321
322 322 if len(payload) == maxframesize and offset < len(data):
323 323 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
324 324
325 325 yield stream.makeframe(requestid=requestid,
326 326 typeid=FRAME_TYPE_COMMAND_REQUEST,
327 327 flags=flags,
328 328 payload=payload)
329 329
330 330 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
331 331 break
332 332
333 333 if datafh:
334 334 while True:
335 335 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
336 336
337 337 done = False
338 338 if len(data) == DEFAULT_MAX_FRAME_SIZE:
339 339 flags = FLAG_COMMAND_DATA_CONTINUATION
340 340 else:
341 341 flags = FLAG_COMMAND_DATA_EOS
342 342 assert datafh.read(1) == b''
343 343 done = True
344 344
345 345 yield stream.makeframe(requestid=requestid,
346 346 typeid=FRAME_TYPE_COMMAND_DATA,
347 347 flags=flags,
348 348 payload=data)
349 349
350 350 if done:
351 351 break
352 352
353 353 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
354 354 maxframesize=DEFAULT_MAX_FRAME_SIZE):
355 355 """Create a raw frame to send a bytes response from static bytes input.
356 356
357 357 Returns a generator of bytearrays.
358 358 """
359 359
360 360 # Simple case of a single frame.
361 361 if len(data) <= maxframesize:
362 362 flags = FLAG_BYTES_RESPONSE_EOS
363 363 if iscbor:
364 364 flags |= FLAG_BYTES_RESPONSE_CBOR
365 365
366 366 yield stream.makeframe(requestid=requestid,
367 367 typeid=FRAME_TYPE_BYTES_RESPONSE,
368 368 flags=flags,
369 369 payload=data)
370 370 return
371 371
372 372 offset = 0
373 373 while True:
374 374 chunk = data[offset:offset + maxframesize]
375 375 offset += len(chunk)
376 376 done = offset == len(data)
377 377
378 378 if done:
379 379 flags = FLAG_BYTES_RESPONSE_EOS
380 380 else:
381 381 flags = FLAG_BYTES_RESPONSE_CONTINUATION
382 382
383 383 if iscbor:
384 384 flags |= FLAG_BYTES_RESPONSE_CBOR
385 385
386 386 yield stream.makeframe(requestid=requestid,
387 387 typeid=FRAME_TYPE_BYTES_RESPONSE,
388 388 flags=flags,
389 389 payload=chunk)
390 390
391 391 if done:
392 392 break
393 393
394 394 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
395 395 # TODO properly handle frame size limits.
396 396 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
397 397
398 398 flags = 0
399 399 if protocol:
400 400 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
401 401 if application:
402 402 flags |= FLAG_ERROR_RESPONSE_APPLICATION
403 403
404 404 yield stream.makeframe(requestid=requestid,
405 405 typeid=FRAME_TYPE_ERROR_RESPONSE,
406 406 flags=flags,
407 407 payload=msg)
408 408
409 409 def createtextoutputframe(stream, requestid, atoms,
410 410 maxframesize=DEFAULT_MAX_FRAME_SIZE):
411 411 """Create a text output frame to render text to people.
412 412
413 413 ``atoms`` is a 3-tuple of (formatting string, args, labels).
414 414
415 415 The formatting string contains ``%s`` tokens to be replaced by the
416 416 corresponding indexed entry in ``args``. ``labels`` is an iterable of
417 417 formatters to be applied at rendering time. In terms of the ``ui``
418 418 class, each atom corresponds to a ``ui.write()``.
419 419 """
420 420 atomdicts = []
421 421
422 422 for (formatting, args, labels) in atoms:
423 423 # TODO look for localstr, other types here?
424 424
425 425 if not isinstance(formatting, bytes):
426 426 raise ValueError('must use bytes formatting strings')
427 427 for arg in args:
428 428 if not isinstance(arg, bytes):
429 429 raise ValueError('must use bytes for arguments')
430 430 for label in labels:
431 431 if not isinstance(label, bytes):
432 432 raise ValueError('must use bytes for labels')
433 433
434 434 # Formatting string must be ASCII.
435 435 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
436 436
437 437 # Arguments must be UTF-8.
438 438 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
439 439
440 440 # Labels must be ASCII.
441 441 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
442 442 for l in labels]
443 443
444 444 atom = {b'msg': formatting}
445 445 if args:
446 446 atom[b'args'] = args
447 447 if labels:
448 448 atom[b'labels'] = labels
449 449
450 450 atomdicts.append(atom)
451 451
452 452 payload = cbor.dumps(atomdicts, canonical=True)
453 453
454 454 if len(payload) > maxframesize:
455 455 raise ValueError('cannot encode data in a single frame')
456 456
457 457 yield stream.makeframe(requestid=requestid,
458 458 typeid=FRAME_TYPE_TEXT_OUTPUT,
459 459 flags=0,
460 460 payload=payload)
461 461
462 462 class stream(object):
463 463 """Represents a logical unidirectional series of frames."""
464 464
465 465 def __init__(self, streamid, active=False):
466 466 self.streamid = streamid
467 467 self._active = active
468 468
469 469 def makeframe(self, requestid, typeid, flags, payload):
470 470 """Create a frame to be sent out over this stream.
471 471
472 472 Only returns the frame instance. Does not actually send it.
473 473 """
474 474 streamflags = 0
475 475 if not self._active:
476 476 streamflags |= STREAM_FLAG_BEGIN_STREAM
477 477 self._active = True
478 478
479 479 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
480 480 payload)
481 481
482 482 def ensureserverstream(stream):
483 483 if stream.streamid % 2:
484 484 raise error.ProgrammingError('server should only write to even '
485 485 'numbered streams; %d is not even' %
486 486 stream.streamid)
487 487
488 488 class serverreactor(object):
489 489 """Holds state of a server handling frame-based protocol requests.
490 490
491 491 This class is the "brain" of the unified frame-based protocol server
492 492 component. While the protocol is stateless from the perspective of
493 493 requests/commands, something needs to track which frames have been
494 494 received, what frames to expect, etc. This class is that thing.
495 495
496 496 Instances are modeled as a state machine of sorts. Instances are also
497 497 reactionary to external events. The point of this class is to encapsulate
498 498 the state of the connection and the exchange of frames, not to perform
499 499 work. Instead, callers tell this class when something occurs, like a
500 500 frame arriving. If that activity is worthy of a follow-up action (say
501 501 *run a command*), the return value of that handler will say so.
502 502
503 503 I/O and CPU intensive operations are purposefully delegated outside of
504 504 this class.
505 505
506 506 Consumers are expected to tell instances when events occur. They do so by
507 507 calling the various ``on*`` methods. These methods return a 2-tuple
508 508 describing any follow-up action(s) to take. The first element is the
509 509 name of an action to perform. The second is a data structure (usually
510 510 a dict) specific to that action that contains more information. e.g.
511 511 if the server wants to send frames back to the client, the data structure
512 512 will contain a reference to those frames.
513 513
514 514 Valid actions that consumers can be instructed to take are:
515 515
516 516 sendframes
517 517 Indicates that frames should be sent to the client. The ``framegen``
518 518 key contains a generator of frames that should be sent. The server
519 519 assumes that all frames are sent to the client.
520 520
521 521 error
522 522 Indicates that an error occurred. Consumer should probably abort.
523 523
524 524 runcommand
525 525 Indicates that the consumer should run a wire protocol command. Details
526 526 of the command to run are given in the data structure.
527 527
528 528 wantframe
529 529 Indicates that nothing of interest happened and the server is waiting on
530 530 more frames from the client before anything interesting can be done.
531 531
532 532 noop
533 533 Indicates no additional action is required.
534 534
535 535 Known Issues
536 536 ------------
537 537
538 538 There are no limits to the number of partially received commands or their
539 539 size. A malicious client could stream command request data and exhaust the
540 540 server's memory.
541 541
542 542 Partially received commands are not acted upon when end of input is
543 543 reached. Should the server error if it receives a partial request?
544 544 Should the client send a message to abort a partially transmitted request
545 545 to facilitate graceful shutdown?
546 546
547 547 Active requests that haven't been responded to aren't tracked. This means
548 548 that if we receive a command and instruct its dispatch, another command
549 549 with its request ID can come in over the wire and there will be a race
550 550 between who responds to what.
551 551 """
552 552
553 553 def __init__(self, deferoutput=False):
554 554 """Construct a new server reactor.
555 555
556 556 ``deferoutput`` can be used to indicate that no output frames should be
557 557 instructed to be sent until input has been exhausted. In this mode,
558 558 events that would normally generate output frames (such as a command
559 559 response being ready) will instead defer instructing the consumer to
560 560 send those frames. This is useful for half-duplex transports where the
561 561 sender cannot receive until all data has been transmitted.
562 562 """
563 563 self._deferoutput = deferoutput
564 564 self._state = 'idle'
565 565 self._nextoutgoingstreamid = 2
566 566 self._bufferedframegens = []
567 567 # stream id -> stream instance for all active streams from the client.
568 568 self._incomingstreams = {}
569 569 self._outgoingstreams = {}
570 570 # request id -> dict of commands that are actively being received.
571 571 self._receivingcommands = {}
572 572 # Request IDs that have been received and are actively being processed.
573 573 # Once all output for a request has been sent, it is removed from this
574 574 # set.
575 575 self._activecommands = set()
576 576
577 577 def onframerecv(self, frame):
578 578 """Process a frame that has been received off the wire.
579 579
580 580 Returns a dict with an ``action`` key that details what action,
581 581 if any, the consumer should take next.
582 582 """
583 583 if not frame.streamid % 2:
584 584 self._state = 'errored'
585 585 return self._makeerrorresult(
586 586 _('received frame with even numbered stream ID: %d') %
587 587 frame.streamid)
588 588
589 589 if frame.streamid not in self._incomingstreams:
590 590 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
591 591 self._state = 'errored'
592 592 return self._makeerrorresult(
593 593 _('received frame on unknown inactive stream without '
594 594 'beginning of stream flag set'))
595 595
596 596 self._incomingstreams[frame.streamid] = stream(frame.streamid)
597 597
598 598 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
599 599 # TODO handle decoding frames
600 600 self._state = 'errored'
601 601 raise error.ProgrammingError('support for decoding stream payloads '
602 602 'not yet implemented')
603 603
604 604 if frame.streamflags & STREAM_FLAG_END_STREAM:
605 605 del self._incomingstreams[frame.streamid]
606 606
607 607 handlers = {
608 608 'idle': self._onframeidle,
609 609 'command-receiving': self._onframecommandreceiving,
610 610 'errored': self._onframeerrored,
611 611 }
612 612
613 613 meth = handlers.get(self._state)
614 614 if not meth:
615 615 raise error.ProgrammingError('unhandled state: %s' % self._state)
616 616
617 617 return meth(frame)
618 618
619 619 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
620 620 """Signal that a bytes response is ready to be sent to the client.
621 621
622 622 The raw bytes response is passed as an argument.
623 623 """
624 624 ensureserverstream(stream)
625 625
626 626 def sendframes():
627 627 for frame in createbytesresponseframesfrombytes(stream, requestid,
628 628 data,
629 629 iscbor=iscbor):
630 630 yield frame
631 631
632 632 self._activecommands.remove(requestid)
633 633
634 634 result = sendframes()
635 635
636 636 if self._deferoutput:
637 637 self._bufferedframegens.append(result)
638 638 return 'noop', {}
639 639 else:
640 640 return 'sendframes', {
641 641 'framegen': result,
642 642 }
643 643
644 644 def oninputeof(self):
645 645 """Signals that end of input has been received.
646 646
647 647 No more frames will be received. All pending activity should be
648 648 completed.
649 649 """
650 650 # TODO should we do anything about in-flight commands?
651 651
652 652 if not self._deferoutput or not self._bufferedframegens:
653 653 return 'noop', {}
654 654
655 655 # If we buffered all our responses, emit those.
656 656 def makegen():
657 657 for gen in self._bufferedframegens:
658 658 for frame in gen:
659 659 yield frame
660 660
661 661 return 'sendframes', {
662 662 'framegen': makegen(),
663 663 }
664 664
665 665 def onapplicationerror(self, stream, requestid, msg):
666 666 ensureserverstream(stream)
667 667
668 668 return 'sendframes', {
669 669 'framegen': createerrorframe(stream, requestid, msg,
670 670 application=True),
671 671 }
672 672
673 673 def makeoutputstream(self):
674 674 """Create a stream to be used for sending data to the client."""
675 675 streamid = self._nextoutgoingstreamid
676 676 self._nextoutgoingstreamid += 2
677 677
678 678 s = stream(streamid)
679 679 self._outgoingstreams[streamid] = s
680 680
681 681 return s
682 682
683 683 def _makeerrorresult(self, msg):
684 684 return 'error', {
685 685 'message': msg,
686 686 }
687 687
688 688 def _makeruncommandresult(self, requestid):
689 689 entry = self._receivingcommands[requestid]
690 690
691 691 if not entry['requestdone']:
692 692 self._state = 'errored'
693 693 raise error.ProgrammingError('should not be called without '
694 694 'requestdone set')
695 695
696 696 del self._receivingcommands[requestid]
697 697
698 698 if self._receivingcommands:
699 699 self._state = 'command-receiving'
700 700 else:
701 701 self._state = 'idle'
702 702
703 703 # Decode the payloads as CBOR.
704 704 entry['payload'].seek(0)
705 705 request = cbor.load(entry['payload'])
706 706
707 707 if b'name' not in request:
708 708 self._state = 'errored'
709 709 return self._makeerrorresult(
710 710 _('command request missing "name" field'))
711 711
712 712 if b'args' not in request:
713 713 request[b'args'] = {}
714 714
715 715 assert requestid not in self._activecommands
716 716 self._activecommands.add(requestid)
717 717
718 718 return 'runcommand', {
719 719 'requestid': requestid,
720 720 'command': request[b'name'],
721 721 'args': request[b'args'],
722 722 'data': entry['data'].getvalue() if entry['data'] else None,
723 723 }
724 724
725 725 def _makewantframeresult(self):
726 726 return 'wantframe', {
727 727 'state': self._state,
728 728 }
729 729
730 730 def _validatecommandrequestframe(self, frame):
731 731 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
732 732 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
733 733
734 734 if new and continuation:
735 735 self._state = 'errored'
736 736 return self._makeerrorresult(
737 737 _('received command request frame with both new and '
738 738 'continuation flags set'))
739 739
740 740 if not new and not continuation:
741 741 self._state = 'errored'
742 742 return self._makeerrorresult(
743 743 _('received command request frame with neither new nor '
744 744 'continuation flags set'))
745 745
746 746 def _onframeidle(self, frame):
747 747 # The only frame type that should be received in this state is a
748 748 # command request.
749 749 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
750 750 self._state = 'errored'
751 751 return self._makeerrorresult(
752 752 _('expected command request frame; got %d') % frame.typeid)
753 753
754 754 res = self._validatecommandrequestframe(frame)
755 755 if res:
756 756 return res
757 757
758 758 if frame.requestid in self._receivingcommands:
759 759 self._state = 'errored'
760 760 return self._makeerrorresult(
761 761 _('request with ID %d already received') % frame.requestid)
762 762
763 763 if frame.requestid in self._activecommands:
764 764 self._state = 'errored'
765 765 return self._makeerrorresult(
766 766 _('request with ID %d is already active') % frame.requestid)
767 767
768 768 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
769 769 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
770 770 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
771 771
772 772 if not new:
773 773 self._state = 'errored'
774 774 return self._makeerrorresult(
775 775 _('received command request frame without new flag set'))
776 776
777 777 payload = util.bytesio()
778 778 payload.write(frame.payload)
779 779
780 780 self._receivingcommands[frame.requestid] = {
781 781 'payload': payload,
782 782 'data': None,
783 783 'requestdone': not moreframes,
784 784 'expectingdata': bool(expectingdata),
785 785 }
786 786
787 787 # This is the final frame for this request. Dispatch it.
788 788 if not moreframes and not expectingdata:
789 789 return self._makeruncommandresult(frame.requestid)
790 790
791 791 assert moreframes or expectingdata
792 792 self._state = 'command-receiving'
793 793 return self._makewantframeresult()
794 794
795 795 def _onframecommandreceiving(self, frame):
796 796 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
797 797 # Process new command requests as such.
798 798 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
799 799 return self._onframeidle(frame)
800 800
801 801 res = self._validatecommandrequestframe(frame)
802 802 if res:
803 803 return res
804 804
805 805 # All other frames should be related to a command that is currently
806 806 # receiving but is not active.
807 807 if frame.requestid in self._activecommands:
808 808 self._state = 'errored'
809 809 return self._makeerrorresult(
810 810 _('received frame for request that is still active: %d') %
811 811 frame.requestid)
812 812
813 813 if frame.requestid not in self._receivingcommands:
814 814 self._state = 'errored'
815 815 return self._makeerrorresult(
816 816 _('received frame for request that is not receiving: %d') %
817 817 frame.requestid)
818 818
819 819 entry = self._receivingcommands[frame.requestid]
820 820
821 821 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
822 822 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
823 823 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
824 824
825 825 if entry['requestdone']:
826 826 self._state = 'errored'
827 827 return self._makeerrorresult(
828 828 _('received command request frame when request frames '
829 829 'were supposedly done'))
830 830
831 831 if expectingdata != entry['expectingdata']:
832 832 self._state = 'errored'
833 833 return self._makeerrorresult(
834 834 _('mismatch between expect data flag and previous frame'))
835 835
836 836 entry['payload'].write(frame.payload)
837 837
838 838 if not moreframes:
839 839 entry['requestdone'] = True
840 840
841 841 if not moreframes and not expectingdata:
842 842 return self._makeruncommandresult(frame.requestid)
843 843
844 844 return self._makewantframeresult()
845 845
846 846 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
847 847 if not entry['expectingdata']:
848 848 self._state = 'errored'
849 849 return self._makeerrorresult(_(
850 850 'received command data frame for request that is not '
851 851 'expecting data: %d') % frame.requestid)
852 852
853 853 if entry['data'] is None:
854 854 entry['data'] = util.bytesio()
855 855
856 856 return self._handlecommanddataframe(frame, entry)
857 857 else:
858 858 self._state = 'errored'
859 859 return self._makeerrorresult(_(
860 860 'received unexpected frame type: %d') % frame.typeid)
861 861
862 862 def _handlecommanddataframe(self, frame, entry):
863 863 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
864 864
865 865 # TODO support streaming data instead of buffering it.
866 866 entry['data'].write(frame.payload)
867 867
868 868 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
869 869 return self._makewantframeresult()
870 870 elif frame.flags & FLAG_COMMAND_DATA_EOS:
871 871 entry['data'].seek(0)
872 872 return self._makeruncommandresult(frame.requestid)
873 873 else:
874 874 self._state = 'errored'
875 875 return self._makeerrorresult(_('command data frame without '
876 876 'flags'))
877 877
878 878 def _onframeerrored(self, frame):
879 879 return self._makeerrorresult(_('server already errored'))
880 880
881 881 class commandrequest(object):
882 882 """Represents a request to run a command."""
883 883
884 884 def __init__(self, requestid, name, args, datafh=None):
885 885 self.requestid = requestid
886 886 self.name = name
887 887 self.args = args
888 888 self.datafh = datafh
889 889 self.state = 'pending'
890 890
891 891 class clientreactor(object):
892 892 """Holds state of a client issuing frame-based protocol requests.
893 893
894 894 This is like ``serverreactor`` but for client-side state.
895 895
896 896 Each instance is bound to the lifetime of a connection. For persistent
897 897 connection transports using e.g. TCP sockets and speaking the raw
898 898 framing protocol, there will be a single instance for the lifetime of
899 899 the TCP socket. For transports where there are multiple discrete
900 900 interactions (say tunneled within in HTTP request), there will be a
901 901 separate instance for each distinct interaction.
902 902 """
903 903 def __init__(self, hasmultiplesend=False, buffersends=True):
904 904 """Create a new instance.
905 905
906 906 ``hasmultiplesend`` indicates whether multiple sends are supported
907 907 by the transport. When True, it is possible to send commands immediately
908 908 instead of buffering until the caller signals an intent to finish a
909 909 send operation.
910 910
911 911 ``buffercommands`` indicates whether sends should be buffered until the
912 912 last request has been issued.
913 913 """
914 914 self._hasmultiplesend = hasmultiplesend
915 915 self._buffersends = buffersends
916 916
917 917 self._canissuecommands = True
918 918 self._cansend = True
919 919
920 920 self._nextrequestid = 1
921 921 # We only support a single outgoing stream for now.
922 922 self._outgoingstream = stream(1)
923 923 self._pendingrequests = collections.deque()
924 924 self._activerequests = {}
925 925 self._incomingstreams = {}
926 926
927 927 def callcommand(self, name, args, datafh=None):
928 928 """Request that a command be executed.
929 929
930 930 Receives the command name, a dict of arguments to pass to the command,
931 931 and an optional file object containing the raw data for the command.
932 932
933 933 Returns a 3-tuple of (request, action, action data).
934 934 """
935 935 if not self._canissuecommands:
936 936 raise error.ProgrammingError('cannot issue new commands')
937 937
938 938 requestid = self._nextrequestid
939 939 self._nextrequestid += 2
940 940
941 941 request = commandrequest(requestid, name, args, datafh=datafh)
942 942
943 943 if self._buffersends:
944 944 self._pendingrequests.append(request)
945 945 return request, 'noop', {}
946 946 else:
947 947 if not self._cansend:
948 948 raise error.ProgrammingError('sends cannot be performed on '
949 949 'this instance')
950 950
951 951 if not self._hasmultiplesend:
952 952 self._cansend = False
953 953 self._canissuecommands = False
954 954
955 955 return request, 'sendframes', {
956 956 'framegen': self._makecommandframes(request),
957 957 }
958 958
959 959 def flushcommands(self):
960 960 """Request that all queued commands be sent.
961 961
962 962 If any commands are buffered, this will instruct the caller to send
963 963 them over the wire. If no commands are buffered it instructs the client
964 964 to no-op.
965 965
966 966 If instances aren't configured for multiple sends, no new command
967 967 requests are allowed after this is called.
968 968 """
969 969 if not self._pendingrequests:
970 970 return 'noop', {}
971 971
972 972 if not self._cansend:
973 973 raise error.ProgrammingError('sends cannot be performed on this '
974 974 'instance')
975 975
976 976 # If the instance only allows sending once, mark that we have fired
977 977 # our one shot.
978 978 if not self._hasmultiplesend:
979 979 self._canissuecommands = False
980 980 self._cansend = False
981 981
982 982 def makeframes():
983 983 while self._pendingrequests:
984 984 request = self._pendingrequests.popleft()
985 985 for frame in self._makecommandframes(request):
986 986 yield frame
987 987
988 988 return 'sendframes', {
989 989 'framegen': makeframes(),
990 990 }
991 991
992 992 def _makecommandframes(self, request):
993 993 """Emit frames to issue a command request.
994 994
995 995 As a side-effect, update request accounting to reflect its changed
996 996 state.
997 997 """
998 998 self._activerequests[request.requestid] = request
999 999 request.state = 'sending'
1000 1000
1001 1001 res = createcommandframes(self._outgoingstream,
1002 1002 request.requestid,
1003 1003 request.name,
1004 1004 request.args,
1005 1005 request.datafh)
1006 1006
1007 1007 for frame in res:
1008 1008 yield frame
1009 1009
1010 1010 request.state = 'sent'
1011 1011
1012 1012 def onframerecv(self, frame):
1013 1013 """Process a frame that has been received off the wire.
1014 1014
1015 1015 Returns a 2-tuple of (action, meta) describing further action the
1016 1016 caller needs to take as a result of receiving this frame.
1017 1017 """
1018 1018 if frame.streamid % 2:
1019 1019 return 'error', {
1020 1020 'message': (
1021 1021 _('received frame with odd numbered stream ID: %d') %
1022 1022 frame.streamid),
1023 1023 }
1024 1024
1025 1025 if frame.streamid not in self._incomingstreams:
1026 1026 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1027 1027 return 'error', {
1028 1028 'message': _('received frame on unknown stream '
1029 1029 'without beginning of stream flag set'),
1030 1030 }
1031 1031
1032 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1033
1032 1034 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1033 1035 raise error.ProgrammingError('support for decoding stream '
1034 1036 'payloads not yet implemneted')
1035 1037
1036 1038 if frame.streamflags & STREAM_FLAG_END_STREAM:
1037 1039 del self._incomingstreams[frame.streamid]
1038 1040
1039 1041 if frame.requestid not in self._activerequests:
1040 1042 return 'error', {
1041 1043 'message': (_('received frame for inactive request ID: %d') %
1042 1044 frame.requestid),
1043 1045 }
1044 1046
1045 1047 request = self._activerequests[frame.requestid]
1046 1048 request.state = 'receiving'
1047 1049
1048 1050 handlers = {
1049 1051 FRAME_TYPE_BYTES_RESPONSE: self._onbytesresponseframe,
1050 1052 }
1051 1053
1052 1054 meth = handlers.get(frame.typeid)
1053 1055 if not meth:
1054 1056 raise error.ProgrammingError('unhandled frame type: %d' %
1055 1057 frame.typeid)
1056 1058
1057 1059 return meth(request, frame)
1058 1060
1059 1061 def _onbytesresponseframe(self, request, frame):
1060 1062 if frame.flags & FLAG_BYTES_RESPONSE_EOS:
1061 1063 request.state = 'received'
1062 1064 del self._activerequests[request.requestid]
1063 1065
1064 1066 return 'responsedata', {
1065 1067 'request': request,
1066 1068 'expectmore': frame.flags & FLAG_BYTES_RESPONSE_CONTINUATION,
1067 1069 'eos': frame.flags & FLAG_BYTES_RESPONSE_EOS,
1068 1070 'cbor': frame.flags & FLAG_BYTES_RESPONSE_CBOR,
1069 1071 'data': frame.payload,
1070 1072 }
@@ -1,110 +1,130 b''
1 1 from __future__ import absolute_import
2 2
3 3 import unittest
4 4
5 5 from mercurial import (
6 6 error,
7 7 wireprotoframing as framing,
8 8 )
9 9
10 10 ffs = framing.makeframefromhumanstring
11 11
12 12 def sendframe(reactor, frame):
13 13 """Send a frame bytearray to a reactor."""
14 14 header = framing.parseheader(frame)
15 15 payload = frame[framing.FRAME_HEADER_SIZE:]
16 16 assert len(payload) == header.length
17 17
18 18 return reactor.onframerecv(framing.frame(header.requestid,
19 19 header.streamid,
20 20 header.streamflags,
21 21 header.typeid,
22 22 header.flags,
23 23 payload))
24 24
25 25 class SingleSendTests(unittest.TestCase):
26 26 """A reactor that can only send once rejects subsequent sends."""
27 27 def testbasic(self):
28 28 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
29 29
30 30 request, action, meta = reactor.callcommand(b'foo', {})
31 31 self.assertEqual(request.state, 'pending')
32 32 self.assertEqual(action, 'noop')
33 33
34 34 action, meta = reactor.flushcommands()
35 35 self.assertEqual(action, 'sendframes')
36 36
37 37 for frame in meta['framegen']:
38 38 self.assertEqual(request.state, 'sending')
39 39
40 40 self.assertEqual(request.state, 'sent')
41 41
42 42 with self.assertRaisesRegexp(error.ProgrammingError,
43 43 'cannot issue new commands'):
44 44 reactor.callcommand(b'foo', {})
45 45
46 46 with self.assertRaisesRegexp(error.ProgrammingError,
47 47 'cannot issue new commands'):
48 48 reactor.callcommand(b'foo', {})
49 49
50 50 class NoBufferTests(unittest.TestCase):
51 51 """A reactor without send buffering sends requests immediately."""
52 52 def testbasic(self):
53 53 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
54 54
55 55 request, action, meta = reactor.callcommand(b'command1', {})
56 56 self.assertEqual(request.requestid, 1)
57 57 self.assertEqual(action, 'sendframes')
58 58
59 59 self.assertEqual(request.state, 'pending')
60 60
61 61 for frame in meta['framegen']:
62 62 self.assertEqual(request.state, 'sending')
63 63
64 64 self.assertEqual(request.state, 'sent')
65 65
66 66 action, meta = reactor.flushcommands()
67 67 self.assertEqual(action, 'noop')
68 68
69 69 # And we can send another command.
70 70 request, action, meta = reactor.callcommand(b'command2', {})
71 71 self.assertEqual(request.requestid, 3)
72 72 self.assertEqual(action, 'sendframes')
73 73
74 74 for frame in meta['framegen']:
75 75 self.assertEqual(request.state, 'sending')
76 76
77 77 self.assertEqual(request.state, 'sent')
78 78
79 79 class BadFrameRecvTests(unittest.TestCase):
80 80 def testoddstream(self):
81 81 reactor = framing.clientreactor()
82 82
83 83 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
84 84 self.assertEqual(action, 'error')
85 85 self.assertEqual(meta['message'],
86 86 'received frame with odd numbered stream ID: 1')
87 87
88 88 def testunknownstream(self):
89 89 reactor = framing.clientreactor()
90 90
91 91 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
92 92 self.assertEqual(action, 'error')
93 93 self.assertEqual(meta['message'],
94 94 'received frame on unknown stream without beginning '
95 95 'of stream flag set')
96 96
97 97 def testunhandledframetype(self):
98 98 reactor = framing.clientreactor(buffersends=False)
99 99
100 100 request, action, meta = reactor.callcommand(b'foo', {})
101 101 for frame in meta['framegen']:
102 102 pass
103 103
104 104 with self.assertRaisesRegexp(error.ProgrammingError,
105 105 'unhandled frame type'):
106 106 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
107 107
108 class StreamTests(unittest.TestCase):
109 def testmultipleresponseframes(self):
110 reactor = framing.clientreactor(buffersends=False)
111
112 request, action, meta = reactor.callcommand(b'foo', {})
113
114 self.assertEqual(action, 'sendframes')
115 for f in meta['framegen']:
116 pass
117
118 action, meta = sendframe(
119 reactor,
120 ffs(b'%d 0 stream-begin 4 0 foo' % request.requestid))
121 self.assertEqual(action, 'responsedata')
122
123 action, meta = sendframe(
124 reactor,
125 ffs(b'%d 0 0 4 eos bar' % request.requestid))
126 self.assertEqual(action, 'responsedata')
127
108 128 if __name__ == '__main__':
109 129 import silenttestrunner
110 130 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now