wireprotov2: update stream encoding specification...
Gregory Szorc -
r40161:e2fe1074 default
@@ -1,649 +1,740 b''
1 1 **Experimental and under development**
2 2
3 3 This document describes Mercurial's transport-agnostic remote procedure
4 4 call (RPC) protocol which is used to perform interactions with remote
5 5 servers. This protocol is also referred to as ``hgrpc``.
6 6
7 7 The protocol has the following high-level features:
8 8
9 9 * Concurrent request and response support (multiple commands can be issued
10 10 simultaneously and responses can be streamed simultaneously).
11 11 * Supports half-duplex and full-duplex connections.
12 12 * All data is transmitted within *frames*, which have a well-defined
13 13 header and encode their length.
14 14 * Side-channels for sending progress updates and printing output. Text
15 15 output from the remote can be localized locally.
16 16 * Support for simultaneous and long-lived compression streams, even across
17 17 requests.
18 18 * Uses CBOR for data exchange.
19 19
20 20 The protocol is not specific to Mercurial and could be used by other
21 21 applications.
22 22
23 23 High-level Overview
24 24 ===================
25 25
26 26 To operate the protocol, a bi-directional, half-duplex pipe supporting
27 27 ordered sends and receives is required. That is, each peer has one pipe
28 28 for sending data and another for receiving. Full-duplex pipes are also
29 29 supported.
30 30
31 31 All data is read and written in atomic units called *frames*. These
32 32 are conceptually similar to TCP packets. Higher-level functionality
33 33 is built on the exchange and processing of frames.
34 34
35 35 All frames are associated with a *stream*. A *stream* provides a
36 36 unidirectional grouping of frames. Streams facilitate two goals:
37 37 content encoding and parallelism. There is a dedicated section on
38 38 streams below.
39 39
40 40 The protocol is request-response based: the client issues requests to
41 41 the server, which issues replies to those requests. Server-initiated
42 42 messaging is not currently supported, but this specification carves
43 43 out room to implement it.
44 44
45 45 All frames are associated with a numbered request. Frames can thus
46 46 be logically grouped by their request ID.
47 47
48 48 Frames
49 49 ======
50 50
51 51 Frames begin with an 8 octet header followed by a variable length
52 52 payload::
53 53
54 54 +------------------------------------------------+
55 55 | Length (24) |
56 56 +--------------------------------+---------------+
57 57 | Request ID (16) | Stream ID (8) |
58 58 +------------------+-------------+---------------+
59 59 | Stream Flags (8) |
60 60 +-----------+------+
61 61 | Type (4) |
62 62 +-----------+
63 63 | Flags (4) |
64 64 +===========+===================================================|
65 65 | Frame Payload (0...) ...
66 66 +---------------------------------------------------------------+
67 67
68 68 The length of the frame payload is expressed as an unsigned 24 bit
69 69 little endian integer. Values larger than 65535 MUST NOT be used unless
70 70 given permission by the server as part of the negotiated capabilities
71 71 during the handshake. The frame header is not part of the advertised
72 72 frame length. The payload length is the over-the-wire length. If there
73 73 is content encoding applied to the payload as part of the frame's stream,
74 74 the length is the output of that content encoding, not the input.
75 75
76 76 The 16-bit ``Request ID`` field denotes the integer request identifier,
77 77 stored as an unsigned little endian integer. Odd numbered requests are
78 78 client-initiated. Even numbered requests are server-initiated. This
79 79 refers to where the *request* was initiated - not where the *frame* was
80 80 initiated, so servers will send frames with odd ``Request ID`` in
81 81 response to client-initiated requests. Implementations are advised to
82 82 start ordering request identifiers at ``1`` and ``0``, increment by
83 83 ``2``, and wrap around if all available numbers have been exhausted.
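
The request ID numbering advice above can be sketched as follows. This is a minimal illustration, not Mercurial's implementation: the ``RequestIdAllocator`` class and its method names are hypothetical, and skipping IDs that are still active after a wrap is an assumption drawn from the rule that a new request MUST NOT reuse an active ``Request ID``.

```python
class RequestIdAllocator:
    """Hand out 16-bit request IDs: odd for clients, even for servers."""

    def __init__(self, client=True):
        # Clients start at 1, servers at 0; stepping by 2 keeps the parity.
        self._next = 1 if client else 0
        self.active = set()

    def allocate(self):
        # There are 2**15 IDs of each parity, so try each one at most once.
        for _ in range(1 << 15):
            candidate = self._next
            # Masking to 16 bits wraps 65535 -> 1 and 65534 -> 0.
            self._next = (self._next + 2) & 0xFFFF
            if candidate not in self.active:
                self.active.add(candidate)
                return candidate
        raise RuntimeError("all request IDs of this parity are active")

    def release(self, request_id):
        # Call once the response for this request has been fully received.
        self.active.discard(request_id)
```

A client would call ``allocate()`` before sending the first ``Command Request`` frame and ``release()`` once the response is complete.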
84 84
85 85 The 8-bit ``Stream ID`` field denotes the stream that the frame is
86 86 associated with. Frames belonging to a stream may have content
87 87 encoding applied and the receiver may need to decode the raw frame
88 88 payload to obtain the original data. Odd numbered IDs are
89 89 client-initiated. Even numbered IDs are server-initiated.
90 90
91 91 The 8-bit ``Stream Flags`` field defines stream processing semantics.
92 92 See the section on streams below.
93 93
94 94 The 4-bit ``Type`` field denotes the type of frame being sent.
95 95
96 96 The 4-bit ``Flags`` field defines special, per-type attributes for
97 97 the frame.
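
As an illustration of the header layout described above, the following Python sketch packs and unpacks the 8 octet header. The function names are hypothetical. Placing ``Type`` in the high nibble and ``Flags`` in the low nibble of the final octet is an assumption based on the order of the fields in the diagram.

```python
import struct

FRAME_HEADER_SIZE = 8


def pack_frame_header(length, request_id, stream_id, stream_flags,
                      frame_type, flags):
    """Encode the 8 octet frame header."""
    if not 0 <= length < (1 << 24):
        raise ValueError("payload length must fit in 24 bits")
    if not 0 <= frame_type <= 0xF or not 0 <= flags <= 0xF:
        raise ValueError("type and flags are 4-bit fields")
    header = bytearray(FRAME_HEADER_SIZE)
    # 24-bit little endian length: low three bytes of a 32-bit value.
    header[0:3] = struct.pack("<I", length)[0:3]
    # 16-bit little endian request ID, then stream ID and stream flags.
    struct.pack_into("<HBB", header, 3, request_id, stream_id, stream_flags)
    # Assumption: type occupies the high nibble, flags the low nibble.
    header[7] = (frame_type << 4) | flags
    return bytes(header)


def unpack_frame_header(header):
    """Decode a header into (length, request_id, stream_id, stream_flags,
    frame_type, flags)."""
    if len(header) != FRAME_HEADER_SIZE:
        raise ValueError("frame header must be exactly 8 octets")
    length = int.from_bytes(header[0:3], "little")
    request_id, stream_id, stream_flags = struct.unpack_from("<HBB", header, 3)
    return (length, request_id, stream_id, stream_flags,
            header[7] >> 4, header[7] & 0x0F)
```

The sketch does not enforce the 65535 payload limit, because that limit can be raised by the capabilities negotiated during the handshake.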
98 98
99 99 The sections below define the frame types and their behavior.
100 100
101 101 Command Request (``0x01``)
102 102 --------------------------
103 103
104 104 This frame contains a request to run a command.
105 105
106 106 The payload consists of a CBOR map defining the command request. The
107 107 bytestring keys of that map are:
108 108
109 109 name
110 110 Name of the command that should be executed (bytestring).
111 111 args
112 112 Map of bytestring keys to various value types containing the named
113 113 arguments to this command.
114 114
115 115 Each command defines its own set of argument names and their expected
116 116 types.
117 117
118 118 redirect (optional)
119 119 (map) Advertises client support for following response *redirects*.
120 120
121 121 This map has the following bytestring keys:
122 122
123 123 targets
124 124 (array of bytestring) List of named redirect targets supported by
125 125 this client. The names come from the targets advertised by the
126 126 server's *capabilities* message.
127 127
128 128 hashes
129 129 (array of bytestring) List of preferred hashing algorithms that can
130 130 be used for content integrity verification.
131 131
132 132 See the *Content Redirects* section below for more on content redirects.
133 133
134 134 This frame type MUST ONLY be sent from clients to servers: it is illegal
135 135 for a server to send this frame to a client.
136 136
137 137 The following flag values are defined for this type:
138 138
139 139 0x01
140 140 New command request. When set, this frame represents the beginning
141 141 of a new request to run a command. The ``Request ID`` attached to this
142 142 frame MUST NOT be active.
143 143 0x02
144 144 Command request continuation. When set, this frame is a continuation
145 145 from a previous command request frame for its ``Request ID``. This
146 146 flag is set when the CBOR data for a command request does not fit
147 147 in a single frame.
148 148 0x04
149 149 Additional frames expected. When set, the command request didn't fit
150 150 into a single frame and additional CBOR data follows in a subsequent
151 151 frame.
152 152 0x08
153 153 Command data frames expected. When set, command data frames are
154 154 expected to follow the final command request frame for this request.
155 155
156 156 ``0x01`` MUST be set on the initial command request frame for a
157 157 ``Request ID``.
158 158
159 159 ``0x01`` or ``0x02`` MUST be set to indicate this frame's role in
160 160 a series of command request frames.
161 161
162 162 If command data frames are to be sent, ``0x08`` MUST be set on ALL
163 163 command request frames.
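
The flag rules above can be sketched as a helper that splits an already CBOR-encoded command request across frames. The constant and function names are hypothetical, and the CBOR encoding itself is assumed to have happened elsewhere.

```python
FLAG_NEW = 0x01
FLAG_CONTINUATION = 0x02
FLAG_MORE_FRAMES = 0x04
FLAG_EXPECT_DATA = 0x08


def split_command_request(encoded, max_payload=65535, expect_data=False):
    """Yield (flags, payload) pairs for the frames of one command request.

    ``encoded`` is the CBOR-encoded request map as bytes.
    """
    chunks = [encoded[i:i + max_payload]
              for i in range(0, len(encoded), max_payload)] or [b""]
    for index, chunk in enumerate(chunks):
        # 0x01 on the initial frame, 0x02 on every later frame.
        flags = FLAG_NEW if index == 0 else FLAG_CONTINUATION
        # 0x04 while more command request frames follow.
        if index < len(chunks) - 1:
            flags |= FLAG_MORE_FRAMES
        # 0x08 on ALL frames when command data frames will be sent.
        if expect_data:
            flags |= FLAG_EXPECT_DATA
        yield flags, chunk
```

For a request that fits into one frame and carries no command data, the only flag emitted is ``0x01``.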
164 164
165 165 Command Data (``0x02``)
166 166 -----------------------
167 167
168 168 This frame contains raw data for a command.
169 169
170 170 Most commands can be executed by specifying arguments. However,
171 171 arguments have an upper bound to their length. For commands that
172 172 accept data that is beyond this length or whose length isn't known
173 173 when the command is initially sent, they will need to stream
174 174 arbitrary data to the server. This frame type facilitates the sending
175 175 of this data.
176 176
177 177 The payload of this frame type consists of a stream of raw data to be
178 178 consumed by the command handler on the server. The format of the data
179 179 is command specific.
180 180
181 181 The following flag values are defined for this type:
182 182
183 183 0x01
184 184 Command data continuation. When set, the data for this command
185 185 continues into a subsequent frame.
186 186
187 187 0x02
188 188 End of data. When set, command data has been fully sent to the
189 189 server. The command has been fully issued and no new data for this
190 190 command will be sent. The next frame will belong to a new command.
191 191
192 192 Command Response Data (``0x03``)
193 193 --------------------------------
194 194
195 195 This frame contains response data to an issued command.
196 196
197 197 Response data ALWAYS consists of a series of 1 or more CBOR encoded
198 198 values. A CBOR value may be using indefinite length encoding. And the
199 199 bytes constituting the value may span several frames.
200 200
201 201 The following flag values are defined for this type:
202 202
203 203 0x01
204 204 Data continuation. When set, an additional frame containing response data
205 205 will follow.
206 206 0x02
207 207 End of data. When set, the response data has been fully sent and
208 208 no additional frames for this response will be sent.
209 209
210 210 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
211 211
212 212 Error Occurred (``0x05``)
213 213 -------------------------
214 214
215 215 Some kind of error occurred.
216 216
217 217 There are 3 general kinds of failures that can occur:
218 218
219 219 * Command error encountered before any response issued
220 220 * Command error encountered after a response was issued
221 221 * Protocol or stream level error
222 222
223 223 This frame type is used to capture the latter cases. (The general
224 224 command error case is handled by the leading CBOR map in
225 225 ``Command Response`` frames.)
226 226
227 227 The payload of this frame contains a CBOR map detailing the error. That
228 228 map has the following bytestring keys:
229 229
230 230 type
231 231 (bytestring) The overall type of error encountered. Can be one of the
232 232 following values:
233 233
234 234 protocol
235 235 A protocol-level error occurred. This typically means someone
236 236 is violating the framing protocol semantics and the server is
237 237 refusing to proceed.
238 238
239 239 server
240 240 A server-level error occurred. This typically indicates some kind of
241 241 logic error on the server, likely the fault of the server.
242 242
243 243 command
244 244 A command-level error, likely the fault of the client.
245 245
246 246 message
247 247 (array of maps) A richly formatted message that is intended for
248 248 human consumption. See the ``Human Output Side-Channel`` frame
249 249 section for a description of the format of this data structure.
250 250
251 251 Human Output Side-Channel (``0x06``)
252 252 ------------------------------------
253 253
254 254 This frame contains a message that is intended to be displayed to
255 255 people. Whereas most frames communicate machine readable data, this
256 256 frame communicates textual data that is intended to be shown to
257 257 humans.
258 258
259 259 The frame consists of a series of *formatting requests*. Each formatting
260 260 request consists of a formatting string, arguments for that formatting
261 261 string, and labels to apply to that formatting string.
262 262
263 263 A formatting string is a printf()-like string that allows variable
264 264 substitution within the string. Labels allow the rendered text to be
265 265 *decorated*. Assuming use of the canonical Mercurial code base, a
266 266 formatting string can be the input to the ``i18n._`` function. This
267 267 allows messages emitted from the server to be localized. So even if
268 268 the server has different i18n settings, people could see messages in
269 269 their *native* settings. Similarly, the use of labels allows
270 270 decorations like coloring and underlining to be applied using the
271 271 client's configured rendering settings.
272 272
273 273 Formatting strings are similar to ``printf()`` strings or how
274 274 Python's ``%`` operator works. The only supported formatting sequences
275 275 are ``%s`` and ``%%``. ``%s`` will be replaced by whatever the string
276 276 at that position resolves to. ``%%`` will be replaced by ``%``. All
277 277 other 2-byte sequences beginning with ``%`` represent a literal
278 278 ``%`` followed by that character. However, future versions of the
279 279 wire protocol reserve the right to allow clients to opt in to receiving
280 280 formatting strings with additional formatters, hence why ``%%`` is
281 281 required to represent the literal ``%``.
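
A minimal sketch of the substitution rules above, assuming the formatting string and its arguments are already available as bytes. The ``render_atom`` name is hypothetical, and localization and labels are deliberately left out.

```python
def render_atom(msg, args):
    """Expand ``%s`` and ``%%`` in ``msg``; leave other sequences literal."""
    out = bytearray()
    remaining = list(args)
    i = 0
    while i < len(msg):
        ch = msg[i:i + 1]
        nxt = msg[i + 1:i + 2]
        if ch == b"%" and nxt == b"s":
            # Consume the next positional argument.
            out += remaining.pop(0)
            i += 2
        elif ch == b"%" and nxt == b"%":
            out += b"%"
            i += 2
        else:
            # Any other byte, including a "%" that starts an unsupported
            # sequence, is copied through unchanged.
            out += ch
            i += 1
    return bytes(out)
```

With this sketch, a formatting string that has more ``%s`` sequences than arguments raises ``IndexError``. How a real client should handle that case is not defined here.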
282 282
283 283 The frame payload consists of a CBOR array of CBOR maps. Each map
284 284 defines an *atom* of text data to print. Each *atom* has the following
285 285 bytestring keys:
286 286
287 287 msg
288 288 (bytestring) The formatting string. Content MUST be ASCII.
289 289 args (optional)
290 290 Array of bytestrings defining arguments to the formatting string.
291 291 labels (optional)
292 292 Array of bytestrings defining labels to apply to this atom.
293 293
294 294 All data to be printed MUST be encoded into a single frame: this frame
295 295 does not support spanning data across multiple frames.
296 296
297 297 All textual data encoded in these frames is assumed to be line delimited.
298 298 The last atom in the frame SHOULD end with a newline (``\n``). If it
299 299 doesn't, clients MAY add a newline to facilitate immediate printing.
300 300
301 301 Progress Update (``0x07``)
302 302 --------------------------
303 303
304 304 This frame holds the progress of an operation on the peer. Consumption
305 305 of these frames allows clients to display progress bars, estimated
306 306 completion times, etc.
307 307
308 308 Each frame defines the progress of a single operation on the peer. The
309 309 payload consists of a CBOR map with the following bytestring keys:
310 310
311 311 topic
312 312 Topic name (string)
313 313 pos
314 314 Current numeric position within the topic (integer)
315 315 total
316 316 Total/end numeric position of this topic (unsigned integer)
317 317 label (optional)
318 318 Unit label (string)
319 319 item (optional)
320 320 Item name (string)
321 321
322 322 Progress state is created when a frame is received referencing a
323 323 *topic* that isn't currently tracked. Progress tracking for that
324 324 *topic* is finished when a frame is received reporting the current
325 325 position of that topic as ``-1``.
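
The topic lifecycle above can be sketched as follows, assuming the frame payload has already been decoded from CBOR into a dictionary with bytestring keys. The ``ProgressTracker`` class is hypothetical.

```python
class ProgressTracker:
    """Track the latest position of every active progress topic."""

    def __init__(self):
        self.active = {}

    def update(self, payload):
        topic = payload[b"topic"]
        if payload[b"pos"] == -1:
            # A position of -1 finishes tracking for this topic.
            self.active.pop(topic, None)
            return
        # An unknown topic creates new state; a known one is updated.
        self.active[topic] = (payload[b"pos"], payload[b"total"])
```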
326 326
327 327 Multiple *topics* may be active at any given time.
328 328
329 329 Rendering of progress information is not mandated or governed by this
330 330 specification: implementations MAY render progress information however
331 331 they see fit, including not at all.
332 332
333 333 The string data describing the topic SHOULD be static strings to
334 334 facilitate receivers localizing that string data. The emitter
335 335 MUST normalize all string data to valid UTF-8 and receivers SHOULD
336 336 validate that received data conforms to UTF-8. The topic name
337 337 SHOULD be ASCII.
338 338
339 Stream Encoding Settings (``0x08``)
339 Sender Protocol Settings (``0x08``)
340 -----------------------------------
341
342 This frame type advertises the sender's support for various protocol and
343 stream level features. The data advertised in this frame is used to influence
344 subsequent behavior of the current frame exchange channel.
345
346 The frame payload consists of a CBOR map. It may contain the following
347 bytestring keys:
348
349 contentencodings
350 (array of bytestring) A list of content encodings supported by the
351 sender, in order of most to least preferred.
352
353 Peers are allowed to encode stream data using any of the listed
354 encodings.
355
356 See the ``Content Encoding Profiles`` section for an enumeration
357 of supported content encodings.
358
359 If not defined, the value is assumed to be a list with the single value
360 ``identity``, meaning only the no-op encoding is supported.
361
362 Senders MAY filter the set of advertised encodings against what they
363 know the receiver supports (e.g. if the receiver advertised encodings
364 via the capabilities descriptor). However, doing so will prevent
365 servers from gaining an understanding of the aggregate capabilities
366 of clients. So clients are discouraged from doing so.
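
One way a peer might pick an encoding from the advertised list is sketched below. It assumes the settings map has already been decoded from CBOR into a dictionary. The ``choose_encoding`` name is hypothetical, and falling back to ``identity`` when nothing matches is an assumption based on that profile being mandatory for all peers.

```python
def choose_encoding(peer_settings, locally_supported):
    """Return the peer's most preferred encoding that we also support."""
    # A missing key means the peer only advertises ``identity``.
    advertised = peer_settings.get(b"contentencodings", [b"identity"])
    for name in advertised:  # ordered from most to least preferred
        if name in locally_supported:
            return name
    return b"identity"
```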
367
368 When this frame is not sent/received, the receiver assumes default values
369 for all keys.
370
371 If encountered, this frame type MUST be sent before any other frame type
372 in a channel.
373
374 The following flag values are defined for this frame type:
375
376 0x01
377 Data continuation. When set, an additional frame containing more protocol
378 settings immediately follows.
379 0x02
380 End of data. When set, the protocol settings data has been completely
381 sent.
382
383 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
384
385 Stream Encoding Settings (``0x09``)
340 386 -----------------------------------
341 387
342 388 This frame type holds information defining the content encoding
343 389 settings for a *stream*.
344 390
345 391 This frame type is likely consumed by the protocol layer and is not
346 392 passed on to applications.
347 393
348 394 This frame type MUST ONLY occur on frames having the *Beginning of Stream*
349 395 ``Stream Flag`` set.
350 396
351 397 The payload of this frame defines what content encoding has (possibly)
352 398 been applied to the payloads of subsequent frames in this stream.
353 399
354 The payload begins with an 8-bit integer defining the length of the
355 encoding *profile*, followed by the string name of that profile, which
356 must be an ASCII string. All bytes that follow can be used by that
357 profile for supplemental settings definitions. See the section below
358 on defined encoding profiles.
400 The payload consists of a series of CBOR values. The first value is a
401 bytestring denoting the content encoding profile of the data in this
402 stream. Subsequent CBOR values supplement this simple value in a
403 profile-specific manner. See the ``Content Encoding Profiles`` section
404 for more.
405
406 In the absence of this frame on a stream, it is assumed the stream is
407 using the ``identity`` content encoding.
408
409 The following flag values are defined for this frame type:
410
411 0x01
412 Data continuation. When set, an additional frame containing more encoding
413 settings immediately follows.
414 0x02
415 End of data. When set, the encoding settings data has been completely
416 sent.
417
418 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
359 419
360 420 Stream States and Flags
361 421 =======================
362 422
363 423 Streams can be in two states: *open* and *closed*. An *open* stream
364 424 is active and frames attached to that stream could arrive at any time.
365 425 A *closed* stream is not active. If a frame attached to a *closed*
366 426 stream arrives, that frame MUST have an appropriate stream flag
367 427 set indicating beginning of stream. All streams are in the *closed*
368 428 state by default.
369 429
370 430 The ``Stream Flags`` field denotes a set of bit flags for defining
371 431 the relationship of this frame within a stream. The following flags
372 432 are defined:
373 433
374 434 0x01
375 435 Beginning of stream. The first frame in the stream MUST set this
376 436 flag. When received, the ``Stream ID`` this frame is attached to
377 437 becomes ``open``.
378 438
379 439 0x02
380 440 End of stream. The last frame in a stream MUST set this flag. When
381 441 received, the ``Stream ID`` this frame is attached to becomes
382 442 ``closed``. Any content encoding context associated with this stream
383 443 can be destroyed after processing the payload of this frame.
384 444
385 445 0x04
386 446 Apply content encoding. When set, any content encoding settings
387 447 defined by the stream should be applied when attempting to read
388 448 the frame. When not set, the frame payload isn't encoded.
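
A receiver could track stream state roughly as follows. The ``StreamTable`` class is a hypothetical sketch. It raises on a frame for a closed stream that lacks the beginning-of-stream flag, and it ignores a repeated ``0x01`` on a stream that is already open, since that case is not specified above.

```python
STREAM_FLAG_BEGIN = 0x01
STREAM_FLAG_END = 0x02
STREAM_FLAG_ENCODED = 0x04


class StreamTable:
    """Track which stream IDs are open on the receiving side."""

    def __init__(self):
        # All streams are closed by default.
        self.open_streams = set()

    def on_frame(self, stream_id, stream_flags):
        """Update state for one frame.

        Returns True when content encoding applies to the frame payload.
        """
        if stream_id not in self.open_streams:
            if not stream_flags & STREAM_FLAG_BEGIN:
                raise ValueError("frame on closed stream lacks 0x01")
            self.open_streams.add(stream_id)
        if stream_flags & STREAM_FLAG_END:
            # Any encoding context for the stream can be destroyed once
            # this frame's payload has been processed.
            self.open_streams.discard(stream_id)
        return bool(stream_flags & STREAM_FLAG_ENCODED)
```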
389 449
450 TODO consider making stream opening and closing communicated via
451 explicit frame types (e.g. a "stream state change" frame) rather than
452 flags on all frames. This would make stream state changes more explicit,
453 as they could only occur on specific frame types.
454
390 455 Streams
391 456 =======
392 457
393 458 Streams - along with ``Request IDs`` - facilitate grouping of frames.
394 459 But the purpose of each is quite different and the groupings they
395 460 constitute are independent.
396 461
397 462 A ``Request ID`` is essentially a tag. It tells you which logical
398 463 request a frame is associated with.
399 464
400 465 A *stream* is a sequence of frames grouped for the express purpose
401 466 of applying a stateful encoding or for denoting sub-groups of frames.
402 467
403 468 Unlike ``Request ID``s which span the request and response, a stream
404 469 is unidirectional and stream IDs are independent from client to
405 470 server.
406 471
407 472 There is no strict hierarchical relationship between ``Request IDs``
408 473 and *streams*. A stream can contain frames having multiple
409 474 ``Request IDs``. Frames belonging to the same ``Request ID`` can
410 475 span multiple streams.
411 476
412 477 One goal of streams is to facilitate content encoding. A stream can
413 478 define an encoding to be applied to frame payloads. For example, the
414 479 payload transmitted over the wire may contain output from a
415 480 zstandard compression operation and the receiving end may decompress
416 481 that payload to obtain the original data.
417 482
418 483 The other goal of streams is to facilitate concurrent execution. For
419 484 example, a server could spawn 4 threads to service a request that can
420 485 be easily parallelized. Each of those 4 threads could write into its
421 486 own stream. Those streams could then in turn be delivered to 4 threads
422 487 on the receiving end, with each thread consuming its stream in near
423 488 isolation. The *main* thread on both ends merely does I/O and
424 489 encodes/decodes frame headers: the bulk of the work is done by worker
425 490 threads.
426 491
427 492 In addition, since content encoding is defined per stream, each
428 493 *worker thread* could perform potentially CPU bound work concurrently
429 494 with other threads. This approach of applying encoding at the
430 495 sub-protocol / stream level eliminates a potential resource constraint
431 496 on the protocol stream as a whole (it is common for the throughput of
432 497 a compression engine to be smaller than the throughput of a network).
433 498
434 499 Having multiple streams - each with their own encoding settings - also
435 500 facilitates the use of advanced data compression techniques. For
436 501 example, a transmitter could see that it is generating data faster
437 502 or slower than the receiving end is consuming it and adjust its
438 503 compression settings to trade CPU for compression ratio accordingly.
439 504
440 505 While streams can define a content encoding, not all frames within
441 506 that stream must use that content encoding. This can be useful when
442 507 data is being served from caches and being derived dynamically. A
443 508 cache could hold pre-compressed data so the server doesn't have to
444 509 recompress it. The ability to pick and choose which frames are
445 510 compressed allows servers to easily send data to the wire without
446 511 involving potentially expensive encoding overhead.
447 512
448 513 Content Encoding Profiles
449 514 =========================
450 515
451 516 Streams can have named content encoding *profiles* associated with
452 517 them. A profile defines a shared understanding of content encoding
453 518 settings and behavior.
454 519
455 The following profiles are defined:
520 Profiles are described in the following sections.
521
522 identity
523 --------
524
525 The ``identity`` profile is a no-op encoding: the encoded bytes are
526 exactly the input bytes.
527
528 This profile MUST be supported by all peers.
529
530 In the absence of an identified profile, the ``identity`` profile is
531 assumed.
456 532
457 TBD
533 zstd-8mb
534 --------
535
536 Zstandard encoding (RFC 8478). Zstandard is a fast and effective lossless
537 compression format.
538
539 This profile allows decompressor window sizes of up to 8 MB.
540
541 zlib
542 ----
543
544 zlib compressed data (RFC 1950). zlib is a widely-used and supported
545 lossless compression format.
546
547 It isn't as fast as zstandard and it is recommended to use zstandard instead,
548 if possible.
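
To illustrate how a long-lived decoding context might be attached to a stream, here is a sketch covering the ``identity`` and ``zlib`` profiles with Python's standard ``zlib`` module. The ``StreamDecoder`` class is hypothetical. ``zstd-8mb`` is left out only because the sketch sticks to the standard library.

```python
import zlib


class StreamDecoder:
    """Per-stream decoding context for the identity and zlib profiles."""

    def __init__(self, profile=b"identity"):
        if profile == b"identity":
            self._decompressor = None
        elif profile == b"zlib":
            # One decompressor lives for the whole stream, so compression
            # state carries over from frame to frame.
            self._decompressor = zlib.decompressobj()
        else:
            raise ValueError("unsupported profile: %r" % profile)

    def decode(self, payload, encoded):
        """Decode one frame payload.

        ``encoded`` reflects the 0x04 stream flag of the frame.
        """
        if not encoded or self._decompressor is None:
            return payload
        return self._decompressor.decompress(payload)
```

For each frame to decode on its own, the sender in this sketch would need to flush its compressor at frame boundaries, for example with ``zlib.Z_SYNC_FLUSH``. That is a property of the example, not a requirement stated by this document.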
458 549
459 550 Command Protocol
460 551 ================
461 552
462 553 A client can request that a remote run a command by sending it
463 554 frames defining that command. This logical stream is composed of
464 555 1 or more ``Command Request`` frames and 0 or more ``Command Data``
465 556 frames.
466 557
467 558 All frames composing a single command request MUST be associated with
468 559 the same ``Request ID``.
469 560
470 561 Clients MAY send additional command requests without waiting on the
471 562 response to a previous command request. If they do so, they MUST ensure
472 563 that the ``Request ID`` field of outbound frames does not conflict
473 564 with that of an active ``Request ID`` whose response has not yet been
474 565 fully received.
475 566
476 567 Servers MAY respond to commands in a different order than they were
477 568 sent over the wire. Clients MUST be prepared to deal with this. Servers
478 569 also MAY start executing commands in a different order than they were
479 570 received, or MAY execute multiple commands concurrently.
480 571
481 572 If there is a dependency between commands or a race condition between
482 573 commands executing (e.g. a read-only command that depends on the results
483 574 of a command that mutates the repository), then clients MUST NOT send
484 575 frames issuing a command until a response to all dependent commands has
485 576 been received.
486 577 TODO think about whether we should express dependencies between commands
487 578 to avoid roundtrip latency.
488 579
489 580 A command is defined by a command name, 0 or more command arguments,
490 581 and optional command data.
491 582
492 583 Arguments are the recommended mechanism for transferring fixed sets of
493 584 parameters to a command. Data is appropriate for transferring variable
494 585 data. Thinking in terms of HTTP, arguments would be headers and data
495 586 would be the message body.
496 587
497 588 It is recommended for servers to delay the dispatch of a command
498 589 until all arguments have been received. Servers MAY impose limits on the
499 590 maximum argument size.
500 591 TODO define failure mechanism.
501 592
502 593 Servers MAY dispatch to commands immediately once argument data
503 594 is available or delay until command data is received in full.
504 595
505 596 Once a ``Command Request`` frame is sent, a client must be prepared to
506 597 receive any of the following frames associated with that request:
507 598 ``Command Response``, ``Error Response``, ``Human Output Side-Channel``,
508 599 ``Progress Update``.
509 600
510 601 The *main* response for a command will be in ``Command Response`` frames.
511 602 The payloads of these frames consist of 1 or more CBOR encoded values.
512 603 The first CBOR value on the first ``Command Response`` frame is special
513 604 and denotes the overall status of the command. This CBOR map contains
514 605 the following bytestring keys:
515 606
516 607 status
517 608 (bytestring) A well-defined message containing the overall status of
518 609 this command request. The following values are defined:
519 610
520 611 ok
521 612 The command was received successfully and its response follows.
522 613 error
523 614 There was an error processing the command. More details about the
524 615 error are encoded in the ``error`` key.
525 616 redirect
526 617 The response for this command is available elsewhere. Details on
527 618 where are in the ``location`` key.
528 619
529 620 error (optional)
530 621 A map containing information about an encountered error. The map has the
531 622 following keys:
532 623
533 624 message
534 625 (array of maps) A message describing the error. The message uses the
535 626 same format as those in the ``Human Output Side-Channel`` frame.
536 627
537 628 location (optional)
538 629 (map) Presence indicates that a *content redirect* has occurred. The map
539 630 provides the external location of the content.
540 631
541 632 This map contains the following bytestring keys:
542 633
543 634 url
544 635 (bytestring) URL from which this content may be requested.
545 636
546 637 mediatype
547 638 (bytestring) The media type for the fetched content. e.g.
548 639 ``application/mercurial-*``.
549 640
550 641 In some transports, this value is also advertised by the transport.
551 642 e.g. as the ``Content-Type`` HTTP header.
552 643
553 644 size (optional)
554 645 (unsigned integer) Total size of remote object in bytes. This is
555 646 the raw size of the entity that will be fetched, minus any
556 647 non-Mercurial protocol encoding (e.g. HTTP content or transfer
557 648 encoding.)
558 649
559 650 fullhashes (optional)
560 651 (array of arrays) Content hashes for the entire payload. Each entry
561 652 is an array of bytestrings containing the hash name and the hash value.
562 653
563 654 fullhashseed (optional)
564 655 (bytestring) Optional seed value to feed into hasher for full content
565 656 hash verification.
566 657
567 658 serverdercerts (optional)
568 659 (array of bytestring) DER encoded x509 certificates for the server. When
569 660 defined, clients MAY validate that the x509 certificate on the target
570 661 server exactly matches the certificate used here.
571 662
572 663 servercadercerts (optional)
573 664 (array of bytestring) DER encoded x509 certificates for the certificate
574 665 authority of the target server. When defined, clients MAY validate that
575 666 the x509 certificate on the target server was signed by a CA certificate in this set.
576 667
577 668 # TODO support for giving client an x509 certificate pair to be used as a
578 669 # client certificate.
579 670
580 671 # TODO support common authentication mechanisms (e.g. HTTP basic/digest
581 672 # auth).
582 673
583 674 # TODO support custom authentication mechanisms. This likely requires
584 675 # server to advertise required auth mechanism so client can filter.
585 676
586 677 # TODO support chained hashes. e.g. hash for each 1MB segment so client
587 678 # can iteratively validate data without having to consume all of it first.
588 679
589 680 TODO formalize when error frames can be seen and how errors can be
590 681 recognized midway through a command response.
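
A client might dispatch on the leading status map described in this section along these lines. The sketch assumes the map has already been decoded from CBOR into nested dictionaries with bytestring keys, including the keys of the ``error`` map. The ``classify_response`` name and its return shape are hypothetical.

```python
def classify_response(status_map):
    """Return (kind, detail) for the first CBOR value of a response."""
    status = status_map[b"status"]
    if status == b"ok":
        # The remaining CBOR values carry the command's response.
        return "ok", None
    if status == b"error":
        # Same atom format as the Human Output Side-Channel frame.
        return "error", status_map[b"error"][b"message"]
    if status == b"redirect":
        # The response must be fetched from the advertised URL.
        return "redirect", status_map[b"location"][b"url"]
    raise ValueError("unknown status: %r" % status)
```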
591 682
592 683 Content Redirects
593 684 =================
594 685
595 686 Servers have the ability to respond to ANY command request with a
596 687 *redirect* to another location. Such a response is referred to as a *redirect
597 688 response*. (This feature is conceptually similar to HTTP redirects, but is
598 689 more powerful.)
599 690
600 691 A *redirect response* MUST ONLY be issued if the client advertises support
601 692 for a redirect *target*.
602 693
603 694 A *redirect response* MUST NOT be issued unless the client advertises support
604 695 for one.
605 696
606 697 Clients advertise support for *redirect responses* after looking at the server's
607 698 *capabilities* data, which is fetched during initial server connection
608 699 handshake. The server's capabilities data advertises named *targets* for
609 700 potential redirects.
610 701
611 702 Each target is described by a protocol name, connection and protocol features,
612 703 etc. The server also advertises target-agnostic redirect settings, such as
613 704 which hash algorithms are supported for content integrity checking. (See
614 705 the documentation for the *capabilities* command for more.)
615 706
616 707 Clients examine the set of advertised redirect targets for compatibility.
617 708 When sending a command request, the client advertises the set of redirect
618 709 target names it is willing to follow, along with some other settings influencing
619 710 behavior.
620 711
621 712 For example, say the server is advertising a ``cdn`` redirect target that
622 713 requires SNI and TLS 1.2. If the client supports those features, it will
623 714 send command requests stating that the ``cdn`` target is acceptable to use.
624 715 But if the client doesn't support SNI or TLS 1.2 (or maybe it encountered an
625 716 error using this target from a previous request), then it omits this target
626 717 name.
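
The target selection described above can be sketched as follows. This is a minimal illustration, not the client's actual logic: the function name ``acceptable_targets``, the feature strings, the ``mirror`` target, and the dict layout are all hypothetical, and a real client would read the requirements from the server's *capabilities* data.

```python
# Hypothetical sketch: names, feature strings and layout are illustrative only.
def acceptable_targets(advertised, client_features):
    """Return the names of advertised redirect targets the client can use.

    ``advertised`` maps a target name to the set of features it requires.
    ``client_features`` is the set of features this client supports.
    """
    return sorted(
        name for name, required in advertised.items()
        # A target is usable only if every required feature is supported.
        if required <= client_features
    )


advertised = {
    'cdn': {'sni', 'tls1.2'},
    'mirror': {'tls1.3'},  # assumed second target, for contrast
}

# A client with SNI and TLS 1.2 lists ``cdn`` as acceptable.
print(acceptable_targets(advertised, {'sni', 'tls1.2'}))
# A client without SNI omits the target name entirely.
print(acceptable_targets(advertised, {'tls1.2'}))
```

A client could also drop a name from the result after an earlier failure against that target, which matches the "encountered an error" case mentioned above.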
627 718
628 719 If the client advertises support for a redirect target, the server MAY
629 720 replace the normal, inline response data with a *redirect response*:
630 721 one where the initial CBOR map has a ``status`` key with value ``redirect``.
631 722
632 723 The *redirect response* at a minimum advertises the URL where the response
633 724 can be retrieved.
634 725
635 726 The *redirect response* MAY also advertise additional details about that
636 727 content and how to retrieve it. Notably, the response may contain the
637 728 x509 public certificates for the server being redirected to or the
638 729 certificate authority that signed that server's certificate. Unless the
639 730 client has existing settings that offer stronger trust validation than what
640 731 the server advertises, the client SHOULD use the server-provided certificates
641 732 when validating the connection to the remote server in place of any default
642 733 connection verification checks. This is because certificates coming from
643 734 the server SHOULD establish a stronger chain of trust than what the default
644 735 certification validation mechanism in most environments provides. (By default,
645 736 certificate validation ensures the signer of the cert chains up to a set of
646 737 trusted root certificates. And if an explicit certificate or CA certificate
647 738 is presented, that greatly reduces the set of certificates that will be
648 739 recognized as valid, thus reducing the potential for a "bad" certificate
649 740 to be used and trusted.)
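
The shape of a *redirect response* map can be sketched as below, before CBOR encoding. This is an illustration under stated assumptions: the helper names ``build_redirect_response`` and ``is_redirect`` are hypothetical, the URL and media type are placeholders, and the set of optional keys is taken from the fields documented for the location (``size``, ``fullhashes``, ``fullhashseed``, ``serverdercerts``, ``servercadercerts``).

```python
# Hypothetical sketch of the redirect response map; not the server's code.
OPTIONAL_LOCATION_KEYS = (
    'size', 'fullhashes', 'fullhashseed',
    'serverdercerts', 'servercadercerts',
)


def build_redirect_response(url, mediatype, **optional):
    """Build the initial map of a redirect response as a plain dict."""
    location = {b'url': url, b'mediatype': mediatype}
    for key in OPTIONAL_LOCATION_KEYS:
        value = optional.get(key)
        # Optional details are only included when a value is known.
        if value is not None:
            location[key.encode('ascii')] = value
    return {b'status': b'redirect', b'location': location}


def is_redirect(response):
    """Return True if a decoded response map is a redirect response."""
    return response.get(b'status') == b'redirect'


# Placeholder URL and media type, chosen only for the example.
resp = build_redirect_response(
    b'https://cdn.example.com/data',
    b'application/octet-stream',
    size=1024,
)
print(is_redirect(resp))
print(sorted(resp[b'location']))
```

A client that sees ``servercadercerts`` or ``serverdercerts`` in the location would use those certificates when validating the connection, as described above.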
@@ -1,1388 +1,1407 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_STREAM_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53 54
54 55 FRAME_TYPES = {
55 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
61 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
62 64 }
63 65
64 66 FLAG_COMMAND_REQUEST_NEW = 0x01
65 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
66 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
67 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
68 70
69 71 FLAGS_COMMAND_REQUEST = {
70 72 b'new': FLAG_COMMAND_REQUEST_NEW,
71 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
72 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
73 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
74 76 }
75 77
76 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
77 79 FLAG_COMMAND_DATA_EOS = 0x02
78 80
79 81 FLAGS_COMMAND_DATA = {
80 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
81 83 b'eos': FLAG_COMMAND_DATA_EOS,
82 84 }
83 85
84 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
85 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
86 88
87 89 FLAGS_COMMAND_RESPONSE = {
88 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
89 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
90 92 }
91 93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
109
92 110 # Maps frame types to their available flags.
93 111 FRAME_TYPE_FLAGS = {
94 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
95 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
96 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
97 115 FRAME_TYPE_ERROR_RESPONSE: {},
98 116 FRAME_TYPE_TEXT_OUTPUT: {},
99 117 FRAME_TYPE_PROGRESS: {},
100 FRAME_TYPE_STREAM_SETTINGS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
101 120 }
102 121
103 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
104 123
105 124 def humanflags(mapping, value):
106 125 """Convert a numeric flags value to a human value, using a mapping table."""
107 126 namemap = {v: k for k, v in mapping.iteritems()}
108 127 flags = []
109 128 val = 1
110 129 while value >= val:
111 130 if value & val:
112 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
113 132 val <<= 1
114 133
115 134 return b'|'.join(flags)
116 135
117 136 @attr.s(slots=True)
118 137 class frameheader(object):
119 138 """Represents the data in a frame header."""
120 139
121 140 length = attr.ib()
122 141 requestid = attr.ib()
123 142 streamid = attr.ib()
124 143 streamflags = attr.ib()
125 144 typeid = attr.ib()
126 145 flags = attr.ib()
127 146
128 147 @attr.s(slots=True, repr=False)
129 148 class frame(object):
130 149 """Represents a parsed frame."""
131 150
132 151 requestid = attr.ib()
133 152 streamid = attr.ib()
134 153 streamflags = attr.ib()
135 154 typeid = attr.ib()
136 155 flags = attr.ib()
137 156 payload = attr.ib()
138 157
139 158 @encoding.strmethod
140 159 def __repr__(self):
141 160 typename = '<unknown 0x%02x>' % self.typeid
142 161 for name, value in FRAME_TYPES.iteritems():
143 162 if value == self.typeid:
144 163 typename = name
145 164 break
146 165
147 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
148 167 'type=%s; flags=%s)' % (
149 168 len(self.payload), self.requestid, self.streamid,
150 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
151 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
152 171
153 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
154 173 """Assemble a frame into a byte array."""
155 174 # TODO assert size of payload.
156 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
157 176
158 177 # 24 bits length
159 178 # 16 bits request id
160 179 # 8 bits stream id
161 180 # 8 bits stream flags
162 181 # 4 bits type
163 182 # 4 bits flags
164 183
165 184 l = struct.pack(r'<I', len(payload))
166 185 frame[0:3] = l[0:3]
167 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
168 187 frame[7] = (typeid << 4) | flags
169 188 frame[8:] = payload
170 189
171 190 return frame
172 191
173 192 def makeframefromhumanstring(s):
174 193 """Create a frame from a human readable string
175 194
176 195 Strings have the form:
177 196
178 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
179 198
180 199 This can be used by user-facing applications and tests for creating
181 200 frames easily without having to type out a bunch of constants.
182 201
183 202 Request ID and stream IDs are integers.
184 203
185 204 Stream flags, frame type, and flags can be specified by integer or
186 205 named constant.
187 206
188 207 Flags can be delimited by `|` to bitwise OR them together.
189 208
190 209 If the payload begins with ``cbor:``, the following string will be
191 210 evaluated as Python literal and the resulting object will be fed into
192 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
193 212 byte string literal.
194 213 """
195 214 fields = s.split(b' ', 5)
196 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
197 216
198 217 requestid = int(requestid)
199 218 streamid = int(streamid)
200 219
201 220 finalstreamflags = 0
202 221 for flag in streamflags.split(b'|'):
203 222 if flag in STREAM_FLAGS:
204 223 finalstreamflags |= STREAM_FLAGS[flag]
205 224 else:
206 225 finalstreamflags |= int(flag)
207 226
208 227 if frametype in FRAME_TYPES:
209 228 frametype = FRAME_TYPES[frametype]
210 229 else:
211 230 frametype = int(frametype)
212 231
213 232 finalflags = 0
214 233 validflags = FRAME_TYPE_FLAGS[frametype]
215 234 for flag in frameflags.split(b'|'):
216 235 if flag in validflags:
217 236 finalflags |= validflags[flag]
218 237 else:
219 238 finalflags |= int(flag)
220 239
221 240 if payload.startswith(b'cbor:'):
222 241 payload = b''.join(cborutil.streamencode(
223 242 stringutil.evalpythonliteral(payload[5:])))
224 243
225 244 else:
226 245 payload = stringutil.unescapestr(payload)
227 246
228 247 return makeframe(requestid=requestid, streamid=streamid,
229 248 streamflags=finalstreamflags, typeid=frametype,
230 249 flags=finalflags, payload=payload)
231 250
232 251 def parseheader(data):
233 252 """Parse a unified framing protocol frame header from a buffer.
234 253
235 254 The header is expected to be in the buffer at offset 0 and the
236 255 buffer is expected to be large enough to hold a full header.
237 256 """
238 257 # 24 bits payload length (little endian)
239 258 # 16 bits request ID
240 259 # 8 bits stream ID
241 260 # 8 bits stream flags
242 261 # 4 bits frame type
243 262 # 4 bits frame flags
244 263 # ... payload
245 264 framelength = data[0] + 256 * data[1] + 65536 * data[2]
246 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
247 266 typeflags = data[7]
248 267
249 268 frametype = (typeflags & 0xf0) >> 4
250 269 frameflags = typeflags & 0x0f
251 270
252 271 return frameheader(framelength, requestid, streamid, streamflags,
253 272 frametype, frameflags)
254 273
255 274 def readframe(fh):
256 275 """Read a unified framing protocol frame from a file object.
257 276
258 277 Returns a ``frame`` instance for the decoded frame or None if no
259 278 frame is available. May raise if a malformed frame is
260 279 seen.
261 280 """
262 281 header = bytearray(FRAME_HEADER_SIZE)
263 282
264 283 readcount = fh.readinto(header)
265 284
266 285 if readcount == 0:
267 286 return None
268 287
269 288 if readcount != FRAME_HEADER_SIZE:
270 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
271 290 (readcount, header))
272 291
273 292 h = parseheader(header)
274 293
275 294 payload = fh.read(h.length)
276 295 if len(payload) != h.length:
277 296 raise error.Abort(_('frame length error: expected %d; got %d') %
278 297 (h.length, len(payload)))
279 298
280 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
281 300 payload)
282 301
283 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
284 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
285 304 redirect=None):
286 305 """Create frames necessary to transmit a request to run a command.
287 306
288 307 This is a generator of bytearrays. Each item represents a frame
289 308 ready to be sent over the wire to a peer.
290 309 """
291 310 data = {b'name': cmd}
292 311 if args:
293 312 data[b'args'] = args
294 313
295 314 if redirect:
296 315 data[b'redirect'] = redirect
297 316
298 317 data = b''.join(cborutil.streamencode(data))
299 318
300 319 offset = 0
301 320
302 321 while True:
303 322 flags = 0
304 323
305 324 # Must set new or continuation flag.
306 325 if not offset:
307 326 flags |= FLAG_COMMAND_REQUEST_NEW
308 327 else:
309 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
310 329
311 330 # The expect-data flag is set on all frames.
312 331 if datafh:
313 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
314 333
315 334 payload = data[offset:offset + maxframesize]
316 335 offset += len(payload)
317 336
318 337 if len(payload) == maxframesize and offset < len(data):
319 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
320 339
321 340 yield stream.makeframe(requestid=requestid,
322 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
323 342 flags=flags,
324 343 payload=payload)
325 344
326 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
327 346 break
328 347
329 348 if datafh:
330 349 while True:
331 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
332 351
333 352 done = False
334 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
335 354 flags = FLAG_COMMAND_DATA_CONTINUATION
336 355 else:
337 356 flags = FLAG_COMMAND_DATA_EOS
338 357 assert datafh.read(1) == b''
339 358 done = True
340 359
341 360 yield stream.makeframe(requestid=requestid,
342 361 typeid=FRAME_TYPE_COMMAND_DATA,
343 362 flags=flags,
344 363 payload=data)
345 364
346 365 if done:
347 366 break
348 367
349 368 def createcommandresponseframesfrombytes(stream, requestid, data,
350 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
351 370 """Create raw frames to send a bytes response from static bytes input.
352 371
353 372 Returns a generator of bytearrays.
354 373 """
355 374 # Automatically send the overall CBOR response map.
356 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
357 376 if len(overall) > maxframesize:
358 377 raise error.ProgrammingError('not yet implemented')
359 378
360 379 # Simple case where we can fit the full response in a single frame.
361 380 if len(overall) + len(data) <= maxframesize:
362 381 flags = FLAG_COMMAND_RESPONSE_EOS
363 382 yield stream.makeframe(requestid=requestid,
364 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
365 384 flags=flags,
366 385 payload=overall + data)
367 386 return
368 387
369 388 # It's easier to send the overall CBOR map in its own frame than to track
370 389 # offsets.
371 390 yield stream.makeframe(requestid=requestid,
372 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
373 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
374 393 payload=overall)
375 394
376 395 offset = 0
377 396 while True:
378 397 chunk = data[offset:offset + maxframesize]
379 398 offset += len(chunk)
380 399 done = offset == len(data)
381 400
382 401 if done:
383 402 flags = FLAG_COMMAND_RESPONSE_EOS
384 403 else:
385 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
386 405
387 406 yield stream.makeframe(requestid=requestid,
388 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
389 408 flags=flags,
390 409 payload=chunk)
391 410
392 411 if done:
393 412 break
394 413
395 414 def createbytesresponseframesfromgen(stream, requestid, gen,
396 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
397 416 """Generator of frames from a generator of byte chunks.
398 417
399 418 This assumes that another frame will follow whatever this emits. i.e.
400 419 this always emits the continuation flag and never emits the end-of-stream
401 420 flag.
402 421 """
403 422 cb = util.chunkbuffer(gen)
404 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 424
406 425 while True:
407 426 chunk = cb.read(maxframesize)
408 427 if not chunk:
409 428 break
410 429
411 430 yield stream.makeframe(requestid=requestid,
412 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
413 432 flags=flags,
414 433 payload=chunk)
415 434
416 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
417 436
418 437 def createcommandresponseokframe(stream, requestid):
419 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
420 439
421 440 return stream.makeframe(requestid=requestid,
422 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
423 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
424 443 payload=overall)
425 444
426 445 def createcommandresponseeosframe(stream, requestid):
427 446 """Create an empty payload frame representing command end-of-stream."""
428 447 return stream.makeframe(requestid=requestid,
429 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
430 449 flags=FLAG_COMMAND_RESPONSE_EOS,
431 450 payload=b'')
432 451
433 452 def createalternatelocationresponseframe(stream, requestid, location):
434 453 data = {
435 454 b'status': b'redirect',
436 455 b'location': {
437 456 b'url': location.url,
438 457 b'mediatype': location.mediatype,
439 458 }
440 459 }
441 460
442 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
443 462 r'servercadercerts'):
444 463 value = getattr(location, a)
445 464 if value is not None:
446 465 data[b'location'][pycompat.bytestr(a)] = value
447 466
448 467 return stream.makeframe(requestid=requestid,
449 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
450 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
451 470 payload=b''.join(cborutil.streamencode(data)))
452 471
453 472 def createcommanderrorresponse(stream, requestid, message, args=None):
454 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
455 474 # formatting works consistently?
456 475 m = {
457 476 b'status': b'error',
458 477 b'error': {
459 478 b'message': message,
460 479 }
461 480 }
462 481
463 482 if args:
464 483 m[b'error'][b'args'] = args
465 484
466 485 overall = b''.join(cborutil.streamencode(m))
467 486
468 487 yield stream.makeframe(requestid=requestid,
469 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
470 489 flags=FLAG_COMMAND_RESPONSE_EOS,
471 490 payload=overall)
472 491
473 492 def createerrorframe(stream, requestid, msg, errtype):
474 493 # TODO properly handle frame size limits.
475 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
476 495
477 496 payload = b''.join(cborutil.streamencode({
478 497 b'type': errtype,
479 498 b'message': [{b'msg': msg}],
480 499 }))
481 500
482 501 yield stream.makeframe(requestid=requestid,
483 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
484 503 flags=0,
485 504 payload=payload)
486 505
487 506 def createtextoutputframe(stream, requestid, atoms,
488 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
489 508 """Create a text output frame to render text to people.
490 509
491 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
492 511
493 512 The formatting string contains ``%s`` tokens to be replaced by the
494 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
495 514 formatters to be applied at rendering time. In terms of the ``ui``
496 515 class, each atom corresponds to a ``ui.write()``.
497 516 """
498 517 atomdicts = []
499 518
500 519 for (formatting, args, labels) in atoms:
501 520 # TODO look for localstr, other types here?
502 521
503 522 if not isinstance(formatting, bytes):
504 523 raise ValueError('must use bytes formatting strings')
505 524 for arg in args:
506 525 if not isinstance(arg, bytes):
507 526 raise ValueError('must use bytes for arguments')
508 527 for label in labels:
509 528 if not isinstance(label, bytes):
510 529 raise ValueError('must use bytes for labels')
511 530
512 531 # Formatting string must be ASCII.
513 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
514 533
515 534 # Arguments must be UTF-8.
516 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
517 536
518 537 # Labels must be ASCII.
519 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
520 539 for l in labels]
521 540
522 541 atom = {b'msg': formatting}
523 542 if args:
524 543 atom[b'args'] = args
525 544 if labels:
526 545 atom[b'labels'] = labels
527 546
528 547 atomdicts.append(atom)
529 548
530 549 payload = b''.join(cborutil.streamencode(atomdicts))
531 550
532 551 if len(payload) > maxframesize:
533 552 raise ValueError('cannot encode data in a single frame')
534 553
535 554 yield stream.makeframe(requestid=requestid,
536 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
537 556 flags=0,
538 557 payload=payload)
539 558
540 559 class bufferingcommandresponseemitter(object):
541 560 """Helper object to emit command response frames intelligently.
542 561
543 562 Raw command response data is likely emitted in chunks much smaller
544 563 than what can fit in a single frame. This class exists to buffer
545 564 chunks until enough data is available to fit in a single frame.
546 565
547 566 TODO we'll need something like this when compression is supported.
548 567 So it might make sense to implement this functionality at the stream
549 568 level.
550 569 """
551 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
552 571 self._stream = stream
553 572 self._requestid = requestid
554 573 self._maxsize = maxframesize
555 574 self._chunks = []
556 575 self._chunkssize = 0
557 576
558 577 def send(self, data):
559 578 """Send new data for emission.
560 579
561 580 Is a generator of new frames that were derived from the new input.
562 581
563 582 If the special input ``None`` is received, flushes all buffered
564 583 data to frames.
565 584 """
566 585
567 586 if data is None:
568 587 for frame in self._flush():
569 588 yield frame
570 589 return
571 590
572 591 # There is a ton of potential to do more complicated things here.
573 592 # Our immediate goal is to coalesce small chunks into big frames,
574 593 # not achieve the fewest number of frames possible. So we go with
575 594 # a simple implementation:
576 595 #
577 596 # * If a chunk is too large for a frame, we flush and emit frames
578 597 # for the new chunk.
579 598 # * If a chunk can be buffered without total buffered size limits
580 599 # being exceeded, we do that.
581 600 # * If a chunk causes us to go over our buffering limit, we flush
582 601 # and then buffer the new chunk.
583 602
584 603 if len(data) > self._maxsize:
585 604 for frame in self._flush():
586 605 yield frame
587 606
588 607 # Now emit frames for the big chunk.
589 608 offset = 0
590 609 while True:
591 610 chunk = data[offset:offset + self._maxsize]
592 611 offset += len(chunk)
593 612
594 613 yield self._stream.makeframe(
595 614 self._requestid,
596 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
597 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
598 617 payload=chunk)
599 618
600 619 if offset == len(data):
601 620 return
602 621
603 622 # If we don't have enough to constitute a full frame, buffer and
604 623 # return.
605 624 if len(data) + self._chunkssize < self._maxsize:
606 625 self._chunks.append(data)
607 626 self._chunkssize += len(data)
608 627 return
609 628
610 629 # Else flush what we have and buffer the new chunk. We could do
611 630 # something more intelligent here, like break the chunk. Let's
612 631 # keep things simple for now.
613 632 for frame in self._flush():
614 633 yield frame
615 634
616 635 self._chunks.append(data)
617 636 self._chunkssize = len(data)
618 637
619 638 def _flush(self):
620 639 payload = b''.join(self._chunks)
621 640 assert len(payload) <= self._maxsize
622 641
623 642 self._chunks[:] = []
624 643 self._chunkssize = 0
625 644
626 645 yield self._stream.makeframe(
627 646 self._requestid,
628 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
629 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
630 649 payload=payload)
631 650
632 651 class stream(object):
633 652 """Represents a logical unidirectional series of frames."""
634 653
635 654 def __init__(self, streamid, active=False):
636 655 self.streamid = streamid
637 656 self._active = active
638 657
639 658 def makeframe(self, requestid, typeid, flags, payload):
640 659 """Create a frame to be sent out over this stream.
641 660
642 661 Only returns the frame instance. Does not actually send it.
643 662 """
644 663 streamflags = 0
645 664 if not self._active:
646 665 streamflags |= STREAM_FLAG_BEGIN_STREAM
647 666 self._active = True
648 667
649 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
650 669 payload)
651 670
652 671 def ensureserverstream(stream):
653 672 if stream.streamid % 2:
654 673 raise error.ProgrammingError('server should only write to even '
655 674 'numbered streams; %d is not even' %
656 675 stream.streamid)
657 676
658 677 class serverreactor(object):
659 678 """Holds state of a server handling frame-based protocol requests.
660 679
661 680 This class is the "brain" of the unified frame-based protocol server
662 681 component. While the protocol is stateless from the perspective of
663 682 requests/commands, something needs to track which frames have been
664 683 received, what frames to expect, etc. This class is that thing.
665 684
666 685 Instances are modeled as a state machine of sorts. Instances are also
667 686 reactive to external events. The point of this class is to encapsulate
668 687 the state of the connection and the exchange of frames, not to perform
669 688 work. Instead, callers tell this class when something occurs, like a
670 689 frame arriving. If that activity is worthy of a follow-up action (say
671 690 *run a command*), the return value of that handler will say so.
672 691
673 692 I/O and CPU intensive operations are purposefully delegated outside of
674 693 this class.
675 694
676 695 Consumers are expected to tell instances when events occur. They do so by
677 696 calling the various ``on*`` methods. These methods return a 2-tuple
678 697 describing any follow-up action(s) to take. The first element is the
679 698 name of an action to perform. The second is a data structure (usually
680 699 a dict) specific to that action that contains more information. e.g.
681 700 if the server wants to send frames back to the client, the data structure
682 701 will contain a reference to those frames.
683 702
684 703 Valid actions that consumers can be instructed to take are:
685 704
686 705 sendframes
687 706 Indicates that frames should be sent to the client. The ``framegen``
688 707 key contains a generator of frames that should be sent. The server
689 708 assumes that all frames are sent to the client.
690 709
691 710 error
692 711 Indicates that an error occurred. Consumer should probably abort.
693 712
694 713 runcommand
695 714 Indicates that the consumer should run a wire protocol command. Details
696 715 of the command to run are given in the data structure.
697 716
698 717 wantframe
699 718 Indicates that nothing of interest happened and the server is waiting on
700 719 more frames from the client before anything interesting can be done.
701 720
702 721 noop
703 722 Indicates no additional action is required.
704 723
705 724 Known Issues
706 725 ------------
707 726
708 727 There are no limits to the number of partially received commands or their
709 728 size. A malicious client could stream command request data and exhaust the
710 729 server's memory.
711 730
712 731 Partially received commands are not acted upon when end of input is
713 732 reached. Should the server error if it receives a partial request?
714 733 Should the client send a message to abort a partially transmitted request
715 734 to facilitate graceful shutdown?
716 735
717 736 Active requests that haven't been responded to aren't tracked. This means
718 737 that if we receive a command and instruct its dispatch, another command
719 738 with its request ID can come in over the wire and there will be a race
720 739 between who responds to what.
721 740 """
722 741
723 742 def __init__(self, deferoutput=False):
724 743 """Construct a new server reactor.
725 744
726 745 ``deferoutput`` can be used to indicate that no output frames should be
727 746 instructed to be sent until input has been exhausted. In this mode,
728 747 events that would normally generate output frames (such as a command
729 748 response being ready) will instead defer instructing the consumer to
730 749 send those frames. This is useful for half-duplex transports where the
731 750 sender cannot receive until all data has been transmitted.
732 751 """
733 752 self._deferoutput = deferoutput
734 753 self._state = 'idle'
735 754 self._nextoutgoingstreamid = 2
736 755 self._bufferedframegens = []
737 756 # stream id -> stream instance for all active streams from the client.
738 757 self._incomingstreams = {}
739 758 self._outgoingstreams = {}
740 759 # request id -> dict of commands that are actively being received.
741 760 self._receivingcommands = {}
742 761 # Request IDs that have been received and are actively being processed.
743 762 # Once all output for a request has been sent, it is removed from this
744 763 # set.
745 764 self._activecommands = set()
746 765
747 766 def onframerecv(self, frame):
748 767 """Process a frame that has been received off the wire.
749 768
750 769 Returns a 2-tuple of (action, data) that details what action,
751 770 if any, the consumer should take next.
752 771 """
753 772 if not frame.streamid % 2:
754 773 self._state = 'errored'
755 774 return self._makeerrorresult(
756 775 _('received frame with even numbered stream ID: %d') %
757 776 frame.streamid)
758 777
759 778 if frame.streamid not in self._incomingstreams:
760 779 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
761 780 self._state = 'errored'
762 781 return self._makeerrorresult(
763 782 _('received frame on unknown inactive stream without '
764 783 'beginning of stream flag set'))
765 784
766 785 self._incomingstreams[frame.streamid] = stream(frame.streamid)
767 786
768 787 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
769 788 # TODO handle decoding frames
770 789 self._state = 'errored'
771 790 raise error.ProgrammingError('support for decoding stream payloads '
772 791 'not yet implemented')
773 792
774 793 if frame.streamflags & STREAM_FLAG_END_STREAM:
775 794 del self._incomingstreams[frame.streamid]
776 795
777 796 handlers = {
778 797 'idle': self._onframeidle,
779 798 'command-receiving': self._onframecommandreceiving,
780 799 'errored': self._onframeerrored,
781 800 }
782 801
783 802 meth = handlers.get(self._state)
784 803 if not meth:
785 804 raise error.ProgrammingError('unhandled state: %s' % self._state)
786 805
787 806 return meth(frame)
788 807
789 808 def oncommandresponseready(self, stream, requestid, data):
790 809 """Signal that a bytes response is ready to be sent to the client.
791 810
792 811 The raw bytes response is passed as an argument.
793 812 """
794 813 ensureserverstream(stream)
795 814
796 815 def sendframes():
797 816 for frame in createcommandresponseframesfrombytes(stream, requestid,
798 817 data):
799 818 yield frame
800 819
801 820 self._activecommands.remove(requestid)
802 821
803 822 result = sendframes()
804 823
805 824 if self._deferoutput:
806 825 self._bufferedframegens.append(result)
807 826 return 'noop', {}
808 827 else:
809 828 return 'sendframes', {
810 829 'framegen': result,
811 830 }
812 831
813 832 def oncommandresponsereadyobjects(self, stream, requestid, objs):
814 833 """Signal that objects are ready to be sent to the client.
815 834
816 835 ``objs`` is an iterable of objects (typically a generator) that will
817 836 be encoded via CBOR and added to frames, which will be sent to the
818 837 client.
819 838 """
820 839 ensureserverstream(stream)
821 840
822 841 # We need to take care over exception handling. Uncaught exceptions
823 842 # when generating frames could lead to premature end of the frame
824 843 # stream and the possibility of the server or client process getting
825 844 # in a bad state.
826 845 #
827 846 # Keep in mind that if ``objs`` is a generator, advancing it could
828 847 # raise exceptions that originated in e.g. wire protocol command
829 848 # functions. That is why we differentiate between exceptions raised
830 849 # when iterating versus other exceptions that occur.
831 850 #
832 851 # In all cases, when the function finishes, the request is fully
833 852 # handled and no new frames for it should be seen.
834 853
835 854 def sendframes():
836 855 emitted = False
837 856 alternatelocationsent = False
838 857 emitter = bufferingcommandresponseemitter(stream, requestid)
839 858 while True:
840 859 try:
841 860 o = next(objs)
842 861 except StopIteration:
843 862 for frame in emitter.send(None):
844 863 yield frame
845 864
846 865 if emitted:
847 866 yield createcommandresponseeosframe(stream, requestid)
848 867 break
849 868
850 869 except error.WireprotoCommandError as e:
851 870 for frame in createcommanderrorresponse(
852 871 stream, requestid, e.message, e.messageargs):
853 872 yield frame
854 873 break
855 874
856 875 except Exception as e:
857 876 for frame in createerrorframe(
858 877 stream, requestid, '%s' % stringutil.forcebytestr(e),
859 878 errtype='server'):
860 879
861 880 yield frame
862 881
863 882 break
864 883
865 884 try:
866 885 # Alternate location responses can only be the first and
867 886 # only object in the output stream.
868 887 if isinstance(o, wireprototypes.alternatelocationresponse):
869 888 if emitted:
870 889 raise error.ProgrammingError(
871 890 'alternatelocationresponse seen after initial '
872 891 'output object')
873 892
874 893 yield createalternatelocationresponseframe(
875 894 stream, requestid, o)
876 895
877 896 alternatelocationsent = True
878 897 emitted = True
879 898 continue
880 899
881 900 if alternatelocationsent:
882 901 raise error.ProgrammingError(
883 902 'object follows alternatelocationresponse')
884 903
885 904 if not emitted:
886 905 yield createcommandresponseokframe(stream, requestid)
887 906 emitted = True
888 907
889 908 # Objects emitted by command functions can be serializable
890 909 # data structures or special types.
891 910 # TODO consider extracting the content normalization to a
892 911 # standalone function, as it may be useful for e.g. cachers.
893 912
894 913 # A pre-encoded object is sent directly to the emitter.
895 914 if isinstance(o, wireprototypes.encodedresponse):
896 915 for frame in emitter.send(o.data):
897 916 yield frame
898 917
899 918 # A regular object is CBOR encoded.
900 919 else:
901 920 for chunk in cborutil.streamencode(o):
902 921 for frame in emitter.send(chunk):
903 922 yield frame
904 923
905 924 except Exception as e:
906 925 for frame in createerrorframe(stream, requestid,
907 926 '%s' % stringutil.forcebytestr(e),
908 927 errtype='server'):
909 928 yield frame
910 929
911 930 break
912 931
913 932 self._activecommands.remove(requestid)
914 933
915 934 return self._handlesendframes(sendframes())
916 935
917 936 def oninputeof(self):
918 937 """Signals that end of input has been received.
919 938
920 939 No more frames will be received. All pending activity should be
921 940 completed.
922 941 """
923 942 # TODO should we do anything about in-flight commands?
924 943
925 944 if not self._deferoutput or not self._bufferedframegens:
926 945 return 'noop', {}
927 946
928 947 # If we buffered all our responses, emit those.
929 948 def makegen():
930 949 for gen in self._bufferedframegens:
931 950 for frame in gen:
932 951 yield frame
933 952
934 953 return 'sendframes', {
935 954 'framegen': makegen(),
936 955 }
937 956
938 957 def _handlesendframes(self, framegen):
939 958 if self._deferoutput:
940 959 self._bufferedframegens.append(framegen)
941 960 return 'noop', {}
942 961 else:
943 962 return 'sendframes', {
944 963 'framegen': framegen,
945 964 }
946 965
947 966 def onservererror(self, stream, requestid, msg):
948 967 ensureserverstream(stream)
949 968
950 969 def sendframes():
951 970 for frame in createerrorframe(stream, requestid, msg,
952 971 errtype='server'):
953 972 yield frame
954 973
955 974 self._activecommands.remove(requestid)
956 975
957 976 return self._handlesendframes(sendframes())
958 977
959 978 def oncommanderror(self, stream, requestid, message, args=None):
960 979 """Called when a command encountered an error before sending output."""
961 980 ensureserverstream(stream)
962 981
963 982 def sendframes():
964 983 for frame in createcommanderrorresponse(stream, requestid, message,
965 984 args):
966 985 yield frame
967 986
968 987 self._activecommands.remove(requestid)
969 988
970 989 return self._handlesendframes(sendframes())
971 990
972 991 def makeoutputstream(self):
973 992 """Create a stream to be used for sending data to the client."""
974 993 streamid = self._nextoutgoingstreamid
975 994 self._nextoutgoingstreamid += 2
976 995
977 996 s = stream(streamid)
978 997 self._outgoingstreams[streamid] = s
979 998
980 999 return s
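The parity checks in the two ``onframerecv`` methods, together with the ``+= 2`` step above, imply that each peer allocates stream IDs from its own parity class: the client uses odd IDs and the server uses even ones. A minimal sketch of that scheme follows. ``StreamIdAllocator`` and ``is_client_stream`` are hypothetical names, and the server's starting value of 2 is an assumption, since the initializer is not shown here.

```python
class StreamIdAllocator:
    """Hands out stream IDs that keep a fixed parity (hypothetical helper)."""

    def __init__(self, first_id):
        self._next = first_id

    def allocate(self):
        streamid = self._next
        # Stepping by 2 keeps every ID in the same parity class.
        self._next += 2
        return streamid


def is_client_stream(streamid):
    # Odd IDs are treated as client-initiated, matching the receive checks.
    return streamid % 2 == 1


# Assumption: the server starts at 2; the client code uses stream 1.
server = StreamIdAllocator(2)
server_ids = [server.allocate() for _ in range(3)]
```

Because the two ranges never overlap, a receiver can reject a frame from the wrong side by testing only the low bit of the stream ID.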
981 1000
982 1001 def _makeerrorresult(self, msg):
983 1002 return 'error', {
984 1003 'message': msg,
985 1004 }
986 1005
987 1006 def _makeruncommandresult(self, requestid):
988 1007 entry = self._receivingcommands[requestid]
989 1008
990 1009 if not entry['requestdone']:
991 1010 self._state = 'errored'
992 1011 raise error.ProgrammingError('should not be called without '
993 1012 'requestdone set')
994 1013
995 1014 del self._receivingcommands[requestid]
996 1015
997 1016 if self._receivingcommands:
998 1017 self._state = 'command-receiving'
999 1018 else:
1000 1019 self._state = 'idle'
1001 1020
1002 1021 # Decode the payloads as CBOR.
1003 1022 entry['payload'].seek(0)
1004 1023 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1005 1024
1006 1025 if b'name' not in request:
1007 1026 self._state = 'errored'
1008 1027 return self._makeerrorresult(
1009 1028 _('command request missing "name" field'))
1010 1029
1011 1030 if b'args' not in request:
1012 1031 request[b'args'] = {}
1013 1032
1014 1033 assert requestid not in self._activecommands
1015 1034 self._activecommands.add(requestid)
1016 1035
1017 1036 return 'runcommand', {
1018 1037 'requestid': requestid,
1019 1038 'command': request[b'name'],
1020 1039 'args': request[b'args'],
1021 1040 'redirect': request.get(b'redirect'),
1022 1041 'data': entry['data'].getvalue() if entry['data'] else None,
1023 1042 }
1024 1043
1025 1044 def _makewantframeresult(self):
1026 1045 return 'wantframe', {
1027 1046 'state': self._state,
1028 1047 }
1029 1048
1030 1049 def _validatecommandrequestframe(self, frame):
1031 1050 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1032 1051 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1033 1052
1034 1053 if new and continuation:
1035 1054 self._state = 'errored'
1036 1055 return self._makeerrorresult(
1037 1056 _('received command request frame with both new and '
1038 1057 'continuation flags set'))
1039 1058
1040 1059 if not new and not continuation:
1041 1060 self._state = 'errored'
1042 1061 return self._makeerrorresult(
1043 1062 _('received command request frame with neither new nor '
1044 1063 'continuation flags set'))
1045 1064
1046 1065 def _onframeidle(self, frame):
1047 1066 # The only frame type that should be received in this state is a
1048 1067 # command request.
1049 1068 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1050 1069 self._state = 'errored'
1051 1070 return self._makeerrorresult(
1052 1071 _('expected command request frame; got %d') % frame.typeid)
1053 1072
1054 1073 res = self._validatecommandrequestframe(frame)
1055 1074 if res:
1056 1075 return res
1057 1076
1058 1077 if frame.requestid in self._receivingcommands:
1059 1078 self._state = 'errored'
1060 1079 return self._makeerrorresult(
1061 1080 _('request with ID %d already received') % frame.requestid)
1062 1081
1063 1082 if frame.requestid in self._activecommands:
1064 1083 self._state = 'errored'
1065 1084 return self._makeerrorresult(
1066 1085 _('request with ID %d is already active') % frame.requestid)
1067 1086
1068 1087 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1069 1088 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1070 1089 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1071 1090
1072 1091 if not new:
1073 1092 self._state = 'errored'
1074 1093 return self._makeerrorresult(
1075 1094 _('received command request frame without new flag set'))
1076 1095
1077 1096 payload = util.bytesio()
1078 1097 payload.write(frame.payload)
1079 1098
1080 1099 self._receivingcommands[frame.requestid] = {
1081 1100 'payload': payload,
1082 1101 'data': None,
1083 1102 'requestdone': not moreframes,
1084 1103 'expectingdata': bool(expectingdata),
1085 1104 }
1086 1105
1087 1106 # This is the final frame for this request. Dispatch it.
1088 1107 if not moreframes and not expectingdata:
1089 1108 return self._makeruncommandresult(frame.requestid)
1090 1109
1091 1110 assert moreframes or expectingdata
1092 1111 self._state = 'command-receiving'
1093 1112 return self._makewantframeresult()
1094 1113
1095 1114 def _onframecommandreceiving(self, frame):
1096 1115 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1097 1116 # Process new command requests as such.
1098 1117 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1099 1118 return self._onframeidle(frame)
1100 1119
1101 1120 res = self._validatecommandrequestframe(frame)
1102 1121 if res:
1103 1122 return res
1104 1123
1105 1124 # All other frames should be related to a command that is currently
1106 1125 # receiving but is not active.
1107 1126 if frame.requestid in self._activecommands:
1108 1127 self._state = 'errored'
1109 1128 return self._makeerrorresult(
1110 1129 _('received frame for request that is still active: %d') %
1111 1130 frame.requestid)
1112 1131
1113 1132 if frame.requestid not in self._receivingcommands:
1114 1133 self._state = 'errored'
1115 1134 return self._makeerrorresult(
1116 1135 _('received frame for request that is not receiving: %d') %
1117 1136 frame.requestid)
1118 1137
1119 1138 entry = self._receivingcommands[frame.requestid]
1120 1139
1121 1140 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1122 1141 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1123 1142 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1124 1143
1125 1144 if entry['requestdone']:
1126 1145 self._state = 'errored'
1127 1146 return self._makeerrorresult(
1128 1147 _('received command request frame when request frames '
1129 1148 'were supposedly done'))
1130 1149
1131 1150 if expectingdata != entry['expectingdata']:
1132 1151 self._state = 'errored'
1133 1152 return self._makeerrorresult(
1134 1153 _('mismatch between expect data flag and previous frame'))
1135 1154
1136 1155 entry['payload'].write(frame.payload)
1137 1156
1138 1157 if not moreframes:
1139 1158 entry['requestdone'] = True
1140 1159
1141 1160 if not moreframes and not expectingdata:
1142 1161 return self._makeruncommandresult(frame.requestid)
1143 1162
1144 1163 return self._makewantframeresult()
1145 1164
1146 1165 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1147 1166 if not entry['expectingdata']:
1148 1167 self._state = 'errored'
1149 1168 return self._makeerrorresult(_(
1150 1169 'received command data frame for request that is not '
1151 1170 'expecting data: %d') % frame.requestid)
1152 1171
1153 1172 if entry['data'] is None:
1154 1173 entry['data'] = util.bytesio()
1155 1174
1156 1175 return self._handlecommanddataframe(frame, entry)
1157 1176 else:
1158 1177 self._state = 'errored'
1159 1178 return self._makeerrorresult(_(
1160 1179 'received unexpected frame type: %d') % frame.typeid)
1161 1180
1162 1181 def _handlecommanddataframe(self, frame, entry):
1163 1182 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1164 1183
1165 1184 # TODO support streaming data instead of buffering it.
1166 1185 entry['data'].write(frame.payload)
1167 1186
1168 1187 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1169 1188 return self._makewantframeresult()
1170 1189 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1171 1190 entry['data'].seek(0)
1172 1191 return self._makeruncommandresult(frame.requestid)
1173 1192 else:
1174 1193 self._state = 'errored'
1175 1194 return self._makeerrorresult(_('command data frame without '
1176 1195 'flags'))
1177 1196
1178 1197 def _onframeerrored(self, frame):
1179 1198 return self._makeerrorresult(_('server already errored'))
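The flag handling spread across ``_validatecommandrequestframe``, ``_onframeidle`` and ``_onframecommandreceiving`` can be condensed into one decision function. The sketch below is a simplification: it ignores request-ID bookkeeping and the rule that an idle server only accepts frames with the new flag. The numeric flag values are assumptions, because the real constants are defined outside this hunk.

```python
# Assumed flag values for illustration only.
FLAG_NEW = 0x01
FLAG_CONTINUATION = 0x02
FLAG_MORE_FRAMES = 0x04
FLAG_EXPECT_DATA = 0x08


def classify_request_frame(flags):
    """Return the action a receiver would take for a command request frame."""
    new = bool(flags & FLAG_NEW)
    continuation = bool(flags & FLAG_CONTINUATION)

    # Exactly one of "new" and "continuation" must be set.
    if new == continuation:
        return 'error'

    moreframes = flags & FLAG_MORE_FRAMES
    expectingdata = flags & FLAG_EXPECT_DATA

    # The request is complete only when no further request frames and no
    # command data frames are expected.
    if not moreframes and not expectingdata:
        return 'runcommand'

    return 'wantframe'
```

For example, ``classify_request_frame(FLAG_NEW)`` yields ``'runcommand'``, while ``classify_request_frame(FLAG_NEW | FLAG_MORE_FRAMES)`` yields ``'wantframe'``.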
1180 1199
1181 1200 class commandrequest(object):
1182 1201 """Represents a request to run a command."""
1183 1202
1184 1203 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1185 1204 self.requestid = requestid
1186 1205 self.name = name
1187 1206 self.args = args
1188 1207 self.datafh = datafh
1189 1208 self.redirect = redirect
1190 1209 self.state = 'pending'
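The ``state`` attribute moves through several values as the client reactor processes a request: ``pending``, ``sending``, ``sent``, ``receiving``, then ``received`` or ``errored``. The reactor assigns these strings directly and does not validate the order. The sketch below shows one way the lifecycle could be checked. The ``TRANSITIONS`` table and ``advance`` function are hypothetical, inferred from the assignments in ``clientreactor``, not part of the module.

```python
# Assumed transition table, inferred from where clientreactor sets each state.
TRANSITIONS = {
    'pending': {'sending'},
    # A response frame may arrive while request frames are still being sent.
    'sending': {'sent', 'receiving'},
    'sent': {'receiving'},
    'receiving': {'receiving', 'received', 'errored'},
}


def advance(current, new):
    """Return ``new`` if it is a permitted successor of ``current``."""
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError('invalid transition: %s -> %s' % (current, new))
    return new


state = 'pending'
for step in ('sending', 'sent', 'receiving', 'received'):
    state = advance(state, step)
```

``received`` and ``errored`` have no entry in the table, so both are terminal in this sketch.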
1191 1210
1192 1211 class clientreactor(object):
1193 1212 """Holds state of a client issuing frame-based protocol requests.
1194 1213
1195 1214 This is like ``serverreactor`` but for client-side state.
1196 1215
1197 1216 Each instance is bound to the lifetime of a connection. For persistent
1198 1217 connection transports using e.g. TCP sockets and speaking the raw
1199 1218 framing protocol, there will be a single instance for the lifetime of
1200 1219 the TCP socket. For transports where there are multiple discrete
1201 1220 interactions (say tunneled within an HTTP request), there will be a
1202 1221 separate instance for each distinct interaction.
1203 1222 """
1204 1223 def __init__(self, hasmultiplesend=False, buffersends=True):
1205 1224 """Create a new instance.
1206 1225
1207 1226 ``hasmultiplesend`` indicates whether multiple sends are supported
1208 1227 by the transport. When True, it is possible to send commands immediately
1209 1228 instead of buffering until the caller signals an intent to finish a
1210 1229 send operation.
1211 1230
1212 1231 ``buffersends`` indicates whether sends should be buffered until the
1213 1232 last request has been issued.
1214 1233 """
1215 1234 self._hasmultiplesend = hasmultiplesend
1216 1235 self._buffersends = buffersends
1217 1236
1218 1237 self._canissuecommands = True
1219 1238 self._cansend = True
1220 1239
1221 1240 self._nextrequestid = 1
1222 1241 # We only support a single outgoing stream for now.
1223 1242 self._outgoingstream = stream(1)
1224 1243 self._pendingrequests = collections.deque()
1225 1244 self._activerequests = {}
1226 1245 self._incomingstreams = {}
1227 1246
1228 1247 def callcommand(self, name, args, datafh=None, redirect=None):
1229 1248 """Request that a command be executed.
1230 1249
1231 1250 Receives the command name, a dict of arguments to pass to the command,
1232 1251 and an optional file object containing the raw data for the command.
1233 1252
1234 1253 Returns a 3-tuple of (request, action, action data).
1235 1254 """
1236 1255 if not self._canissuecommands:
1237 1256 raise error.ProgrammingError('cannot issue new commands')
1238 1257
1239 1258 requestid = self._nextrequestid
1240 1259 self._nextrequestid += 2
1241 1260
1242 1261 request = commandrequest(requestid, name, args, datafh=datafh,
1243 1262 redirect=redirect)
1244 1263
1245 1264 if self._buffersends:
1246 1265 self._pendingrequests.append(request)
1247 1266 return request, 'noop', {}
1248 1267 else:
1249 1268 if not self._cansend:
1250 1269 raise error.ProgrammingError('sends cannot be performed on '
1251 1270 'this instance')
1252 1271
1253 1272 if not self._hasmultiplesend:
1254 1273 self._cansend = False
1255 1274 self._canissuecommands = False
1256 1275
1257 1276 return request, 'sendframes', {
1258 1277 'framegen': self._makecommandframes(request),
1259 1278 }
1260 1279
1261 1280 def flushcommands(self):
1262 1281 """Request that all queued commands be sent.
1263 1282
1264 1283 If any commands are buffered, this will instruct the caller to send
1265 1284 them over the wire. If no commands are buffered it instructs the client
1266 1285 to no-op.
1267 1286
1268 1287 If instances aren't configured for multiple sends, no new command
1269 1288 requests are allowed after this is called.
1270 1289 """
1271 1290 if not self._pendingrequests:
1272 1291 return 'noop', {}
1273 1292
1274 1293 if not self._cansend:
1275 1294 raise error.ProgrammingError('sends cannot be performed on this '
1276 1295 'instance')
1277 1296
1278 1297 # If the instance only allows sending once, mark that we have fired
1279 1298 # our one shot.
1280 1299 if not self._hasmultiplesend:
1281 1300 self._canissuecommands = False
1282 1301 self._cansend = False
1283 1302
1284 1303 def makeframes():
1285 1304 while self._pendingrequests:
1286 1305 request = self._pendingrequests.popleft()
1287 1306 for frame in self._makecommandframes(request):
1288 1307 yield frame
1289 1308
1290 1309 return 'sendframes', {
1291 1310 'framegen': makeframes(),
1292 1311 }
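The buffering behaviour of ``callcommand`` and ``flushcommands`` can be illustrated in isolation: queued requests produce a ``noop`` action until a flush hands the caller a lazy frame generator. This is a minimal sketch, with ``BufferedSender`` and the ``('frame-for', name)`` tuples as hypothetical stand-ins for the reactor and real frames.

```python
import collections


class BufferedSender:
    """Queues command names and emits them on flush (hypothetical helper)."""

    def __init__(self):
        self._pending = collections.deque()

    def call(self, name):
        # Buffered mode: record the request and tell the caller to do nothing.
        self._pending.append(name)
        return 'noop', {}

    def flush(self):
        if not self._pending:
            return 'noop', {}

        def makeframes():
            # The queue is drained lazily, as the caller consumes frames.
            while self._pending:
                name = self._pending.popleft()
                yield ('frame-for', name)

        return 'sendframes', {'framegen': makeframes()}


sender = BufferedSender()
sender.call('heads')
sender.call('known')
action, meta = sender.flush()
frames = list(meta['framegen'])
```

Since the generator drains the queue only while it is iterated, the pending requests remain queued until the caller actually consumes ``framegen``.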
1293 1312
1294 1313 def _makecommandframes(self, request):
1295 1314 """Emit frames to issue a command request.
1296 1315
1297 1316 As a side-effect, update request accounting to reflect its changed
1298 1317 state.
1299 1318 """
1300 1319 self._activerequests[request.requestid] = request
1301 1320 request.state = 'sending'
1302 1321
1303 1322 res = createcommandframes(self._outgoingstream,
1304 1323 request.requestid,
1305 1324 request.name,
1306 1325 request.args,
1307 1326 datafh=request.datafh,
1308 1327 redirect=request.redirect)
1309 1328
1310 1329 for frame in res:
1311 1330 yield frame
1312 1331
1313 1332 request.state = 'sent'
1314 1333
1315 1334 def onframerecv(self, frame):
1316 1335 """Process a frame that has been received off the wire.
1317 1336
1318 1337 Returns a 2-tuple of (action, meta) describing further action the
1319 1338 caller needs to take as a result of receiving this frame.
1320 1339 """
1321 1340 if frame.streamid % 2:
1322 1341 return 'error', {
1323 1342 'message': (
1324 1343 _('received frame with odd numbered stream ID: %d') %
1325 1344 frame.streamid),
1326 1345 }
1327 1346
1328 1347 if frame.streamid not in self._incomingstreams:
1329 1348 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1330 1349 return 'error', {
1331 1350 'message': _('received frame on unknown stream '
1332 1351 'without beginning of stream flag set'),
1333 1352 }
1334 1353
1335 1354 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1336 1355
1337 1356 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1338 1357 raise error.ProgrammingError('support for decoding stream '
1339 1358 'payloads not yet implemented')
1340 1359
1341 1360 if frame.streamflags & STREAM_FLAG_END_STREAM:
1342 1361 del self._incomingstreams[frame.streamid]
1343 1362
1344 1363 if frame.requestid not in self._activerequests:
1345 1364 return 'error', {
1346 1365 'message': (_('received frame for inactive request ID: %d') %
1347 1366 frame.requestid),
1348 1367 }
1349 1368
1350 1369 request = self._activerequests[frame.requestid]
1351 1370 request.state = 'receiving'
1352 1371
1353 1372 handlers = {
1354 1373 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1355 1374 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1356 1375 }
1357 1376
1358 1377 meth = handlers.get(frame.typeid)
1359 1378 if not meth:
1360 1379 raise error.ProgrammingError('unhandled frame type: %d' %
1361 1380 frame.typeid)
1362 1381
1363 1382 return meth(request, frame)
1364 1383
1365 1384 def _oncommandresponseframe(self, request, frame):
1366 1385 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1367 1386 request.state = 'received'
1368 1387 del self._activerequests[request.requestid]
1369 1388
1370 1389 return 'responsedata', {
1371 1390 'request': request,
1372 1391 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1373 1392 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1374 1393 'data': frame.payload,
1375 1394 }
1376 1395
1377 1396 def _onerrorresponseframe(self, request, frame):
1378 1397 request.state = 'errored'
1379 1398 del self._activerequests[request.requestid]
1380 1399
1381 1400 # The payload should be a CBOR map.
1382 1401 m = cborutil.decodeall(frame.payload)[0]
1383 1402
1384 1403 return 'error', {
1385 1404 'request': request,
1386 1405 'type': m['type'],
1387 1406 'message': m['message'],
1388 1407 }
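Both reactors return ``(action, meta)`` tuples and leave I/O to the caller. The sketch below shows how a caller might consume the client-side actions for a single request. ``collect_response`` is a hypothetical helper, and the meta dictionaries are reduced to the keys used here (``data``, ``eos``, ``message``).

```python
def collect_response(actions):
    """Join response payloads until end of stream, raising on errors."""
    chunks = []

    for action, meta in actions:
        if action == 'responsedata':
            chunks.append(meta['data'])
            # A truthy ``eos`` marks the final frame of the response.
            if meta['eos']:
                return b''.join(chunks)
        elif action == 'error':
            raise RuntimeError(meta['message'])
        else:
            raise ValueError('unhandled action: %s' % action)

    raise RuntimeError('input ended before end of stream')


body = collect_response([
    ('responsedata', {'data': b'ab', 'eos': 0}),
    ('responsedata', {'data': b'cd', 'eos': 1}),
])
```

A real caller would produce the tuples by passing each frame read off the wire to ``onframerecv`` rather than from a prepared list.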