##// END OF EJS Templates
wireprotov2: update stream encoding specification...
Gregory Szorc -
r40161:e2fe1074 default
parent child Browse files
Show More
@@ -1,649 +1,740 b''
1 **Experimental and under development**
1 **Experimental and under development**
2
2
3 This document describe's Mercurial's transport-agnostic remote procedure
3 This document describe's Mercurial's transport-agnostic remote procedure
4 call (RPC) protocol which is used to perform interactions with remote
4 call (RPC) protocol which is used to perform interactions with remote
5 servers. This protocol is also referred to as ``hgrpc``.
5 servers. This protocol is also referred to as ``hgrpc``.
6
6
7 The protocol has the following high-level features:
7 The protocol has the following high-level features:
8
8
9 * Concurrent request and response support (multiple commands can be issued
9 * Concurrent request and response support (multiple commands can be issued
10 simultaneously and responses can be streamed simultaneously).
10 simultaneously and responses can be streamed simultaneously).
11 * Supports half-duplex and full-duplex connections.
11 * Supports half-duplex and full-duplex connections.
12 * All data is transmitted within *frames*, which have a well-defined
12 * All data is transmitted within *frames*, which have a well-defined
13 header and encode their length.
13 header and encode their length.
14 * Side-channels for sending progress updates and printing output. Text
14 * Side-channels for sending progress updates and printing output. Text
15 output from the remote can be localized locally.
15 output from the remote can be localized locally.
16 * Support for simultaneous and long-lived compression streams, even across
16 * Support for simultaneous and long-lived compression streams, even across
17 requests.
17 requests.
18 * Uses CBOR for data exchange.
18 * Uses CBOR for data exchange.
19
19
20 The protocol is not specific to Mercurial and could be used by other
20 The protocol is not specific to Mercurial and could be used by other
21 applications.
21 applications.
22
22
23 High-level Overview
23 High-level Overview
24 ===================
24 ===================
25
25
26 To operate the protocol, a bi-directional, half-duplex pipe supporting
26 To operate the protocol, a bi-directional, half-duplex pipe supporting
27 ordered sends and receives is required. That is, each peer has one pipe
27 ordered sends and receives is required. That is, each peer has one pipe
28 for sending data and another for receiving. Full-duplex pipes are also
28 for sending data and another for receiving. Full-duplex pipes are also
29 supported.
29 supported.
30
30
31 All data is read and written in atomic units called *frames*. These
31 All data is read and written in atomic units called *frames*. These
32 are conceptually similar to TCP packets. Higher-level functionality
32 are conceptually similar to TCP packets. Higher-level functionality
33 is built on the exchange and processing of frames.
33 is built on the exchange and processing of frames.
34
34
35 All frames are associated with a *stream*. A *stream* provides a
35 All frames are associated with a *stream*. A *stream* provides a
36 unidirectional grouping of frames. Streams facilitate two goals:
36 unidirectional grouping of frames. Streams facilitate two goals:
37 content encoding and parallelism. There is a dedicated section on
37 content encoding and parallelism. There is a dedicated section on
38 streams below.
38 streams below.
39
39
40 The protocol is request-response based: the client issues requests to
40 The protocol is request-response based: the client issues requests to
41 the server, which issues replies to those requests. Server-initiated
41 the server, which issues replies to those requests. Server-initiated
42 messaging is not currently supported, but this specification carves
42 messaging is not currently supported, but this specification carves
43 out room to implement it.
43 out room to implement it.
44
44
45 All frames are associated with a numbered request. Frames can thus
45 All frames are associated with a numbered request. Frames can thus
46 be logically grouped by their request ID.
46 be logically grouped by their request ID.
47
47
48 Frames
48 Frames
49 ======
49 ======
50
50
51 Frames begin with an 8 octet header followed by a variable length
51 Frames begin with an 8 octet header followed by a variable length
52 payload::
52 payload::
53
53
54 +------------------------------------------------+
54 +------------------------------------------------+
55 | Length (24) |
55 | Length (24) |
56 +--------------------------------+---------------+
56 +--------------------------------+---------------+
57 | Request ID (16) | Stream ID (8) |
57 | Request ID (16) | Stream ID (8) |
58 +------------------+-------------+---------------+
58 +------------------+-------------+---------------+
59 | Stream Flags (8) |
59 | Stream Flags (8) |
60 +-----------+------+
60 +-----------+------+
61 | Type (4) |
61 | Type (4) |
62 +-----------+
62 +-----------+
63 | Flags (4) |
63 | Flags (4) |
64 +===========+===================================================|
64 +===========+===================================================|
65 | Frame Payload (0...) ...
65 | Frame Payload (0...) ...
66 +---------------------------------------------------------------+
66 +---------------------------------------------------------------+
67
67
68 The length of the frame payload is expressed as an unsigned 24 bit
68 The length of the frame payload is expressed as an unsigned 24 bit
69 little endian integer. Values larger than 65535 MUST NOT be used unless
69 little endian integer. Values larger than 65535 MUST NOT be used unless
70 given permission by the server as part of the negotiated capabilities
70 given permission by the server as part of the negotiated capabilities
71 during the handshake. The frame header is not part of the advertised
71 during the handshake. The frame header is not part of the advertised
72 frame length. The payload length is the over-the-wire length. If there
72 frame length. The payload length is the over-the-wire length. If there
73 is content encoding applied to the payload as part of the frame's stream,
73 is content encoding applied to the payload as part of the frame's stream,
74 the length is the output of that content encoding, not the input.
74 the length is the output of that content encoding, not the input.
75
75
76 The 16-bit ``Request ID`` field denotes the integer request identifier,
76 The 16-bit ``Request ID`` field denotes the integer request identifier,
77 stored as an unsigned little endian integer. Odd numbered requests are
77 stored as an unsigned little endian integer. Odd numbered requests are
78 client-initiated. Even numbered requests are server-initiated. This
78 client-initiated. Even numbered requests are server-initiated. This
79 refers to where the *request* was initiated - not where the *frame* was
79 refers to where the *request* was initiated - not where the *frame* was
80 initiated, so servers will send frames with odd ``Request ID`` in
80 initiated, so servers will send frames with odd ``Request ID`` in
81 response to client-initiated requests. Implementations are advised to
81 response to client-initiated requests. Implementations are advised to
82 start ordering request identifiers at ``1`` and ``0``, increment by
82 start ordering request identifiers at ``1`` and ``0``, increment by
83 ``2``, and wrap around if all available numbers have been exhausted.
83 ``2``, and wrap around if all available numbers have been exhausted.
84
84
85 The 8-bit ``Stream ID`` field denotes the stream that the frame is
85 The 8-bit ``Stream ID`` field denotes the stream that the frame is
86 associated with. Frames belonging to a stream may have content
86 associated with. Frames belonging to a stream may have content
87 encoding applied and the receiver may need to decode the raw frame
87 encoding applied and the receiver may need to decode the raw frame
88 payload to obtain the original data. Odd numbered IDs are
88 payload to obtain the original data. Odd numbered IDs are
89 client-initiated. Even numbered IDs are server-initiated.
89 client-initiated. Even numbered IDs are server-initiated.
90
90
91 The 8-bit ``Stream Flags`` field defines stream processing semantics.
91 The 8-bit ``Stream Flags`` field defines stream processing semantics.
92 See the section on streams below.
92 See the section on streams below.
93
93
94 The 4-bit ``Type`` field denotes the type of frame being sent.
94 The 4-bit ``Type`` field denotes the type of frame being sent.
95
95
96 The 4-bit ``Flags`` field defines special, per-type attributes for
96 The 4-bit ``Flags`` field defines special, per-type attributes for
97 the frame.
97 the frame.
98
98
99 The sections below define the frame types and their behavior.
99 The sections below define the frame types and their behavior.
100
100
101 Command Request (``0x01``)
101 Command Request (``0x01``)
102 --------------------------
102 --------------------------
103
103
104 This frame contains a request to run a command.
104 This frame contains a request to run a command.
105
105
106 The payload consists of a CBOR map defining the command request. The
106 The payload consists of a CBOR map defining the command request. The
107 bytestring keys of that map are:
107 bytestring keys of that map are:
108
108
109 name
109 name
110 Name of the command that should be executed (bytestring).
110 Name of the command that should be executed (bytestring).
111 args
111 args
112 Map of bytestring keys to various value types containing the named
112 Map of bytestring keys to various value types containing the named
113 arguments to this command.
113 arguments to this command.
114
114
115 Each command defines its own set of argument names and their expected
115 Each command defines its own set of argument names and their expected
116 types.
116 types.
117
117
118 redirect (optional)
118 redirect (optional)
119 (map) Advertises client support for following response *redirects*.
119 (map) Advertises client support for following response *redirects*.
120
120
121 This map has the following bytestring keys:
121 This map has the following bytestring keys:
122
122
123 targets
123 targets
124 (array of bytestring) List of named redirect targets supported by
124 (array of bytestring) List of named redirect targets supported by
125 this client. The names come from the targets advertised by the
125 this client. The names come from the targets advertised by the
126 server's *capabilities* message.
126 server's *capabilities* message.
127
127
128 hashes
128 hashes
129 (array of bytestring) List of preferred hashing algorithms that can
129 (array of bytestring) List of preferred hashing algorithms that can
130 be used for content integrity verification.
130 be used for content integrity verification.
131
131
132 See the *Content Redirects* section below for more on content redirects.
132 See the *Content Redirects* section below for more on content redirects.
133
133
134 This frame type MUST ONLY be sent from clients to servers: it is illegal
134 This frame type MUST ONLY be sent from clients to servers: it is illegal
135 for a server to send this frame to a client.
135 for a server to send this frame to a client.
136
136
137 The following flag values are defined for this type:
137 The following flag values are defined for this type:
138
138
139 0x01
139 0x01
140 New command request. When set, this frame represents the beginning
140 New command request. When set, this frame represents the beginning
141 of a new request to run a command. The ``Request ID`` attached to this
141 of a new request to run a command. The ``Request ID`` attached to this
142 frame MUST NOT be active.
142 frame MUST NOT be active.
143 0x02
143 0x02
144 Command request continuation. When set, this frame is a continuation
144 Command request continuation. When set, this frame is a continuation
145 from a previous command request frame for its ``Request ID``. This
145 from a previous command request frame for its ``Request ID``. This
146 flag is set when the CBOR data for a command request does not fit
146 flag is set when the CBOR data for a command request does not fit
147 in a single frame.
147 in a single frame.
148 0x04
148 0x04
149 Additional frames expected. When set, the command request didn't fit
149 Additional frames expected. When set, the command request didn't fit
150 into a single frame and additional CBOR data follows in a subsequent
150 into a single frame and additional CBOR data follows in a subsequent
151 frame.
151 frame.
152 0x08
152 0x08
153 Command data frames expected. When set, command data frames are
153 Command data frames expected. When set, command data frames are
154 expected to follow the final command request frame for this request.
154 expected to follow the final command request frame for this request.
155
155
156 ``0x01`` MUST be set on the initial command request frame for a
156 ``0x01`` MUST be set on the initial command request frame for a
157 ``Request ID``.
157 ``Request ID``.
158
158
159 ``0x01`` or ``0x02`` MUST be set to indicate this frame's role in
159 ``0x01`` or ``0x02`` MUST be set to indicate this frame's role in
160 a series of command request frames.
160 a series of command request frames.
161
161
162 If command data frames are to be sent, ``0x08`` MUST be set on ALL
162 If command data frames are to be sent, ``0x08`` MUST be set on ALL
163 command request frames.
163 command request frames.
164
164
165 Command Data (``0x02``)
165 Command Data (``0x02``)
166 -----------------------
166 -----------------------
167
167
168 This frame contains raw data for a command.
168 This frame contains raw data for a command.
169
169
170 Most commands can be executed by specifying arguments. However,
170 Most commands can be executed by specifying arguments. However,
171 arguments have an upper bound to their length. For commands that
171 arguments have an upper bound to their length. For commands that
172 accept data that is beyond this length or whose length isn't known
172 accept data that is beyond this length or whose length isn't known
173 when the command is initially sent, they will need to stream
173 when the command is initially sent, they will need to stream
174 arbitrary data to the server. This frame type facilitates the sending
174 arbitrary data to the server. This frame type facilitates the sending
175 of this data.
175 of this data.
176
176
177 The payload of this frame type consists of a stream of raw data to be
177 The payload of this frame type consists of a stream of raw data to be
178 consumed by the command handler on the server. The format of the data
178 consumed by the command handler on the server. The format of the data
179 is command specific.
179 is command specific.
180
180
181 The following flag values are defined for this type:
181 The following flag values are defined for this type:
182
182
183 0x01
183 0x01
184 Command data continuation. When set, the data for this command
184 Command data continuation. When set, the data for this command
185 continues into a subsequent frame.
185 continues into a subsequent frame.
186
186
187 0x02
187 0x02
188 End of data. When set, command data has been fully sent to the
188 End of data. When set, command data has been fully sent to the
189 server. The command has been fully issued and no new data for this
189 server. The command has been fully issued and no new data for this
190 command will be sent. The next frame will belong to a new command.
190 command will be sent. The next frame will belong to a new command.
191
191
192 Command Response Data (``0x03``)
192 Command Response Data (``0x03``)
193 --------------------------------
193 --------------------------------
194
194
195 This frame contains response data to an issued command.
195 This frame contains response data to an issued command.
196
196
197 Response data ALWAYS consists of a series of 1 or more CBOR encoded
197 Response data ALWAYS consists of a series of 1 or more CBOR encoded
198 values. A CBOR value may be using indefinite length encoding. And the
198 values. A CBOR value may be using indefinite length encoding. And the
199 bytes constituting the value may span several frames.
199 bytes constituting the value may span several frames.
200
200
201 The following flag values are defined for this type:
201 The following flag values are defined for this type:
202
202
203 0x01
203 0x01
204 Data continuation. When set, an additional frame containing response data
204 Data continuation. When set, an additional frame containing response data
205 will follow.
205 will follow.
206 0x02
206 0x02
207 End of data. When set, the response data has been fully sent and
207 End of data. When set, the response data has been fully sent and
208 no additional frames for this response will be sent.
208 no additional frames for this response will be sent.
209
209
210 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
210 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
211
211
212 Error Occurred (``0x05``)
212 Error Occurred (``0x05``)
213 -------------------------
213 -------------------------
214
214
215 Some kind of error occurred.
215 Some kind of error occurred.
216
216
217 There are 3 general kinds of failures that can occur:
217 There are 3 general kinds of failures that can occur:
218
218
219 * Command error encountered before any response issued
219 * Command error encountered before any response issued
220 * Command error encountered after a response was issued
220 * Command error encountered after a response was issued
221 * Protocol or stream level error
221 * Protocol or stream level error
222
222
223 This frame type is used to capture the latter cases. (The general
223 This frame type is used to capture the latter cases. (The general
224 command error case is handled by the leading CBOR map in
224 command error case is handled by the leading CBOR map in
225 ``Command Response`` frames.)
225 ``Command Response`` frames.)
226
226
227 The payload of this frame contains a CBOR map detailing the error. That
227 The payload of this frame contains a CBOR map detailing the error. That
228 map has the following bytestring keys:
228 map has the following bytestring keys:
229
229
230 type
230 type
231 (bytestring) The overall type of error encountered. Can be one of the
231 (bytestring) The overall type of error encountered. Can be one of the
232 following values:
232 following values:
233
233
234 protocol
234 protocol
235 A protocol-level error occurred. This typically means someone
235 A protocol-level error occurred. This typically means someone
236 is violating the framing protocol semantics and the server is
236 is violating the framing protocol semantics and the server is
237 refusing to proceed.
237 refusing to proceed.
238
238
239 server
239 server
240 A server-level error occurred. This typically indicates some kind of
240 A server-level error occurred. This typically indicates some kind of
241 logic error on the server, likely the fault of the server.
241 logic error on the server, likely the fault of the server.
242
242
243 command
243 command
244 A command-level error, likely the fault of the client.
244 A command-level error, likely the fault of the client.
245
245
246 message
246 message
247 (array of maps) A richly formatted message that is intended for
247 (array of maps) A richly formatted message that is intended for
248 human consumption. See the ``Human Output Side-Channel`` frame
248 human consumption. See the ``Human Output Side-Channel`` frame
249 section for a description of the format of this data structure.
249 section for a description of the format of this data structure.
250
250
251 Human Output Side-Channel (``0x06``)
251 Human Output Side-Channel (``0x06``)
252 ------------------------------------
252 ------------------------------------
253
253
254 This frame contains a message that is intended to be displayed to
254 This frame contains a message that is intended to be displayed to
255 people. Whereas most frames communicate machine readable data, this
255 people. Whereas most frames communicate machine readable data, this
256 frame communicates textual data that is intended to be shown to
256 frame communicates textual data that is intended to be shown to
257 humans.
257 humans.
258
258
259 The frame consists of a series of *formatting requests*. Each formatting
259 The frame consists of a series of *formatting requests*. Each formatting
260 request consists of a formatting string, arguments for that formatting
260 request consists of a formatting string, arguments for that formatting
261 string, and labels to apply to that formatting string.
261 string, and labels to apply to that formatting string.
262
262
263 A formatting string is a printf()-like string that allows variable
263 A formatting string is a printf()-like string that allows variable
264 substitution within the string. Labels allow the rendered text to be
264 substitution within the string. Labels allow the rendered text to be
265 *decorated*. Assuming use of the canonical Mercurial code base, a
265 *decorated*. Assuming use of the canonical Mercurial code base, a
266 formatting string can be the input to the ``i18n._`` function. This
266 formatting string can be the input to the ``i18n._`` function. This
267 allows messages emitted from the server to be localized. So even if
267 allows messages emitted from the server to be localized. So even if
268 the server has different i18n settings, people could see messages in
268 the server has different i18n settings, people could see messages in
269 their *native* settings. Similarly, the use of labels allows
269 their *native* settings. Similarly, the use of labels allows
270 decorations like coloring and underlining to be applied using the
270 decorations like coloring and underlining to be applied using the
271 client's configured rendering settings.
271 client's configured rendering settings.
272
272
273 Formatting strings are similar to ``printf()`` strings or how
273 Formatting strings are similar to ``printf()`` strings or how
274 Python's ``%`` operator works. The only supported formatting sequences
274 Python's ``%`` operator works. The only supported formatting sequences
275 are ``%s`` and ``%%``. ``%s`` will be replaced by whatever the string
275 are ``%s`` and ``%%``. ``%s`` will be replaced by whatever the string
276 at that position resolves to. ``%%`` will be replaced by ``%``. All
276 at that position resolves to. ``%%`` will be replaced by ``%``. All
277 other 2-byte sequences beginning with ``%`` represent a literal
277 other 2-byte sequences beginning with ``%`` represent a literal
278 ``%`` followed by that character. However, future versions of the
278 ``%`` followed by that character. However, future versions of the
279 wire protocol reserve the right to allow clients to opt in to receiving
279 wire protocol reserve the right to allow clients to opt in to receiving
280 formatting strings with additional formatters, hence why ``%%`` is
280 formatting strings with additional formatters, hence why ``%%`` is
281 required to represent the literal ``%``.
281 required to represent the literal ``%``.
282
282
283 The frame payload consists of a CBOR array of CBOR maps. Each map
283 The frame payload consists of a CBOR array of CBOR maps. Each map
284 defines an *atom* of text data to print. Each *atom* has the following
284 defines an *atom* of text data to print. Each *atom* has the following
285 bytestring keys:
285 bytestring keys:
286
286
287 msg
287 msg
288 (bytestring) The formatting string. Content MUST be ASCII.
288 (bytestring) The formatting string. Content MUST be ASCII.
289 args (optional)
289 args (optional)
290 Array of bytestrings defining arguments to the formatting string.
290 Array of bytestrings defining arguments to the formatting string.
291 labels (optional)
291 labels (optional)
292 Array of bytestrings defining labels to apply to this atom.
292 Array of bytestrings defining labels to apply to this atom.
293
293
294 All data to be printed MUST be encoded into a single frame: this frame
294 All data to be printed MUST be encoded into a single frame: this frame
295 does not support spanning data across multiple frames.
295 does not support spanning data across multiple frames.
296
296
297 All textual data encoded in these frames is assumed to be line delimited.
297 All textual data encoded in these frames is assumed to be line delimited.
298 The last atom in the frame SHOULD end with a newline (``\n``). If it
298 The last atom in the frame SHOULD end with a newline (``\n``). If it
299 doesn't, clients MAY add a newline to facilitate immediate printing.
299 doesn't, clients MAY add a newline to facilitate immediate printing.
300
300
301 Progress Update (``0x07``)
301 Progress Update (``0x07``)
302 --------------------------
302 --------------------------
303
303
304 This frame holds the progress of an operation on the peer. Consumption
304 This frame holds the progress of an operation on the peer. Consumption
305 of these frames allows clients to display progress bars, estimated
305 of these frames allows clients to display progress bars, estimated
306 completion times, etc.
306 completion times, etc.
307
307
308 Each frame defines the progress of a single operation on the peer. The
308 Each frame defines the progress of a single operation on the peer. The
309 payload consists of a CBOR map with the following bytestring keys:
309 payload consists of a CBOR map with the following bytestring keys:
310
310
311 topic
311 topic
312 Topic name (string)
312 Topic name (string)
313 pos
313 pos
314 Current numeric position within the topic (integer)
314 Current numeric position within the topic (integer)
315 total
315 total
316 Total/end numeric position of this topic (unsigned integer)
316 Total/end numeric position of this topic (unsigned integer)
317 label (optional)
317 label (optional)
318 Unit label (string)
318 Unit label (string)
319 item (optional)
319 item (optional)
320 Item name (string)
320 Item name (string)
321
321
322 Progress state is created when a frame is received referencing a
322 Progress state is created when a frame is received referencing a
323 *topic* that isn't currently tracked. Progress tracking for that
323 *topic* that isn't currently tracked. Progress tracking for that
324 *topic* is finished when a frame is received reporting the current
324 *topic* is finished when a frame is received reporting the current
325 position of that topic as ``-1``.
325 position of that topic as ``-1``.
326
326
327 Multiple *topics* may be active at any given time.
327 Multiple *topics* may be active at any given time.
328
328
329 Rendering of progress information is not mandated or governed by this
329 Rendering of progress information is not mandated or governed by this
330 specification: implementations MAY render progress information however
330 specification: implementations MAY render progress information however
331 they see fit, including not at all.
331 they see fit, including not at all.
332
332
333 The string data describing the topic SHOULD be static strings to
333 The string data describing the topic SHOULD be static strings to
334 facilitate receivers localizing that string data. The emitter
334 facilitate receivers localizing that string data. The emitter
335 MUST normalize all string data to valid UTF-8 and receivers SHOULD
335 MUST normalize all string data to valid UTF-8 and receivers SHOULD
336 validate that received data conforms to UTF-8. The topic name
336 validate that received data conforms to UTF-8. The topic name
337 SHOULD be ASCII.
337 SHOULD be ASCII.
338
338
339 Stream Encoding Settings (``0x08``)
339 Sender Protocol Settings (``0x08``)
340 -----------------------------------
341
342 This frame type advertises the sender's support for various protocol and
343 stream level features. The data advertised in this frame is used to influence
344 subsequent behavior of the current frame exchange channel.
345
346 The frame payload consists of a CBOR map. It may contain the following
347 bytestring keys:
348
349 contentencodings
350 (array of bytestring) A list of content encodings supported by the
351 sender, in order of most to least preferred.
352
353 Peers are allowed to encode stream data using any of the listed
354 encodings.
355
356 See the ``Content Encoding Profiles`` section for an enumeration
357 of supported content encodings.
358
359 If not defined, the value is assumed to be a list with the single value
360 ``identity``, meaning only the no-op encoding is supported.
361
362 Senders MAY filter the set of advertised encodings against what it
363 knows the receiver supports (e.g. if the receiver advertised encodings
364 via the capabilities descriptor). However, doing so will prevent
365 servers from gaining an understanding of the aggregate capabilities
366 of clients. So clients are discouraged from doing so.
367
368 When this frame is not sent/received, the receiver assumes default values
369 for all keys.
370
371 If encountered, this frame type MUST be sent before any other frame type
372 in a channel.
373
374 The following flag values are defined for this frame type:
375
376 0x01
377 Data continuation. When set, an additional frame containing more protocol
378 settings immediately follows.
379 0x02
380 End of data. When set, the protocol settings data has been completely
381 sent.
382
383 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
384
385 Stream Encoding Settings (``0x09``)
340 -----------------------------------
386 -----------------------------------
341
387
342 This frame type holds information defining the content encoding
388 This frame type holds information defining the content encoding
343 settings for a *stream*.
389 settings for a *stream*.
344
390
345 This frame type is likely consumed by the protocol layer and is not
391 This frame type is likely consumed by the protocol layer and is not
346 passed on to applications.
392 passed on to applications.
347
393
348 This frame type MUST ONLY occur on frames having the *Beginning of Stream*
394 This frame type MUST ONLY occur on frames having the *Beginning of Stream*
349 ``Stream Flag`` set.
395 ``Stream Flag`` set.
350
396
351 The payload of this frame defines what content encoding has (possibly)
397 The payload of this frame defines what content encoding has (possibly)
352 been applied to the payloads of subsequent frames in this stream.
398 been applied to the payloads of subsequent frames in this stream.
353
399
354 The payload begins with an 8-bit integer defining the length of the
400 The payload consists of a series of CBOR values. The first value is a
355 encoding *profile*, followed by the string name of that profile, which
401 bytestring denoting the content encoding profile of the data in this
356 must be an ASCII string. All bytes that follow can be used by that
402 stream. Subsequent CBOR values supplement this simple value in a
357 profile for supplemental settings definitions. See the section below
403 profile-specific manner. See the ``Content Encoding Profiles`` section
358 on defined encoding profiles.
404 for more.
405
406 In the absence of this frame on a stream, it is assumed the stream is
407 using the ``identity`` content encoding.
408
409 The following flag values are defined for this frame type:
410
411 0x01
412 Data continuation. When set, an additional frame containing more encoding
413 settings immediately follows.
414 0x02
415 End of data. When set, the encoding settings data has been completely
416 sent.
417
418 The ``0x01`` flag is mutually exclusive with the ``0x02`` flag.
359
419
360 Stream States and Flags
420 Stream States and Flags
361 =======================
421 =======================
362
422
363 Streams can be in two states: *open* and *closed*. An *open* stream
423 Streams can be in two states: *open* and *closed*. An *open* stream
364 is active and frames attached to that stream could arrive at any time.
424 is active and frames attached to that stream could arrive at any time.
365 A *closed* stream is not active. If a frame attached to a *closed*
425 A *closed* stream is not active. If a frame attached to a *closed*
366 stream arrives, that frame MUST have an appropriate stream flag
426 stream arrives, that frame MUST have an appropriate stream flag
367 set indicating beginning of stream. All streams are in the *closed*
427 set indicating beginning of stream. All streams are in the *closed*
368 state by default.
428 state by default.
369
429
370 The ``Stream Flags`` field denotes a set of bit flags for defining
430 The ``Stream Flags`` field denotes a set of bit flags for defining
371 the relationship of this frame within a stream. The following flags
431 the relationship of this frame within a stream. The following flags
372 are defined:
432 are defined:
373
433
374 0x01
434 0x01
375 Beginning of stream. The first frame in the stream MUST set this
435 Beginning of stream. The first frame in the stream MUST set this
376 flag. When received, the ``Stream ID`` this frame is attached to
436 flag. When received, the ``Stream ID`` this frame is attached to
377 becomes ``open``.
437 becomes ``open``.
378
438
379 0x02
439 0x02
380 End of stream. The last frame in a stream MUST set this flag. When
440 End of stream. The last frame in a stream MUST set this flag. When
381 received, the ``Stream ID`` this frame is attached to becomes
441 received, the ``Stream ID`` this frame is attached to becomes
382 ``closed``. Any content encoding context associated with this stream
442 ``closed``. Any content encoding context associated with this stream
383 can be destroyed after processing the payload of this frame.
443 can be destroyed after processing the payload of this frame.
384
444
385 0x04
445 0x04
386 Apply content encoding. When set, any content encoding settings
446 Apply content encoding. When set, any content encoding settings
387 defined by the stream should be applied when attempting to read
447 defined by the stream should be applied when attempting to read
388 the frame. When not set, the frame payload isn't encoded.
448 the frame. When not set, the frame payload isn't encoded.
389
449
450 TODO consider making stream opening and closing communicated via
451 explicit frame types (e.g. a "stream state change" frame) rather than
452 flags on all frames. This would make stream state changes more explicit,
453 as they could only occur on specific frame types.
454
390 Streams
455 Streams
391 =======
456 =======
392
457
393 Streams - along with ``Request IDs`` - facilitate grouping of frames.
458 Streams - along with ``Request IDs`` - facilitate grouping of frames.
394 But the purpose of each is quite different and the groupings they
459 But the purpose of each is quite different and the groupings they
395 constitute are independent.
460 constitute are independent.
396
461
397 A ``Request ID`` is essentially a tag. It tells you which logical
462 A ``Request ID`` is essentially a tag. It tells you which logical
398 request a frame is associated with.
463 request a frame is associated with.
399
464
400 A *stream* is a sequence of frames grouped for the express purpose
465 A *stream* is a sequence of frames grouped for the express purpose
401 of applying a stateful encoding or for denoting sub-groups of frames.
466 of applying a stateful encoding or for denoting sub-groups of frames.
402
467
403 Unlike ``Request ID``s which span the request and response, a stream
468 Unlike ``Request ID``s which span the request and response, a stream
404 is unidirectional and stream IDs are independent from client to
469 is unidirectional and stream IDs are independent from client to
405 server.
470 server.
406
471
407 There is no strict hierarchical relationship between ``Request IDs``
472 There is no strict hierarchical relationship between ``Request IDs``
408 and *streams*. A stream can contain frames having multiple
473 and *streams*. A stream can contain frames having multiple
409 ``Request IDs``. Frames belonging to the same ``Request ID`` can
474 ``Request IDs``. Frames belonging to the same ``Request ID`` can
410 span multiple streams.
475 span multiple streams.
411
476
412 One goal of streams is to facilitate content encoding. A stream can
477 One goal of streams is to facilitate content encoding. A stream can
413 define an encoding to be applied to frame payloads. For example, the
478 define an encoding to be applied to frame payloads. For example, the
414 payload transmitted over the wire may contain output from a
479 payload transmitted over the wire may contain output from a
415 zstandard compression operation and the receiving end may decompress
480 zstandard compression operation and the receiving end may decompress
416 that payload to obtain the original data.
481 that payload to obtain the original data.
417
482
418 The other goal of streams is to facilitate concurrent execution. For
483 The other goal of streams is to facilitate concurrent execution. For
419 example, a server could spawn 4 threads to service a request that can
484 example, a server could spawn 4 threads to service a request that can
420 be easily parallelized. Each of those 4 threads could write into its
485 be easily parallelized. Each of those 4 threads could write into its
421 own stream. Those streams could then in turn be delivered to 4 threads
486 own stream. Those streams could then in turn be delivered to 4 threads
422 on the receiving end, with each thread consuming its stream in near
487 on the receiving end, with each thread consuming its stream in near
423 isolation. The *main* thread on both ends merely does I/O and
488 isolation. The *main* thread on both ends merely does I/O and
424 encodes/decodes frame headers: the bulk of the work is done by worker
489 encodes/decodes frame headers: the bulk of the work is done by worker
425 threads.
490 threads.
426
491
427 In addition, since content encoding is defined per stream, each
492 In addition, since content encoding is defined per stream, each
428 *worker thread* could perform potentially CPU bound work concurrently
493 *worker thread* could perform potentially CPU bound work concurrently
429 with other threads. This approach of applying encoding at the
494 with other threads. This approach of applying encoding at the
430 sub-protocol / stream level eliminates a potential resource constraint
495 sub-protocol / stream level eliminates a potential resource constraint
431 on the protocol stream as a whole (it is common for the throughput of
496 on the protocol stream as a whole (it is common for the throughput of
432 a compression engine to be smaller than the throughput of a network).
497 a compression engine to be smaller than the throughput of a network).
433
498
434 Having multiple streams - each with their own encoding settings - also
499 Having multiple streams - each with their own encoding settings - also
435 facilitates the use of advanced data compression techniques. For
500 facilitates the use of advanced data compression techniques. For
436 example, a transmitter could see that it is generating data faster
501 example, a transmitter could see that it is generating data faster
437 and slower than the receiving end is consuming it and adjust its
502 and slower than the receiving end is consuming it and adjust its
438 compression settings to trade CPU for compression ratio accordingly.
503 compression settings to trade CPU for compression ratio accordingly.
439
504
440 While streams can define a content encoding, not all frames within
505 While streams can define a content encoding, not all frames within
441 that stream must use that content encoding. This can be useful when
506 that stream must use that content encoding. This can be useful when
442 data is being served from caches and being derived dynamically. A
507 data is being served from caches and being derived dynamically. A
443 cache could pre-compressed data so the server doesn't have to
508 cache could pre-compressed data so the server doesn't have to
444 recompress it. The ability to pick and choose which frames are
509 recompress it. The ability to pick and choose which frames are
445 compressed allows servers to easily send data to the wire without
510 compressed allows servers to easily send data to the wire without
446 involving potentially expensive encoding overhead.
511 involving potentially expensive encoding overhead.
447
512
448 Content Encoding Profiles
513 Content Encoding Profiles
449 =========================
514 =========================
450
515
451 Streams can have named content encoding *profiles* associated with
516 Streams can have named content encoding *profiles* associated with
452 them. A profile defines a shared understanding of content encoding
517 them. A profile defines a shared understanding of content encoding
453 settings and behavior.
518 settings and behavior.
454
519
455 The following profiles are defined:
520 Profiles are described in the following sections.
521
522 identity
523 --------
524
525 The ``identity`` profile is a no-op encoding: the encoded bytes are
526 exactly the input bytes.
527
528 This profile MUST be supported by all peers.
529
530 In the absence of an identified profile, the ``identity`` profile is
531 assumed.
456
532
457 TBD
533 zstd-8mb
534 --------
535
536 Zstandard encoding (RFC 8478). Zstandard is a fast and effective lossless
537 compression format.
538
539 This profile allows decompressor window sizes of up to 8 MB.
540
541 zlib
542 ----
543
544 zlib compressed data (RFC 1950). zlib is a widely-used and supported
545 lossless compression format.
546
547 It isn't as fast as zstandard and it is recommended to use zstandard instead,
548 if possible.
458
549
459 Command Protocol
550 Command Protocol
460 ================
551 ================
461
552
462 A client can request that a remote run a command by sending it
553 A client can request that a remote run a command by sending it
463 frames defining that command. This logical stream is composed of
554 frames defining that command. This logical stream is composed of
464 1 or more ``Command Request`` frames and and 0 or more ``Command Data``
555 1 or more ``Command Request`` frames and and 0 or more ``Command Data``
465 frames.
556 frames.
466
557
467 All frames composing a single command request MUST be associated with
558 All frames composing a single command request MUST be associated with
468 the same ``Request ID``.
559 the same ``Request ID``.
469
560
470 Clients MAY send additional command requests without waiting on the
561 Clients MAY send additional command requests without waiting on the
471 response to a previous command request. If they do so, they MUST ensure
562 response to a previous command request. If they do so, they MUST ensure
472 that the ``Request ID`` field of outbound frames does not conflict
563 that the ``Request ID`` field of outbound frames does not conflict
473 with that of an active ``Request ID`` whose response has not yet been
564 with that of an active ``Request ID`` whose response has not yet been
474 fully received.
565 fully received.
475
566
476 Servers MAY respond to commands in a different order than they were
567 Servers MAY respond to commands in a different order than they were
477 sent over the wire. Clients MUST be prepared to deal with this. Servers
568 sent over the wire. Clients MUST be prepared to deal with this. Servers
478 also MAY start executing commands in a different order than they were
569 also MAY start executing commands in a different order than they were
479 received, or MAY execute multiple commands concurrently.
570 received, or MAY execute multiple commands concurrently.
480
571
481 If there is a dependency between commands or a race condition between
572 If there is a dependency between commands or a race condition between
482 commands executing (e.g. a read-only command that depends on the results
573 commands executing (e.g. a read-only command that depends on the results
483 of a command that mutates the repository), then clients MUST NOT send
574 of a command that mutates the repository), then clients MUST NOT send
484 frames issuing a command until a response to all dependent commands has
575 frames issuing a command until a response to all dependent commands has
485 been received.
576 been received.
486 TODO think about whether we should express dependencies between commands
577 TODO think about whether we should express dependencies between commands
487 to avoid roundtrip latency.
578 to avoid roundtrip latency.
488
579
489 A command is defined by a command name, 0 or more command arguments,
580 A command is defined by a command name, 0 or more command arguments,
490 and optional command data.
581 and optional command data.
491
582
492 Arguments are the recommended mechanism for transferring fixed sets of
583 Arguments are the recommended mechanism for transferring fixed sets of
493 parameters to a command. Data is appropriate for transferring variable
584 parameters to a command. Data is appropriate for transferring variable
494 data. Thinking in terms of HTTP, arguments would be headers and data
585 data. Thinking in terms of HTTP, arguments would be headers and data
495 would be the message body.
586 would be the message body.
496
587
497 It is recommended for servers to delay the dispatch of a command
588 It is recommended for servers to delay the dispatch of a command
498 until all argument have been received. Servers MAY impose limits on the
589 until all argument have been received. Servers MAY impose limits on the
499 maximum argument size.
590 maximum argument size.
500 TODO define failure mechanism.
591 TODO define failure mechanism.
501
592
502 Servers MAY dispatch to commands immediately once argument data
593 Servers MAY dispatch to commands immediately once argument data
503 is available or delay until command data is received in full.
594 is available or delay until command data is received in full.
504
595
505 Once a ``Command Request`` frame is sent, a client must be prepared to
596 Once a ``Command Request`` frame is sent, a client must be prepared to
506 receive any of the following frames associated with that request:
597 receive any of the following frames associated with that request:
507 ``Command Response``, ``Error Response``, ``Human Output Side-Channel``,
598 ``Command Response``, ``Error Response``, ``Human Output Side-Channel``,
508 ``Progress Update``.
599 ``Progress Update``.
509
600
510 The *main* response for a command will be in ``Command Response`` frames.
601 The *main* response for a command will be in ``Command Response`` frames.
511 The payloads of these frames consist of 1 or more CBOR encoded values.
602 The payloads of these frames consist of 1 or more CBOR encoded values.
512 The first CBOR value on the first ``Command Response`` frame is special
603 The first CBOR value on the first ``Command Response`` frame is special
513 and denotes the overall status of the command. This CBOR map contains
604 and denotes the overall status of the command. This CBOR map contains
514 the following bytestring keys:
605 the following bytestring keys:
515
606
516 status
607 status
517 (bytestring) A well-defined message containing the overall status of
608 (bytestring) A well-defined message containing the overall status of
518 this command request. The following values are defined:
609 this command request. The following values are defined:
519
610
520 ok
611 ok
521 The command was received successfully and its response follows.
612 The command was received successfully and its response follows.
522 error
613 error
523 There was an error processing the command. More details about the
614 There was an error processing the command. More details about the
524 error are encoded in the ``error`` key.
615 error are encoded in the ``error`` key.
525 redirect
616 redirect
526 The response for this command is available elsewhere. Details on
617 The response for this command is available elsewhere. Details on
527 where are in the ``location`` key.
618 where are in the ``location`` key.
528
619
529 error (optional)
620 error (optional)
530 A map containing information about an encountered error. The map has the
621 A map containing information about an encountered error. The map has the
531 following keys:
622 following keys:
532
623
533 message
624 message
534 (array of maps) A message describing the error. The message uses the
625 (array of maps) A message describing the error. The message uses the
535 same format as those in the ``Human Output Side-Channel`` frame.
626 same format as those in the ``Human Output Side-Channel`` frame.
536
627
537 location (optional)
628 location (optional)
538 (map) Presence indicates that a *content redirect* has occurred. The map
629 (map) Presence indicates that a *content redirect* has occurred. The map
539 provides the external location of the content.
630 provides the external location of the content.
540
631
541 This map contains the following bytestring keys:
632 This map contains the following bytestring keys:
542
633
543 url
634 url
544 (bytestring) URL from which this content may be requested.
635 (bytestring) URL from which this content may be requested.
545
636
546 mediatype
637 mediatype
547 (bytestring) The media type for the fetched content. e.g.
638 (bytestring) The media type for the fetched content. e.g.
548 ``application/mercurial-*``.
639 ``application/mercurial-*``.
549
640
550 In some transports, this value is also advertised by the transport.
641 In some transports, this value is also advertised by the transport.
551 e.g. as the ``Content-Type`` HTTP header.
642 e.g. as the ``Content-Type`` HTTP header.
552
643
553 size (optional)
644 size (optional)
554 (unsigned integer) Total size of remote object in bytes. This is
645 (unsigned integer) Total size of remote object in bytes. This is
555 the raw size of the entity that will be fetched, minus any
646 the raw size of the entity that will be fetched, minus any
556 non-Mercurial protocol encoding (e.g. HTTP content or transfer
647 non-Mercurial protocol encoding (e.g. HTTP content or transfer
557 encoding.)
648 encoding.)
558
649
559 fullhashes (optional)
650 fullhashes (optional)
560 (array of arrays) Content hashes for the entire payload. Each entry
651 (array of arrays) Content hashes for the entire payload. Each entry
561 is an array of bytestrings containing the hash name and the hash value.
652 is an array of bytestrings containing the hash name and the hash value.
562
653
563 fullhashseed (optional)
654 fullhashseed (optional)
564 (bytestring) Optional seed value to feed into hasher for full content
655 (bytestring) Optional seed value to feed into hasher for full content
565 hash verification.
656 hash verification.
566
657
567 serverdercerts (optional)
658 serverdercerts (optional)
568 (array of bytestring) DER encoded x509 certificates for the server. When
659 (array of bytestring) DER encoded x509 certificates for the server. When
569 defined, clients MAY validate that the x509 certificate on the target
660 defined, clients MAY validate that the x509 certificate on the target
570 server exactly matches the certificate used here.
661 server exactly matches the certificate used here.
571
662
572 servercadercerts (optional)
663 servercadercerts (optional)
573 (array of bytestring) DER encoded x509 certificates for the certificate
664 (array of bytestring) DER encoded x509 certificates for the certificate
574 authority of the target server. When defined, clients MAY validate that
665 authority of the target server. When defined, clients MAY validate that
575 the x509 on the target server was signed by CA certificate in this set.
666 the x509 on the target server was signed by CA certificate in this set.
576
667
577 # TODO support for giving client an x509 certificate pair to be used as a
668 # TODO support for giving client an x509 certificate pair to be used as a
578 # client certificate.
669 # client certificate.
579
670
580 # TODO support common authentication mechanisms (e.g. HTTP basic/digest
671 # TODO support common authentication mechanisms (e.g. HTTP basic/digest
581 # auth).
672 # auth).
582
673
583 # TODO support custom authentication mechanisms. This likely requires
674 # TODO support custom authentication mechanisms. This likely requires
584 # server to advertise required auth mechanism so client can filter.
675 # server to advertise required auth mechanism so client can filter.
585
676
586 # TODO support chained hashes. e.g. hash for each 1MB segment so client
677 # TODO support chained hashes. e.g. hash for each 1MB segment so client
587 # can iteratively validate data without having to consume all of it first.
678 # can iteratively validate data without having to consume all of it first.
588
679
589 TODO formalize when error frames can be seen and how errors can be
680 TODO formalize when error frames can be seen and how errors can be
590 recognized midway through a command response.
681 recognized midway through a command response.
591
682
592 Content Redirects
683 Content Redirects
593 =================
684 =================
594
685
595 Servers have the ability to respond to ANY command request with a
686 Servers have the ability to respond to ANY command request with a
596 *redirect* to another location. Such a response is referred to as a *redirect
687 *redirect* to another location. Such a response is referred to as a *redirect
597 response*. (This feature is conceptually similar to HTTP redirects, but is
688 response*. (This feature is conceptually similar to HTTP redirects, but is
598 more powerful.)
689 more powerful.)
599
690
600 A *redirect response* MUST ONLY be issued if the client advertises support
691 A *redirect response* MUST ONLY be issued if the client advertises support
601 for a redirect *target*.
692 for a redirect *target*.
602
693
603 A *redirect response* MUST NOT be issued unless the client advertises support
694 A *redirect response* MUST NOT be issued unless the client advertises support
604 for one.
695 for one.
605
696
606 Clients advertise support for *redirect responses* after looking at the server's
697 Clients advertise support for *redirect responses* after looking at the server's
607 *capabilities* data, which is fetched during initial server connection
698 *capabilities* data, which is fetched during initial server connection
608 handshake. The server's capabilities data advertises named *targets* for
699 handshake. The server's capabilities data advertises named *targets* for
609 potential redirects.
700 potential redirects.
610
701
611 Each target is described by a protocol name, connection and protocol features,
702 Each target is described by a protocol name, connection and protocol features,
612 etc. The server also advertises target-agnostic redirect settings, such as
703 etc. The server also advertises target-agnostic redirect settings, such as
613 which hash algorithms are supported for content integrity checking. (See
704 which hash algorithms are supported for content integrity checking. (See
614 the documentation for the *capabilities* command for more.)
705 the documentation for the *capabilities* command for more.)
615
706
616 Clients examine the set of advertised redirect targets for compatibility.
707 Clients examine the set of advertised redirect targets for compatibility.
617 When sending a command request, the client advertises the set of redirect
708 When sending a command request, the client advertises the set of redirect
618 target names it is willing to follow, along with some other settings influencing
709 target names it is willing to follow, along with some other settings influencing
619 behavior.
710 behavior.
620
711
621 For example, say the server is advertising a ``cdn`` redirect target that
712 For example, say the server is advertising a ``cdn`` redirect target that
622 requires SNI and TLS 1.2. If the client supports those features, it will
713 requires SNI and TLS 1.2. If the client supports those features, it will
623 send command requests stating that the ``cdn`` target is acceptable to use.
714 send command requests stating that the ``cdn`` target is acceptable to use.
624 But if the client doesn't support SNI or TLS 1.2 (or maybe it encountered an
715 But if the client doesn't support SNI or TLS 1.2 (or maybe it encountered an
625 error using this target from a previous request), then it omits this target
716 error using this target from a previous request), then it omits this target
626 name.
717 name.
627
718
628 If the client advertises support for a redirect target, the server MAY
719 If the client advertises support for a redirect target, the server MAY
629 substitute the normal, inline response data for a *redirect response* -
720 substitute the normal, inline response data for a *redirect response* -
630 one where the initial CBOR map has a ``status`` key with value ``redirect``.
721 one where the initial CBOR map has a ``status`` key with value ``redirect``.
631
722
632 The *redirect response* at a minimum advertises the URL where the response
723 The *redirect response* at a minimum advertises the URL where the response
633 can be retrieved.
724 can be retrieved.
634
725
635 The *redirect response* MAY also advertise additional details about that
726 The *redirect response* MAY also advertise additional details about that
636 content and how to retrieve it. Notably, the response may contain the
727 content and how to retrieve it. Notably, the response may contain the
637 x509 public certificates for the server being redirected to or the
728 x509 public certificates for the server being redirected to or the
638 certificate authority that signed that server's certificate. Unless the
729 certificate authority that signed that server's certificate. Unless the
639 client has existing settings that offer stronger trust validation than what
730 client has existing settings that offer stronger trust validation than what
640 the server advertises, the client SHOULD use the server-provided certificates
731 the server advertises, the client SHOULD use the server-provided certificates
641 when validating the connection to the remote server in place of any default
732 when validating the connection to the remote server in place of any default
642 connection verification checks. This is because certificates coming from
733 connection verification checks. This is because certificates coming from
643 the server SHOULD establish a stronger chain of trust than what the default
734 the server SHOULD establish a stronger chain of trust than what the default
644 certification validation mechanism in most environments provides. (By default,
735 certification validation mechanism in most environments provides. (By default,
645 certificate validation ensures the signer of the cert chains up to a set of
736 certificate validation ensures the signer of the cert chains up to a set of
646 trusted root certificates. And if an explicit certificate or CA certificate
737 trusted root certificates. And if an explicit certificate or CA certificate
647 is presented, that greadly reduces the set of certificates that will be
738 is presented, that greadly reduces the set of certificates that will be
648 recognized as valid, thus reducing the potential for a "bad" certificate
739 recognized as valid, thus reducing the potential for a "bad" certificate
649 to be used and trusted.)
740 to be used and trusted.)
@@ -1,1388 +1,1407 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import collections
15 import struct
15 import struct
16
16
17 from .i18n import _
17 from .i18n import _
18 from .thirdparty import (
18 from .thirdparty import (
19 attr,
19 attr,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 pycompat,
24 pycompat,
25 util,
25 util,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28 from .utils import (
28 from .utils import (
29 cborutil,
29 cborutil,
30 stringutil,
30 stringutil,
31 )
31 )
32
32
33 FRAME_HEADER_SIZE = 8
33 FRAME_HEADER_SIZE = 8
34 DEFAULT_MAX_FRAME_SIZE = 32768
34 DEFAULT_MAX_FRAME_SIZE = 32768
35
35
36 STREAM_FLAG_BEGIN_STREAM = 0x01
36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 STREAM_FLAG_END_STREAM = 0x02
37 STREAM_FLAG_END_STREAM = 0x02
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39
39
40 STREAM_FLAGS = {
40 STREAM_FLAGS = {
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
42 b'stream-end': STREAM_FLAG_END_STREAM,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 }
44 }
45
45
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 FRAME_TYPE_COMMAND_DATA = 0x02
47 FRAME_TYPE_COMMAND_DATA = 0x02
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 FRAME_TYPE_PROGRESS = 0x07
51 FRAME_TYPE_PROGRESS = 0x07
52 FRAME_TYPE_STREAM_SETTINGS = 0x08
52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 FRAME_TYPE_STREAM_SETTINGS = 0x09
53
54
54 FRAME_TYPES = {
55 FRAME_TYPES = {
55 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
56 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-data': FRAME_TYPE_COMMAND_DATA,
57 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
58 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
59 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
60 b'progress': FRAME_TYPE_PROGRESS,
61 b'progress': FRAME_TYPE_PROGRESS,
62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
61 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
62 }
64 }
63
65
64 FLAG_COMMAND_REQUEST_NEW = 0x01
66 FLAG_COMMAND_REQUEST_NEW = 0x01
65 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
66 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
67 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
68
70
69 FLAGS_COMMAND_REQUEST = {
71 FLAGS_COMMAND_REQUEST = {
70 b'new': FLAG_COMMAND_REQUEST_NEW,
72 b'new': FLAG_COMMAND_REQUEST_NEW,
71 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
72 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
73 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
74 }
76 }
75
77
76 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
77 FLAG_COMMAND_DATA_EOS = 0x02
79 FLAG_COMMAND_DATA_EOS = 0x02
78
80
79 FLAGS_COMMAND_DATA = {
81 FLAGS_COMMAND_DATA = {
80 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
81 b'eos': FLAG_COMMAND_DATA_EOS,
83 b'eos': FLAG_COMMAND_DATA_EOS,
82 }
84 }
83
85
84 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
85 FLAG_COMMAND_RESPONSE_EOS = 0x02
87 FLAG_COMMAND_RESPONSE_EOS = 0x02
86
88
87 FLAGS_COMMAND_RESPONSE = {
89 FLAGS_COMMAND_RESPONSE = {
88 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
89 b'eos': FLAG_COMMAND_RESPONSE_EOS,
91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
90 }
92 }
91
93
94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96
97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 }
101
102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104
105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 }
109
92 # Maps frame types to their available flags.
110 # Maps frame types to their available flags.
93 FRAME_TYPE_FLAGS = {
111 FRAME_TYPE_FLAGS = {
94 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
95 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
96 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
97 FRAME_TYPE_ERROR_RESPONSE: {},
115 FRAME_TYPE_ERROR_RESPONSE: {},
98 FRAME_TYPE_TEXT_OUTPUT: {},
116 FRAME_TYPE_TEXT_OUTPUT: {},
99 FRAME_TYPE_PROGRESS: {},
117 FRAME_TYPE_PROGRESS: {},
100 FRAME_TYPE_STREAM_SETTINGS: {},
118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
101 }
120 }
102
121
103 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
104
123
105 def humanflags(mapping, value):
124 def humanflags(mapping, value):
106 """Convert a numeric flags value to a human value, using a mapping table."""
125 """Convert a numeric flags value to a human value, using a mapping table."""
107 namemap = {v: k for k, v in mapping.iteritems()}
126 namemap = {v: k for k, v in mapping.iteritems()}
108 flags = []
127 flags = []
109 val = 1
128 val = 1
110 while value >= val:
129 while value >= val:
111 if value & val:
130 if value & val:
112 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
113 val <<= 1
132 val <<= 1
114
133
115 return b'|'.join(flags)
134 return b'|'.join(flags)
116
135
117 @attr.s(slots=True)
136 @attr.s(slots=True)
118 class frameheader(object):
137 class frameheader(object):
119 """Represents the data in a frame header."""
138 """Represents the data in a frame header."""
120
139
121 length = attr.ib()
140 length = attr.ib()
122 requestid = attr.ib()
141 requestid = attr.ib()
123 streamid = attr.ib()
142 streamid = attr.ib()
124 streamflags = attr.ib()
143 streamflags = attr.ib()
125 typeid = attr.ib()
144 typeid = attr.ib()
126 flags = attr.ib()
145 flags = attr.ib()
127
146
128 @attr.s(slots=True, repr=False)
147 @attr.s(slots=True, repr=False)
129 class frame(object):
148 class frame(object):
130 """Represents a parsed frame."""
149 """Represents a parsed frame."""
131
150
132 requestid = attr.ib()
151 requestid = attr.ib()
133 streamid = attr.ib()
152 streamid = attr.ib()
134 streamflags = attr.ib()
153 streamflags = attr.ib()
135 typeid = attr.ib()
154 typeid = attr.ib()
136 flags = attr.ib()
155 flags = attr.ib()
137 payload = attr.ib()
156 payload = attr.ib()
138
157
139 @encoding.strmethod
158 @encoding.strmethod
140 def __repr__(self):
159 def __repr__(self):
141 typename = '<unknown 0x%02x>' % self.typeid
160 typename = '<unknown 0x%02x>' % self.typeid
142 for name, value in FRAME_TYPES.iteritems():
161 for name, value in FRAME_TYPES.iteritems():
143 if value == self.typeid:
162 if value == self.typeid:
144 typename = name
163 typename = name
145 break
164 break
146
165
147 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
148 'type=%s; flags=%s)' % (
167 'type=%s; flags=%s)' % (
149 len(self.payload), self.requestid, self.streamid,
168 len(self.payload), self.requestid, self.streamid,
150 humanflags(STREAM_FLAGS, self.streamflags), typename,
169 humanflags(STREAM_FLAGS, self.streamflags), typename,
151 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
152
171
153 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
154 """Assemble a frame into a byte array."""
173 """Assemble a frame into a byte array."""
155 # TODO assert size of payload.
174 # TODO assert size of payload.
156 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
157
176
158 # 24 bits length
177 # 24 bits length
159 # 16 bits request id
178 # 16 bits request id
160 # 8 bits stream id
179 # 8 bits stream id
161 # 8 bits stream flags
180 # 8 bits stream flags
162 # 4 bits type
181 # 4 bits type
163 # 4 bits flags
182 # 4 bits flags
164
183
165 l = struct.pack(r'<I', len(payload))
184 l = struct.pack(r'<I', len(payload))
166 frame[0:3] = l[0:3]
185 frame[0:3] = l[0:3]
167 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
168 frame[7] = (typeid << 4) | flags
187 frame[7] = (typeid << 4) | flags
169 frame[8:] = payload
188 frame[8:] = payload
170
189
171 return frame
190 return frame
172
191
173 def makeframefromhumanstring(s):
192 def makeframefromhumanstring(s):
174 """Create a frame from a human readable string
193 """Create a frame from a human readable string
175
194
176 Strings have the form:
195 Strings have the form:
177
196
178 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
179
198
180 This can be used by user-facing applications and tests for creating
199 This can be used by user-facing applications and tests for creating
181 frames easily without having to type out a bunch of constants.
200 frames easily without having to type out a bunch of constants.
182
201
183 Request ID and stream IDs are integers.
202 Request ID and stream IDs are integers.
184
203
185 Stream flags, frame type, and flags can be specified by integer or
204 Stream flags, frame type, and flags can be specified by integer or
186 named constant.
205 named constant.
187
206
188 Flags can be delimited by `|` to bitwise OR them together.
207 Flags can be delimited by `|` to bitwise OR them together.
189
208
190 If the payload begins with ``cbor:``, the following string will be
209 If the payload begins with ``cbor:``, the following string will be
191 evaluated as Python literal and the resulting object will be fed into
210 evaluated as Python literal and the resulting object will be fed into
192 a CBOR encoder. Otherwise, the payload is interpreted as a Python
211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
193 byte string literal.
212 byte string literal.
194 """
213 """
195 fields = s.split(b' ', 5)
214 fields = s.split(b' ', 5)
196 requestid, streamid, streamflags, frametype, frameflags, payload = fields
215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
197
216
198 requestid = int(requestid)
217 requestid = int(requestid)
199 streamid = int(streamid)
218 streamid = int(streamid)
200
219
201 finalstreamflags = 0
220 finalstreamflags = 0
202 for flag in streamflags.split(b'|'):
221 for flag in streamflags.split(b'|'):
203 if flag in STREAM_FLAGS:
222 if flag in STREAM_FLAGS:
204 finalstreamflags |= STREAM_FLAGS[flag]
223 finalstreamflags |= STREAM_FLAGS[flag]
205 else:
224 else:
206 finalstreamflags |= int(flag)
225 finalstreamflags |= int(flag)
207
226
208 if frametype in FRAME_TYPES:
227 if frametype in FRAME_TYPES:
209 frametype = FRAME_TYPES[frametype]
228 frametype = FRAME_TYPES[frametype]
210 else:
229 else:
211 frametype = int(frametype)
230 frametype = int(frametype)
212
231
213 finalflags = 0
232 finalflags = 0
214 validflags = FRAME_TYPE_FLAGS[frametype]
233 validflags = FRAME_TYPE_FLAGS[frametype]
215 for flag in frameflags.split(b'|'):
234 for flag in frameflags.split(b'|'):
216 if flag in validflags:
235 if flag in validflags:
217 finalflags |= validflags[flag]
236 finalflags |= validflags[flag]
218 else:
237 else:
219 finalflags |= int(flag)
238 finalflags |= int(flag)
220
239
221 if payload.startswith(b'cbor:'):
240 if payload.startswith(b'cbor:'):
222 payload = b''.join(cborutil.streamencode(
241 payload = b''.join(cborutil.streamencode(
223 stringutil.evalpythonliteral(payload[5:])))
242 stringutil.evalpythonliteral(payload[5:])))
224
243
225 else:
244 else:
226 payload = stringutil.unescapestr(payload)
245 payload = stringutil.unescapestr(payload)
227
246
228 return makeframe(requestid=requestid, streamid=streamid,
247 return makeframe(requestid=requestid, streamid=streamid,
229 streamflags=finalstreamflags, typeid=frametype,
248 streamflags=finalstreamflags, typeid=frametype,
230 flags=finalflags, payload=payload)
249 flags=finalflags, payload=payload)
231
250
232 def parseheader(data):
251 def parseheader(data):
233 """Parse a unified framing protocol frame header from a buffer.
252 """Parse a unified framing protocol frame header from a buffer.
234
253
235 The header is expected to be in the buffer at offset 0 and the
254 The header is expected to be in the buffer at offset 0 and the
236 buffer is expected to be large enough to hold a full header.
255 buffer is expected to be large enough to hold a full header.
237 """
256 """
238 # 24 bits payload length (little endian)
257 # 24 bits payload length (little endian)
239 # 16 bits request ID
258 # 16 bits request ID
240 # 8 bits stream ID
259 # 8 bits stream ID
241 # 8 bits stream flags
260 # 8 bits stream flags
242 # 4 bits frame type
261 # 4 bits frame type
243 # 4 bits frame flags
262 # 4 bits frame flags
244 # ... payload
263 # ... payload
245 framelength = data[0] + 256 * data[1] + 16384 * data[2]
264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
246 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
247 typeflags = data[7]
266 typeflags = data[7]
248
267
249 frametype = (typeflags & 0xf0) >> 4
268 frametype = (typeflags & 0xf0) >> 4
250 frameflags = typeflags & 0x0f
269 frameflags = typeflags & 0x0f
251
270
252 return frameheader(framelength, requestid, streamid, streamflags,
271 return frameheader(framelength, requestid, streamid, streamflags,
253 frametype, frameflags)
272 frametype, frameflags)
254
273
255 def readframe(fh):
274 def readframe(fh):
256 """Read a unified framing protocol frame from a file object.
275 """Read a unified framing protocol frame from a file object.
257
276
258 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
259 None if no frame is available. May raise if a malformed frame is
278 None if no frame is available. May raise if a malformed frame is
260 seen.
279 seen.
261 """
280 """
262 header = bytearray(FRAME_HEADER_SIZE)
281 header = bytearray(FRAME_HEADER_SIZE)
263
282
264 readcount = fh.readinto(header)
283 readcount = fh.readinto(header)
265
284
266 if readcount == 0:
285 if readcount == 0:
267 return None
286 return None
268
287
269 if readcount != FRAME_HEADER_SIZE:
288 if readcount != FRAME_HEADER_SIZE:
270 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
271 (readcount, header))
290 (readcount, header))
272
291
273 h = parseheader(header)
292 h = parseheader(header)
274
293
275 payload = fh.read(h.length)
294 payload = fh.read(h.length)
276 if len(payload) != h.length:
295 if len(payload) != h.length:
277 raise error.Abort(_('frame length error: expected %d; got %d') %
296 raise error.Abort(_('frame length error: expected %d; got %d') %
278 (h.length, len(payload)))
297 (h.length, len(payload)))
279
298
280 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
281 payload)
300 payload)
282
301
283 def createcommandframes(stream, requestid, cmd, args, datafh=None,
302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
284 maxframesize=DEFAULT_MAX_FRAME_SIZE,
303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
285 redirect=None):
304 redirect=None):
286 """Create frames necessary to transmit a request to run a command.
305 """Create frames necessary to transmit a request to run a command.
287
306
288 This is a generator of bytearrays. Each item represents a frame
307 This is a generator of bytearrays. Each item represents a frame
289 ready to be sent over the wire to a peer.
308 ready to be sent over the wire to a peer.
290 """
309 """
291 data = {b'name': cmd}
310 data = {b'name': cmd}
292 if args:
311 if args:
293 data[b'args'] = args
312 data[b'args'] = args
294
313
295 if redirect:
314 if redirect:
296 data[b'redirect'] = redirect
315 data[b'redirect'] = redirect
297
316
298 data = b''.join(cborutil.streamencode(data))
317 data = b''.join(cborutil.streamencode(data))
299
318
300 offset = 0
319 offset = 0
301
320
302 while True:
321 while True:
303 flags = 0
322 flags = 0
304
323
305 # Must set new or continuation flag.
324 # Must set new or continuation flag.
306 if not offset:
325 if not offset:
307 flags |= FLAG_COMMAND_REQUEST_NEW
326 flags |= FLAG_COMMAND_REQUEST_NEW
308 else:
327 else:
309 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
310
329
311 # Data frames is set on all frames.
330 # Data frames is set on all frames.
312 if datafh:
331 if datafh:
313 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
314
333
315 payload = data[offset:offset + maxframesize]
334 payload = data[offset:offset + maxframesize]
316 offset += len(payload)
335 offset += len(payload)
317
336
318 if len(payload) == maxframesize and offset < len(data):
337 if len(payload) == maxframesize and offset < len(data):
319 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
320
339
321 yield stream.makeframe(requestid=requestid,
340 yield stream.makeframe(requestid=requestid,
322 typeid=FRAME_TYPE_COMMAND_REQUEST,
341 typeid=FRAME_TYPE_COMMAND_REQUEST,
323 flags=flags,
342 flags=flags,
324 payload=payload)
343 payload=payload)
325
344
326 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
327 break
346 break
328
347
329 if datafh:
348 if datafh:
330 while True:
349 while True:
331 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
332
351
333 done = False
352 done = False
334 if len(data) == DEFAULT_MAX_FRAME_SIZE:
353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
335 flags = FLAG_COMMAND_DATA_CONTINUATION
354 flags = FLAG_COMMAND_DATA_CONTINUATION
336 else:
355 else:
337 flags = FLAG_COMMAND_DATA_EOS
356 flags = FLAG_COMMAND_DATA_EOS
338 assert datafh.read(1) == b''
357 assert datafh.read(1) == b''
339 done = True
358 done = True
340
359
341 yield stream.makeframe(requestid=requestid,
360 yield stream.makeframe(requestid=requestid,
342 typeid=FRAME_TYPE_COMMAND_DATA,
361 typeid=FRAME_TYPE_COMMAND_DATA,
343 flags=flags,
362 flags=flags,
344 payload=data)
363 payload=data)
345
364
346 if done:
365 if done:
347 break
366 break
348
367
349 def createcommandresponseframesfrombytes(stream, requestid, data,
368 def createcommandresponseframesfrombytes(stream, requestid, data,
350 maxframesize=DEFAULT_MAX_FRAME_SIZE):
369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
351 """Create a raw frame to send a bytes response from static bytes input.
370 """Create a raw frame to send a bytes response from static bytes input.
352
371
353 Returns a generator of bytearrays.
372 Returns a generator of bytearrays.
354 """
373 """
355 # Automatically send the overall CBOR response map.
374 # Automatically send the overall CBOR response map.
356 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
357 if len(overall) > maxframesize:
376 if len(overall) > maxframesize:
358 raise error.ProgrammingError('not yet implemented')
377 raise error.ProgrammingError('not yet implemented')
359
378
360 # Simple case where we can fit the full response in a single frame.
379 # Simple case where we can fit the full response in a single frame.
361 if len(overall) + len(data) <= maxframesize:
380 if len(overall) + len(data) <= maxframesize:
362 flags = FLAG_COMMAND_RESPONSE_EOS
381 flags = FLAG_COMMAND_RESPONSE_EOS
363 yield stream.makeframe(requestid=requestid,
382 yield stream.makeframe(requestid=requestid,
364 typeid=FRAME_TYPE_COMMAND_RESPONSE,
383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
365 flags=flags,
384 flags=flags,
366 payload=overall + data)
385 payload=overall + data)
367 return
386 return
368
387
369 # It's easier to send the overall CBOR map in its own frame than to track
388 # It's easier to send the overall CBOR map in its own frame than to track
370 # offsets.
389 # offsets.
371 yield stream.makeframe(requestid=requestid,
390 yield stream.makeframe(requestid=requestid,
372 typeid=FRAME_TYPE_COMMAND_RESPONSE,
391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
374 payload=overall)
393 payload=overall)
375
394
376 offset = 0
395 offset = 0
377 while True:
396 while True:
378 chunk = data[offset:offset + maxframesize]
397 chunk = data[offset:offset + maxframesize]
379 offset += len(chunk)
398 offset += len(chunk)
380 done = offset == len(data)
399 done = offset == len(data)
381
400
382 if done:
401 if done:
383 flags = FLAG_COMMAND_RESPONSE_EOS
402 flags = FLAG_COMMAND_RESPONSE_EOS
384 else:
403 else:
385 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
386
405
387 yield stream.makeframe(requestid=requestid,
406 yield stream.makeframe(requestid=requestid,
388 typeid=FRAME_TYPE_COMMAND_RESPONSE,
407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
389 flags=flags,
408 flags=flags,
390 payload=chunk)
409 payload=chunk)
391
410
392 if done:
411 if done:
393 break
412 break
394
413
395 def createbytesresponseframesfromgen(stream, requestid, gen,
414 def createbytesresponseframesfromgen(stream, requestid, gen,
396 maxframesize=DEFAULT_MAX_FRAME_SIZE):
415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
397 """Generator of frames from a generator of byte chunks.
416 """Generator of frames from a generator of byte chunks.
398
417
399 This assumes that another frame will follow whatever this emits. i.e.
418 This assumes that another frame will follow whatever this emits. i.e.
400 this always emits the continuation flag and never emits the end-of-stream
419 this always emits the continuation flag and never emits the end-of-stream
401 flag.
420 flag.
402 """
421 """
403 cb = util.chunkbuffer(gen)
422 cb = util.chunkbuffer(gen)
404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405
424
406 while True:
425 while True:
407 chunk = cb.read(maxframesize)
426 chunk = cb.read(maxframesize)
408 if not chunk:
427 if not chunk:
409 break
428 break
410
429
411 yield stream.makeframe(requestid=requestid,
430 yield stream.makeframe(requestid=requestid,
412 typeid=FRAME_TYPE_COMMAND_RESPONSE,
431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
413 flags=flags,
432 flags=flags,
414 payload=chunk)
433 payload=chunk)
415
434
416 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
417
436
418 def createcommandresponseokframe(stream, requestid):
437 def createcommandresponseokframe(stream, requestid):
419 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
420
439
421 return stream.makeframe(requestid=requestid,
440 return stream.makeframe(requestid=requestid,
422 typeid=FRAME_TYPE_COMMAND_RESPONSE,
441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
423 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
424 payload=overall)
443 payload=overall)
425
444
426 def createcommandresponseeosframe(stream, requestid):
445 def createcommandresponseeosframe(stream, requestid):
427 """Create an empty payload frame representing command end-of-stream."""
446 """Create an empty payload frame representing command end-of-stream."""
428 return stream.makeframe(requestid=requestid,
447 return stream.makeframe(requestid=requestid,
429 typeid=FRAME_TYPE_COMMAND_RESPONSE,
448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
430 flags=FLAG_COMMAND_RESPONSE_EOS,
449 flags=FLAG_COMMAND_RESPONSE_EOS,
431 payload=b'')
450 payload=b'')
432
451
433 def createalternatelocationresponseframe(stream, requestid, location):
452 def createalternatelocationresponseframe(stream, requestid, location):
434 data = {
453 data = {
435 b'status': b'redirect',
454 b'status': b'redirect',
436 b'location': {
455 b'location': {
437 b'url': location.url,
456 b'url': location.url,
438 b'mediatype': location.mediatype,
457 b'mediatype': location.mediatype,
439 }
458 }
440 }
459 }
441
460
442 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
443 r'servercadercerts'):
462 r'servercadercerts'):
444 value = getattr(location, a)
463 value = getattr(location, a)
445 if value is not None:
464 if value is not None:
446 data[b'location'][pycompat.bytestr(a)] = value
465 data[b'location'][pycompat.bytestr(a)] = value
447
466
448 return stream.makeframe(requestid=requestid,
467 return stream.makeframe(requestid=requestid,
449 typeid=FRAME_TYPE_COMMAND_RESPONSE,
468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
450 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
451 payload=b''.join(cborutil.streamencode(data)))
470 payload=b''.join(cborutil.streamencode(data)))
452
471
453 def createcommanderrorresponse(stream, requestid, message, args=None):
472 def createcommanderrorresponse(stream, requestid, message, args=None):
454 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
455 # formatting works consistently?
474 # formatting works consistently?
456 m = {
475 m = {
457 b'status': b'error',
476 b'status': b'error',
458 b'error': {
477 b'error': {
459 b'message': message,
478 b'message': message,
460 }
479 }
461 }
480 }
462
481
463 if args:
482 if args:
464 m[b'error'][b'args'] = args
483 m[b'error'][b'args'] = args
465
484
466 overall = b''.join(cborutil.streamencode(m))
485 overall = b''.join(cborutil.streamencode(m))
467
486
468 yield stream.makeframe(requestid=requestid,
487 yield stream.makeframe(requestid=requestid,
469 typeid=FRAME_TYPE_COMMAND_RESPONSE,
488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
470 flags=FLAG_COMMAND_RESPONSE_EOS,
489 flags=FLAG_COMMAND_RESPONSE_EOS,
471 payload=overall)
490 payload=overall)
472
491
473 def createerrorframe(stream, requestid, msg, errtype):
492 def createerrorframe(stream, requestid, msg, errtype):
474 # TODO properly handle frame size limits.
493 # TODO properly handle frame size limits.
475 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
476
495
477 payload = b''.join(cborutil.streamencode({
496 payload = b''.join(cborutil.streamencode({
478 b'type': errtype,
497 b'type': errtype,
479 b'message': [{b'msg': msg}],
498 b'message': [{b'msg': msg}],
480 }))
499 }))
481
500
482 yield stream.makeframe(requestid=requestid,
501 yield stream.makeframe(requestid=requestid,
483 typeid=FRAME_TYPE_ERROR_RESPONSE,
502 typeid=FRAME_TYPE_ERROR_RESPONSE,
484 flags=0,
503 flags=0,
485 payload=payload)
504 payload=payload)
486
505
487 def createtextoutputframe(stream, requestid, atoms,
506 def createtextoutputframe(stream, requestid, atoms,
488 maxframesize=DEFAULT_MAX_FRAME_SIZE):
507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
489 """Create a text output frame to render text to people.
508 """Create a text output frame to render text to people.
490
509
491 ``atoms`` is a 3-tuple of (formatting string, args, labels).
510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
492
511
493 The formatting string contains ``%s`` tokens to be replaced by the
512 The formatting string contains ``%s`` tokens to be replaced by the
494 corresponding indexed entry in ``args``. ``labels`` is an iterable of
513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
495 formatters to be applied at rendering time. In terms of the ``ui``
514 formatters to be applied at rendering time. In terms of the ``ui``
496 class, each atom corresponds to a ``ui.write()``.
515 class, each atom corresponds to a ``ui.write()``.
497 """
516 """
498 atomdicts = []
517 atomdicts = []
499
518
500 for (formatting, args, labels) in atoms:
519 for (formatting, args, labels) in atoms:
501 # TODO look for localstr, other types here?
520 # TODO look for localstr, other types here?
502
521
503 if not isinstance(formatting, bytes):
522 if not isinstance(formatting, bytes):
504 raise ValueError('must use bytes formatting strings')
523 raise ValueError('must use bytes formatting strings')
505 for arg in args:
524 for arg in args:
506 if not isinstance(arg, bytes):
525 if not isinstance(arg, bytes):
507 raise ValueError('must use bytes for arguments')
526 raise ValueError('must use bytes for arguments')
508 for label in labels:
527 for label in labels:
509 if not isinstance(label, bytes):
528 if not isinstance(label, bytes):
510 raise ValueError('must use bytes for labels')
529 raise ValueError('must use bytes for labels')
511
530
512 # Formatting string must be ASCII.
531 # Formatting string must be ASCII.
513 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
514
533
515 # Arguments must be UTF-8.
534 # Arguments must be UTF-8.
516 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
517
536
518 # Labels must be ASCII.
537 # Labels must be ASCII.
519 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
520 for l in labels]
539 for l in labels]
521
540
522 atom = {b'msg': formatting}
541 atom = {b'msg': formatting}
523 if args:
542 if args:
524 atom[b'args'] = args
543 atom[b'args'] = args
525 if labels:
544 if labels:
526 atom[b'labels'] = labels
545 atom[b'labels'] = labels
527
546
528 atomdicts.append(atom)
547 atomdicts.append(atom)
529
548
530 payload = b''.join(cborutil.streamencode(atomdicts))
549 payload = b''.join(cborutil.streamencode(atomdicts))
531
550
532 if len(payload) > maxframesize:
551 if len(payload) > maxframesize:
533 raise ValueError('cannot encode data in a single frame')
552 raise ValueError('cannot encode data in a single frame')
534
553
535 yield stream.makeframe(requestid=requestid,
554 yield stream.makeframe(requestid=requestid,
536 typeid=FRAME_TYPE_TEXT_OUTPUT,
555 typeid=FRAME_TYPE_TEXT_OUTPUT,
537 flags=0,
556 flags=0,
538 payload=payload)
557 payload=payload)
539
558
540 class bufferingcommandresponseemitter(object):
559 class bufferingcommandresponseemitter(object):
541 """Helper object to emit command response frames intelligently.
560 """Helper object to emit command response frames intelligently.
542
561
543 Raw command response data is likely emitted in chunks much smaller
562 Raw command response data is likely emitted in chunks much smaller
544 than what can fit in a single frame. This class exists to buffer
563 than what can fit in a single frame. This class exists to buffer
545 chunks until enough data is available to fit in a single frame.
564 chunks until enough data is available to fit in a single frame.
546
565
547 TODO we'll need something like this when compression is supported.
566 TODO we'll need something like this when compression is supported.
548 So it might make sense to implement this functionality at the stream
567 So it might make sense to implement this functionality at the stream
549 level.
568 level.
550 """
569 """
551 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
552 self._stream = stream
571 self._stream = stream
553 self._requestid = requestid
572 self._requestid = requestid
554 self._maxsize = maxframesize
573 self._maxsize = maxframesize
555 self._chunks = []
574 self._chunks = []
556 self._chunkssize = 0
575 self._chunkssize = 0
557
576
558 def send(self, data):
577 def send(self, data):
559 """Send new data for emission.
578 """Send new data for emission.
560
579
561 Is a generator of new frames that were derived from the new input.
580 Is a generator of new frames that were derived from the new input.
562
581
563 If the special input ``None`` is received, flushes all buffered
582 If the special input ``None`` is received, flushes all buffered
564 data to frames.
583 data to frames.
565 """
584 """
566
585
567 if data is None:
586 if data is None:
568 for frame in self._flush():
587 for frame in self._flush():
569 yield frame
588 yield frame
570 return
589 return
571
590
572 # There is a ton of potential to do more complicated things here.
591 # There is a ton of potential to do more complicated things here.
573 # Our immediate goal is to coalesce small chunks into big frames,
592 # Our immediate goal is to coalesce small chunks into big frames,
574 # not achieve the fewest number of frames possible. So we go with
593 # not achieve the fewest number of frames possible. So we go with
575 # a simple implementation:
594 # a simple implementation:
576 #
595 #
577 # * If a chunk is too large for a frame, we flush and emit frames
596 # * If a chunk is too large for a frame, we flush and emit frames
578 # for the new chunk.
597 # for the new chunk.
579 # * If a chunk can be buffered without total buffered size limits
598 # * If a chunk can be buffered without total buffered size limits
580 # being exceeded, we do that.
599 # being exceeded, we do that.
581 # * If a chunk causes us to go over our buffering limit, we flush
600 # * If a chunk causes us to go over our buffering limit, we flush
582 # and then buffer the new chunk.
601 # and then buffer the new chunk.
583
602
584 if len(data) > self._maxsize:
603 if len(data) > self._maxsize:
585 for frame in self._flush():
604 for frame in self._flush():
586 yield frame
605 yield frame
587
606
588 # Now emit frames for the big chunk.
607 # Now emit frames for the big chunk.
589 offset = 0
608 offset = 0
590 while True:
609 while True:
591 chunk = data[offset:offset + self._maxsize]
610 chunk = data[offset:offset + self._maxsize]
592 offset += len(chunk)
611 offset += len(chunk)
593
612
594 yield self._stream.makeframe(
613 yield self._stream.makeframe(
595 self._requestid,
614 self._requestid,
596 typeid=FRAME_TYPE_COMMAND_RESPONSE,
615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
597 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
598 payload=chunk)
617 payload=chunk)
599
618
600 if offset == len(data):
619 if offset == len(data):
601 return
620 return
602
621
603 # If we don't have enough to constitute a full frame, buffer and
622 # If we don't have enough to constitute a full frame, buffer and
604 # return.
623 # return.
605 if len(data) + self._chunkssize < self._maxsize:
624 if len(data) + self._chunkssize < self._maxsize:
606 self._chunks.append(data)
625 self._chunks.append(data)
607 self._chunkssize += len(data)
626 self._chunkssize += len(data)
608 return
627 return
609
628
610 # Else flush what we have and buffer the new chunk. We could do
629 # Else flush what we have and buffer the new chunk. We could do
611 # something more intelligent here, like break the chunk. Let's
630 # something more intelligent here, like break the chunk. Let's
612 # keep things simple for now.
631 # keep things simple for now.
613 for frame in self._flush():
632 for frame in self._flush():
614 yield frame
633 yield frame
615
634
616 self._chunks.append(data)
635 self._chunks.append(data)
617 self._chunkssize = len(data)
636 self._chunkssize = len(data)
618
637
619 def _flush(self):
638 def _flush(self):
620 payload = b''.join(self._chunks)
639 payload = b''.join(self._chunks)
621 assert len(payload) <= self._maxsize
640 assert len(payload) <= self._maxsize
622
641
623 self._chunks[:] = []
642 self._chunks[:] = []
624 self._chunkssize = 0
643 self._chunkssize = 0
625
644
626 yield self._stream.makeframe(
645 yield self._stream.makeframe(
627 self._requestid,
646 self._requestid,
628 typeid=FRAME_TYPE_COMMAND_RESPONSE,
647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
629 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
630 payload=payload)
649 payload=payload)
631
650
632 class stream(object):
651 class stream(object):
633 """Represents a logical unidirectional series of frames."""
652 """Represents a logical unidirectional series of frames."""
634
653
635 def __init__(self, streamid, active=False):
654 def __init__(self, streamid, active=False):
636 self.streamid = streamid
655 self.streamid = streamid
637 self._active = active
656 self._active = active
638
657
639 def makeframe(self, requestid, typeid, flags, payload):
658 def makeframe(self, requestid, typeid, flags, payload):
640 """Create a frame to be sent out over this stream.
659 """Create a frame to be sent out over this stream.
641
660
642 Only returns the frame instance. Does not actually send it.
661 Only returns the frame instance. Does not actually send it.
643 """
662 """
644 streamflags = 0
663 streamflags = 0
645 if not self._active:
664 if not self._active:
646 streamflags |= STREAM_FLAG_BEGIN_STREAM
665 streamflags |= STREAM_FLAG_BEGIN_STREAM
647 self._active = True
666 self._active = True
648
667
649 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
650 payload)
669 payload)
651
670
652 def ensureserverstream(stream):
671 def ensureserverstream(stream):
653 if stream.streamid % 2:
672 if stream.streamid % 2:
654 raise error.ProgrammingError('server should only write to even '
673 raise error.ProgrammingError('server should only write to even '
655 'numbered streams; %d is not even' %
674 'numbered streams; %d is not even' %
656 stream.streamid)
675 stream.streamid)
657
676
658 class serverreactor(object):
677 class serverreactor(object):
659 """Holds state of a server handling frame-based protocol requests.
678 """Holds state of a server handling frame-based protocol requests.
660
679
661 This class is the "brain" of the unified frame-based protocol server
680 This class is the "brain" of the unified frame-based protocol server
662 component. While the protocol is stateless from the perspective of
681 component. While the protocol is stateless from the perspective of
663 requests/commands, something needs to track which frames have been
682 requests/commands, something needs to track which frames have been
664 received, what frames to expect, etc. This class is that thing.
683 received, what frames to expect, etc. This class is that thing.
665
684
666 Instances are modeled as a state machine of sorts. Instances are also
685 Instances are modeled as a state machine of sorts. Instances are also
667 reactionary to external events. The point of this class is to encapsulate
686 reactionary to external events. The point of this class is to encapsulate
668 the state of the connection and the exchange of frames, not to perform
687 the state of the connection and the exchange of frames, not to perform
669 work. Instead, callers tell this class when something occurs, like a
688 work. Instead, callers tell this class when something occurs, like a
670 frame arriving. If that activity is worthy of a follow-up action (say
689 frame arriving. If that activity is worthy of a follow-up action (say
671 *run a command*), the return value of that handler will say so.
690 *run a command*), the return value of that handler will say so.
672
691
673 I/O and CPU intensive operations are purposefully delegated outside of
692 I/O and CPU intensive operations are purposefully delegated outside of
674 this class.
693 this class.
675
694
676 Consumers are expected to tell instances when events occur. They do so by
695 Consumers are expected to tell instances when events occur. They do so by
677 calling the various ``on*`` methods. These methods return a 2-tuple
696 calling the various ``on*`` methods. These methods return a 2-tuple
678 describing any follow-up action(s) to take. The first element is the
697 describing any follow-up action(s) to take. The first element is the
679 name of an action to perform. The second is a data structure (usually
698 name of an action to perform. The second is a data structure (usually
680 a dict) specific to that action that contains more information. e.g.
699 a dict) specific to that action that contains more information. e.g.
681 if the server wants to send frames back to the client, the data structure
700 if the server wants to send frames back to the client, the data structure
682 will contain a reference to those frames.
701 will contain a reference to those frames.
683
702
684 Valid actions that consumers can be instructed to take are:
703 Valid actions that consumers can be instructed to take are:
685
704
686 sendframes
705 sendframes
687 Indicates that frames should be sent to the client. The ``framegen``
706 Indicates that frames should be sent to the client. The ``framegen``
688 key contains a generator of frames that should be sent. The server
707 key contains a generator of frames that should be sent. The server
689 assumes that all frames are sent to the client.
708 assumes that all frames are sent to the client.
690
709
691 error
710 error
692 Indicates that an error occurred. Consumer should probably abort.
711 Indicates that an error occurred. Consumer should probably abort.
693
712
694 runcommand
713 runcommand
695 Indicates that the consumer should run a wire protocol command. Details
714 Indicates that the consumer should run a wire protocol command. Details
696 of the command to run are given in the data structure.
715 of the command to run are given in the data structure.
697
716
698 wantframe
717 wantframe
699 Indicates that nothing of interest happened and the server is waiting on
718 Indicates that nothing of interest happened and the server is waiting on
700 more frames from the client before anything interesting can be done.
719 more frames from the client before anything interesting can be done.
701
720
702 noop
721 noop
703 Indicates no additional action is required.
722 Indicates no additional action is required.
704
723
705 Known Issues
724 Known Issues
706 ------------
725 ------------
707
726
708 There are no limits to the number of partially received commands or their
727 There are no limits to the number of partially received commands or their
709 size. A malicious client could stream command request data and exhaust the
728 size. A malicious client could stream command request data and exhaust the
710 server's memory.
729 server's memory.
711
730
712 Partially received commands are not acted upon when end of input is
731 Partially received commands are not acted upon when end of input is
713 reached. Should the server error if it receives a partial request?
732 reached. Should the server error if it receives a partial request?
714 Should the client send a message to abort a partially transmitted request
733 Should the client send a message to abort a partially transmitted request
715 to facilitate graceful shutdown?
734 to facilitate graceful shutdown?
716
735
717 Active requests that haven't been responded to aren't tracked. This means
736 Active requests that haven't been responded to aren't tracked. This means
718 that if we receive a command and instruct its dispatch, another command
737 that if we receive a command and instruct its dispatch, another command
719 with its request ID can come in over the wire and there will be a race
738 with its request ID can come in over the wire and there will be a race
720 between who responds to what.
739 between who responds to what.
721 """
740 """
722
741
723 def __init__(self, deferoutput=False):
742 def __init__(self, deferoutput=False):
724 """Construct a new server reactor.
743 """Construct a new server reactor.
725
744
726 ``deferoutput`` can be used to indicate that no output frames should be
745 ``deferoutput`` can be used to indicate that no output frames should be
727 instructed to be sent until input has been exhausted. In this mode,
746 instructed to be sent until input has been exhausted. In this mode,
728 events that would normally generate output frames (such as a command
747 events that would normally generate output frames (such as a command
729 response being ready) will instead defer instructing the consumer to
748 response being ready) will instead defer instructing the consumer to
730 send those frames. This is useful for half-duplex transports where the
749 send those frames. This is useful for half-duplex transports where the
731 sender cannot receive until all data has been transmitted.
750 sender cannot receive until all data has been transmitted.
732 """
751 """
733 self._deferoutput = deferoutput
752 self._deferoutput = deferoutput
734 self._state = 'idle'
753 self._state = 'idle'
735 self._nextoutgoingstreamid = 2
754 self._nextoutgoingstreamid = 2
736 self._bufferedframegens = []
755 self._bufferedframegens = []
737 # stream id -> stream instance for all active streams from the client.
756 # stream id -> stream instance for all active streams from the client.
738 self._incomingstreams = {}
757 self._incomingstreams = {}
739 self._outgoingstreams = {}
758 self._outgoingstreams = {}
740 # request id -> dict of commands that are actively being received.
759 # request id -> dict of commands that are actively being received.
741 self._receivingcommands = {}
760 self._receivingcommands = {}
742 # Request IDs that have been received and are actively being processed.
761 # Request IDs that have been received and are actively being processed.
743 # Once all output for a request has been sent, it is removed from this
762 # Once all output for a request has been sent, it is removed from this
744 # set.
763 # set.
745 self._activecommands = set()
764 self._activecommands = set()
746
765
747 def onframerecv(self, frame):
766 def onframerecv(self, frame):
748 """Process a frame that has been received off the wire.
767 """Process a frame that has been received off the wire.
749
768
750 Returns a dict with an ``action`` key that details what action,
769 Returns a dict with an ``action`` key that details what action,
751 if any, the consumer should take next.
770 if any, the consumer should take next.
752 """
771 """
753 if not frame.streamid % 2:
772 if not frame.streamid % 2:
754 self._state = 'errored'
773 self._state = 'errored'
755 return self._makeerrorresult(
774 return self._makeerrorresult(
756 _('received frame with even numbered stream ID: %d') %
775 _('received frame with even numbered stream ID: %d') %
757 frame.streamid)
776 frame.streamid)
758
777
759 if frame.streamid not in self._incomingstreams:
778 if frame.streamid not in self._incomingstreams:
760 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
779 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
761 self._state = 'errored'
780 self._state = 'errored'
762 return self._makeerrorresult(
781 return self._makeerrorresult(
763 _('received frame on unknown inactive stream without '
782 _('received frame on unknown inactive stream without '
764 'beginning of stream flag set'))
783 'beginning of stream flag set'))
765
784
766 self._incomingstreams[frame.streamid] = stream(frame.streamid)
785 self._incomingstreams[frame.streamid] = stream(frame.streamid)
767
786
768 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
787 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
769 # TODO handle decoding frames
788 # TODO handle decoding frames
770 self._state = 'errored'
789 self._state = 'errored'
771 raise error.ProgrammingError('support for decoding stream payloads '
790 raise error.ProgrammingError('support for decoding stream payloads '
772 'not yet implemented')
791 'not yet implemented')
773
792
774 if frame.streamflags & STREAM_FLAG_END_STREAM:
793 if frame.streamflags & STREAM_FLAG_END_STREAM:
775 del self._incomingstreams[frame.streamid]
794 del self._incomingstreams[frame.streamid]
776
795
777 handlers = {
796 handlers = {
778 'idle': self._onframeidle,
797 'idle': self._onframeidle,
779 'command-receiving': self._onframecommandreceiving,
798 'command-receiving': self._onframecommandreceiving,
780 'errored': self._onframeerrored,
799 'errored': self._onframeerrored,
781 }
800 }
782
801
783 meth = handlers.get(self._state)
802 meth = handlers.get(self._state)
784 if not meth:
803 if not meth:
785 raise error.ProgrammingError('unhandled state: %s' % self._state)
804 raise error.ProgrammingError('unhandled state: %s' % self._state)
786
805
787 return meth(frame)
806 return meth(frame)
788
807
789 def oncommandresponseready(self, stream, requestid, data):
808 def oncommandresponseready(self, stream, requestid, data):
790 """Signal that a bytes response is ready to be sent to the client.
809 """Signal that a bytes response is ready to be sent to the client.
791
810
792 The raw bytes response is passed as an argument.
811 The raw bytes response is passed as an argument.
793 """
812 """
794 ensureserverstream(stream)
813 ensureserverstream(stream)
795
814
796 def sendframes():
815 def sendframes():
797 for frame in createcommandresponseframesfrombytes(stream, requestid,
816 for frame in createcommandresponseframesfrombytes(stream, requestid,
798 data):
817 data):
799 yield frame
818 yield frame
800
819
801 self._activecommands.remove(requestid)
820 self._activecommands.remove(requestid)
802
821
803 result = sendframes()
822 result = sendframes()
804
823
805 if self._deferoutput:
824 if self._deferoutput:
806 self._bufferedframegens.append(result)
825 self._bufferedframegens.append(result)
807 return 'noop', {}
826 return 'noop', {}
808 else:
827 else:
809 return 'sendframes', {
828 return 'sendframes', {
810 'framegen': result,
829 'framegen': result,
811 }
830 }
812
831
813 def oncommandresponsereadyobjects(self, stream, requestid, objs):
832 def oncommandresponsereadyobjects(self, stream, requestid, objs):
814 """Signal that objects are ready to be sent to the client.
833 """Signal that objects are ready to be sent to the client.
815
834
816 ``objs`` is an iterable of objects (typically a generator) that will
835 ``objs`` is an iterable of objects (typically a generator) that will
817 be encoded via CBOR and added to frames, which will be sent to the
836 be encoded via CBOR and added to frames, which will be sent to the
818 client.
837 client.
819 """
838 """
820 ensureserverstream(stream)
839 ensureserverstream(stream)
821
840
822 # We need to take care over exception handling. Uncaught exceptions
841 # We need to take care over exception handling. Uncaught exceptions
823 # when generating frames could lead to premature end of the frame
842 # when generating frames could lead to premature end of the frame
824 # stream and the possibility of the server or client process getting
843 # stream and the possibility of the server or client process getting
825 # in a bad state.
844 # in a bad state.
826 #
845 #
827 # Keep in mind that if ``objs`` is a generator, advancing it could
846 # Keep in mind that if ``objs`` is a generator, advancing it could
828 # raise exceptions that originated in e.g. wire protocol command
847 # raise exceptions that originated in e.g. wire protocol command
829 # functions. That is why we differentiate between exceptions raised
848 # functions. That is why we differentiate between exceptions raised
830 # when iterating versus other exceptions that occur.
849 # when iterating versus other exceptions that occur.
831 #
850 #
832 # In all cases, when the function finishes, the request is fully
851 # In all cases, when the function finishes, the request is fully
833 # handled and no new frames for it should be seen.
852 # handled and no new frames for it should be seen.
834
853
835 def sendframes():
854 def sendframes():
836 emitted = False
855 emitted = False
837 alternatelocationsent = False
856 alternatelocationsent = False
838 emitter = bufferingcommandresponseemitter(stream, requestid)
857 emitter = bufferingcommandresponseemitter(stream, requestid)
839 while True:
858 while True:
840 try:
859 try:
841 o = next(objs)
860 o = next(objs)
842 except StopIteration:
861 except StopIteration:
843 for frame in emitter.send(None):
862 for frame in emitter.send(None):
844 yield frame
863 yield frame
845
864
846 if emitted:
865 if emitted:
847 yield createcommandresponseeosframe(stream, requestid)
866 yield createcommandresponseeosframe(stream, requestid)
848 break
867 break
849
868
850 except error.WireprotoCommandError as e:
869 except error.WireprotoCommandError as e:
851 for frame in createcommanderrorresponse(
870 for frame in createcommanderrorresponse(
852 stream, requestid, e.message, e.messageargs):
871 stream, requestid, e.message, e.messageargs):
853 yield frame
872 yield frame
854 break
873 break
855
874
856 except Exception as e:
875 except Exception as e:
857 for frame in createerrorframe(
876 for frame in createerrorframe(
858 stream, requestid, '%s' % stringutil.forcebytestr(e),
877 stream, requestid, '%s' % stringutil.forcebytestr(e),
859 errtype='server'):
878 errtype='server'):
860
879
861 yield frame
880 yield frame
862
881
863 break
882 break
864
883
865 try:
884 try:
866 # Alternate location responses can only be the first and
885 # Alternate location responses can only be the first and
867 # only object in the output stream.
886 # only object in the output stream.
868 if isinstance(o, wireprototypes.alternatelocationresponse):
887 if isinstance(o, wireprototypes.alternatelocationresponse):
869 if emitted:
888 if emitted:
870 raise error.ProgrammingError(
889 raise error.ProgrammingError(
871 'alternatelocationresponse seen after initial '
890 'alternatelocationresponse seen after initial '
872 'output object')
891 'output object')
873
892
874 yield createalternatelocationresponseframe(
893 yield createalternatelocationresponseframe(
875 stream, requestid, o)
894 stream, requestid, o)
876
895
877 alternatelocationsent = True
896 alternatelocationsent = True
878 emitted = True
897 emitted = True
879 continue
898 continue
880
899
881 if alternatelocationsent:
900 if alternatelocationsent:
882 raise error.ProgrammingError(
901 raise error.ProgrammingError(
883 'object follows alternatelocationresponse')
902 'object follows alternatelocationresponse')
884
903
885 if not emitted:
904 if not emitted:
886 yield createcommandresponseokframe(stream, requestid)
905 yield createcommandresponseokframe(stream, requestid)
887 emitted = True
906 emitted = True
888
907
889 # Objects emitted by command functions can be serializable
908 # Objects emitted by command functions can be serializable
890 # data structures or special types.
909 # data structures or special types.
891 # TODO consider extracting the content normalization to a
910 # TODO consider extracting the content normalization to a
892 # standalone function, as it may be useful for e.g. cachers.
911 # standalone function, as it may be useful for e.g. cachers.
893
912
894 # A pre-encoded object is sent directly to the emitter.
913 # A pre-encoded object is sent directly to the emitter.
895 if isinstance(o, wireprototypes.encodedresponse):
914 if isinstance(o, wireprototypes.encodedresponse):
896 for frame in emitter.send(o.data):
915 for frame in emitter.send(o.data):
897 yield frame
916 yield frame
898
917
899 # A regular object is CBOR encoded.
918 # A regular object is CBOR encoded.
900 else:
919 else:
901 for chunk in cborutil.streamencode(o):
920 for chunk in cborutil.streamencode(o):
902 for frame in emitter.send(chunk):
921 for frame in emitter.send(chunk):
903 yield frame
922 yield frame
904
923
905 except Exception as e:
924 except Exception as e:
906 for frame in createerrorframe(stream, requestid,
925 for frame in createerrorframe(stream, requestid,
907 '%s' % e,
926 '%s' % e,
908 errtype='server'):
927 errtype='server'):
909 yield frame
928 yield frame
910
929
911 break
930 break
912
931
913 self._activecommands.remove(requestid)
932 self._activecommands.remove(requestid)
914
933
915 return self._handlesendframes(sendframes())
934 return self._handlesendframes(sendframes())
916
935
917 def oninputeof(self):
936 def oninputeof(self):
918 """Signals that end of input has been received.
937 """Signals that end of input has been received.
919
938
920 No more frames will be received. All pending activity should be
939 No more frames will be received. All pending activity should be
921 completed.
940 completed.
922 """
941 """
923 # TODO should we do anything about in-flight commands?
942 # TODO should we do anything about in-flight commands?
924
943
925 if not self._deferoutput or not self._bufferedframegens:
944 if not self._deferoutput or not self._bufferedframegens:
926 return 'noop', {}
945 return 'noop', {}
927
946
928 # If we buffered all our responses, emit those.
947 # If we buffered all our responses, emit those.
929 def makegen():
948 def makegen():
930 for gen in self._bufferedframegens:
949 for gen in self._bufferedframegens:
931 for frame in gen:
950 for frame in gen:
932 yield frame
951 yield frame
933
952
934 return 'sendframes', {
953 return 'sendframes', {
935 'framegen': makegen(),
954 'framegen': makegen(),
936 }
955 }
937
956
938 def _handlesendframes(self, framegen):
957 def _handlesendframes(self, framegen):
939 if self._deferoutput:
958 if self._deferoutput:
940 self._bufferedframegens.append(framegen)
959 self._bufferedframegens.append(framegen)
941 return 'noop', {}
960 return 'noop', {}
942 else:
961 else:
943 return 'sendframes', {
962 return 'sendframes', {
944 'framegen': framegen,
963 'framegen': framegen,
945 }
964 }
946
965
947 def onservererror(self, stream, requestid, msg):
966 def onservererror(self, stream, requestid, msg):
948 ensureserverstream(stream)
967 ensureserverstream(stream)
949
968
950 def sendframes():
969 def sendframes():
951 for frame in createerrorframe(stream, requestid, msg,
970 for frame in createerrorframe(stream, requestid, msg,
952 errtype='server'):
971 errtype='server'):
953 yield frame
972 yield frame
954
973
955 self._activecommands.remove(requestid)
974 self._activecommands.remove(requestid)
956
975
957 return self._handlesendframes(sendframes())
976 return self._handlesendframes(sendframes())
958
977
959 def oncommanderror(self, stream, requestid, message, args=None):
978 def oncommanderror(self, stream, requestid, message, args=None):
960 """Called when a command encountered an error before sending output."""
979 """Called when a command encountered an error before sending output."""
961 ensureserverstream(stream)
980 ensureserverstream(stream)
962
981
963 def sendframes():
982 def sendframes():
964 for frame in createcommanderrorresponse(stream, requestid, message,
983 for frame in createcommanderrorresponse(stream, requestid, message,
965 args):
984 args):
966 yield frame
985 yield frame
967
986
968 self._activecommands.remove(requestid)
987 self._activecommands.remove(requestid)
969
988
970 return self._handlesendframes(sendframes())
989 return self._handlesendframes(sendframes())
971
990
972 def makeoutputstream(self):
991 def makeoutputstream(self):
973 """Create a stream to be used for sending data to the client."""
992 """Create a stream to be used for sending data to the client."""
974 streamid = self._nextoutgoingstreamid
993 streamid = self._nextoutgoingstreamid
975 self._nextoutgoingstreamid += 2
994 self._nextoutgoingstreamid += 2
976
995
977 s = stream(streamid)
996 s = stream(streamid)
978 self._outgoingstreams[streamid] = s
997 self._outgoingstreams[streamid] = s
979
998
980 return s
999 return s
981
1000
982 def _makeerrorresult(self, msg):
1001 def _makeerrorresult(self, msg):
983 return 'error', {
1002 return 'error', {
984 'message': msg,
1003 'message': msg,
985 }
1004 }
986
1005
987 def _makeruncommandresult(self, requestid):
1006 def _makeruncommandresult(self, requestid):
988 entry = self._receivingcommands[requestid]
1007 entry = self._receivingcommands[requestid]
989
1008
990 if not entry['requestdone']:
1009 if not entry['requestdone']:
991 self._state = 'errored'
1010 self._state = 'errored'
992 raise error.ProgrammingError('should not be called without '
1011 raise error.ProgrammingError('should not be called without '
993 'requestdone set')
1012 'requestdone set')
994
1013
995 del self._receivingcommands[requestid]
1014 del self._receivingcommands[requestid]
996
1015
997 if self._receivingcommands:
1016 if self._receivingcommands:
998 self._state = 'command-receiving'
1017 self._state = 'command-receiving'
999 else:
1018 else:
1000 self._state = 'idle'
1019 self._state = 'idle'
1001
1020
1002 # Decode the payloads as CBOR.
1021 # Decode the payloads as CBOR.
1003 entry['payload'].seek(0)
1022 entry['payload'].seek(0)
1004 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1023 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1005
1024
1006 if b'name' not in request:
1025 if b'name' not in request:
1007 self._state = 'errored'
1026 self._state = 'errored'
1008 return self._makeerrorresult(
1027 return self._makeerrorresult(
1009 _('command request missing "name" field'))
1028 _('command request missing "name" field'))
1010
1029
1011 if b'args' not in request:
1030 if b'args' not in request:
1012 request[b'args'] = {}
1031 request[b'args'] = {}
1013
1032
1014 assert requestid not in self._activecommands
1033 assert requestid not in self._activecommands
1015 self._activecommands.add(requestid)
1034 self._activecommands.add(requestid)
1016
1035
1017 return 'runcommand', {
1036 return 'runcommand', {
1018 'requestid': requestid,
1037 'requestid': requestid,
1019 'command': request[b'name'],
1038 'command': request[b'name'],
1020 'args': request[b'args'],
1039 'args': request[b'args'],
1021 'redirect': request.get(b'redirect'),
1040 'redirect': request.get(b'redirect'),
1022 'data': entry['data'].getvalue() if entry['data'] else None,
1041 'data': entry['data'].getvalue() if entry['data'] else None,
1023 }
1042 }
1024
1043
1025 def _makewantframeresult(self):
1044 def _makewantframeresult(self):
1026 return 'wantframe', {
1045 return 'wantframe', {
1027 'state': self._state,
1046 'state': self._state,
1028 }
1047 }
1029
1048
1030 def _validatecommandrequestframe(self, frame):
1049 def _validatecommandrequestframe(self, frame):
1031 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1050 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1032 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1051 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1033
1052
1034 if new and continuation:
1053 if new and continuation:
1035 self._state = 'errored'
1054 self._state = 'errored'
1036 return self._makeerrorresult(
1055 return self._makeerrorresult(
1037 _('received command request frame with both new and '
1056 _('received command request frame with both new and '
1038 'continuation flags set'))
1057 'continuation flags set'))
1039
1058
1040 if not new and not continuation:
1059 if not new and not continuation:
1041 self._state = 'errored'
1060 self._state = 'errored'
1042 return self._makeerrorresult(
1061 return self._makeerrorresult(
1043 _('received command request frame with neither new nor '
1062 _('received command request frame with neither new nor '
1044 'continuation flags set'))
1063 'continuation flags set'))
1045
1064
1046 def _onframeidle(self, frame):
1065 def _onframeidle(self, frame):
1047 # The only frame type that should be received in this state is a
1066 # The only frame type that should be received in this state is a
1048 # command request.
1067 # command request.
1049 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1068 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1050 self._state = 'errored'
1069 self._state = 'errored'
1051 return self._makeerrorresult(
1070 return self._makeerrorresult(
1052 _('expected command request frame; got %d') % frame.typeid)
1071 _('expected command request frame; got %d') % frame.typeid)
1053
1072
1054 res = self._validatecommandrequestframe(frame)
1073 res = self._validatecommandrequestframe(frame)
1055 if res:
1074 if res:
1056 return res
1075 return res
1057
1076
1058 if frame.requestid in self._receivingcommands:
1077 if frame.requestid in self._receivingcommands:
1059 self._state = 'errored'
1078 self._state = 'errored'
1060 return self._makeerrorresult(
1079 return self._makeerrorresult(
1061 _('request with ID %d already received') % frame.requestid)
1080 _('request with ID %d already received') % frame.requestid)
1062
1081
1063 if frame.requestid in self._activecommands:
1082 if frame.requestid in self._activecommands:
1064 self._state = 'errored'
1083 self._state = 'errored'
1065 return self._makeerrorresult(
1084 return self._makeerrorresult(
1066 _('request with ID %d is already active') % frame.requestid)
1085 _('request with ID %d is already active') % frame.requestid)
1067
1086
1068 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1087 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1069 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1088 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1070 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1089 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1071
1090
1072 if not new:
1091 if not new:
1073 self._state = 'errored'
1092 self._state = 'errored'
1074 return self._makeerrorresult(
1093 return self._makeerrorresult(
1075 _('received command request frame without new flag set'))
1094 _('received command request frame without new flag set'))
1076
1095
1077 payload = util.bytesio()
1096 payload = util.bytesio()
1078 payload.write(frame.payload)
1097 payload.write(frame.payload)
1079
1098
1080 self._receivingcommands[frame.requestid] = {
1099 self._receivingcommands[frame.requestid] = {
1081 'payload': payload,
1100 'payload': payload,
1082 'data': None,
1101 'data': None,
1083 'requestdone': not moreframes,
1102 'requestdone': not moreframes,
1084 'expectingdata': bool(expectingdata),
1103 'expectingdata': bool(expectingdata),
1085 }
1104 }
1086
1105
1087 # This is the final frame for this request. Dispatch it.
1106 # This is the final frame for this request. Dispatch it.
1088 if not moreframes and not expectingdata:
1107 if not moreframes and not expectingdata:
1089 return self._makeruncommandresult(frame.requestid)
1108 return self._makeruncommandresult(frame.requestid)
1090
1109
1091 assert moreframes or expectingdata
1110 assert moreframes or expectingdata
1092 self._state = 'command-receiving'
1111 self._state = 'command-receiving'
1093 return self._makewantframeresult()
1112 return self._makewantframeresult()
1094
1113
1095 def _onframecommandreceiving(self, frame):
1114 def _onframecommandreceiving(self, frame):
1096 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1115 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1097 # Process new command requests as such.
1116 # Process new command requests as such.
1098 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1117 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1099 return self._onframeidle(frame)
1118 return self._onframeidle(frame)
1100
1119
1101 res = self._validatecommandrequestframe(frame)
1120 res = self._validatecommandrequestframe(frame)
1102 if res:
1121 if res:
1103 return res
1122 return res
1104
1123
1105 # All other frames should be related to a command that is currently
1124 # All other frames should be related to a command that is currently
1106 # receiving but is not active.
1125 # receiving but is not active.
1107 if frame.requestid in self._activecommands:
1126 if frame.requestid in self._activecommands:
1108 self._state = 'errored'
1127 self._state = 'errored'
1109 return self._makeerrorresult(
1128 return self._makeerrorresult(
1110 _('received frame for request that is still active: %d') %
1129 _('received frame for request that is still active: %d') %
1111 frame.requestid)
1130 frame.requestid)
1112
1131
1113 if frame.requestid not in self._receivingcommands:
1132 if frame.requestid not in self._receivingcommands:
1114 self._state = 'errored'
1133 self._state = 'errored'
1115 return self._makeerrorresult(
1134 return self._makeerrorresult(
1116 _('received frame for request that is not receiving: %d') %
1135 _('received frame for request that is not receiving: %d') %
1117 frame.requestid)
1136 frame.requestid)
1118
1137
1119 entry = self._receivingcommands[frame.requestid]
1138 entry = self._receivingcommands[frame.requestid]
1120
1139
1121 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1140 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1122 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1141 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1123 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1142 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1124
1143
1125 if entry['requestdone']:
1144 if entry['requestdone']:
1126 self._state = 'errored'
1145 self._state = 'errored'
1127 return self._makeerrorresult(
1146 return self._makeerrorresult(
1128 _('received command request frame when request frames '
1147 _('received command request frame when request frames '
1129 'were supposedly done'))
1148 'were supposedly done'))
1130
1149
1131 if expectingdata != entry['expectingdata']:
1150 if expectingdata != entry['expectingdata']:
1132 self._state = 'errored'
1151 self._state = 'errored'
1133 return self._makeerrorresult(
1152 return self._makeerrorresult(
1134 _('mismatch between expect data flag and previous frame'))
1153 _('mismatch between expect data flag and previous frame'))
1135
1154
1136 entry['payload'].write(frame.payload)
1155 entry['payload'].write(frame.payload)
1137
1156
1138 if not moreframes:
1157 if not moreframes:
1139 entry['requestdone'] = True
1158 entry['requestdone'] = True
1140
1159
1141 if not moreframes and not expectingdata:
1160 if not moreframes and not expectingdata:
1142 return self._makeruncommandresult(frame.requestid)
1161 return self._makeruncommandresult(frame.requestid)
1143
1162
1144 return self._makewantframeresult()
1163 return self._makewantframeresult()
1145
1164
1146 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1165 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1147 if not entry['expectingdata']:
1166 if not entry['expectingdata']:
1148 self._state = 'errored'
1167 self._state = 'errored'
1149 return self._makeerrorresult(_(
1168 return self._makeerrorresult(_(
1150 'received command data frame for request that is not '
1169 'received command data frame for request that is not '
1151 'expecting data: %d') % frame.requestid)
1170 'expecting data: %d') % frame.requestid)
1152
1171
1153 if entry['data'] is None:
1172 if entry['data'] is None:
1154 entry['data'] = util.bytesio()
1173 entry['data'] = util.bytesio()
1155
1174
1156 return self._handlecommanddataframe(frame, entry)
1175 return self._handlecommanddataframe(frame, entry)
1157 else:
1176 else:
1158 self._state = 'errored'
1177 self._state = 'errored'
1159 return self._makeerrorresult(_(
1178 return self._makeerrorresult(_(
1160 'received unexpected frame type: %d') % frame.typeid)
1179 'received unexpected frame type: %d') % frame.typeid)
1161
1180
1162 def _handlecommanddataframe(self, frame, entry):
1181 def _handlecommanddataframe(self, frame, entry):
1163 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1182 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1164
1183
1165 # TODO support streaming data instead of buffering it.
1184 # TODO support streaming data instead of buffering it.
1166 entry['data'].write(frame.payload)
1185 entry['data'].write(frame.payload)
1167
1186
1168 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1187 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1169 return self._makewantframeresult()
1188 return self._makewantframeresult()
1170 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1189 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1171 entry['data'].seek(0)
1190 entry['data'].seek(0)
1172 return self._makeruncommandresult(frame.requestid)
1191 return self._makeruncommandresult(frame.requestid)
1173 else:
1192 else:
1174 self._state = 'errored'
1193 self._state = 'errored'
1175 return self._makeerrorresult(_('command data frame without '
1194 return self._makeerrorresult(_('command data frame without '
1176 'flags'))
1195 'flags'))
1177
1196
1178 def _onframeerrored(self, frame):
1197 def _onframeerrored(self, frame):
1179 return self._makeerrorresult(_('server already errored'))
1198 return self._makeerrorresult(_('server already errored'))
1180
1199
1181 class commandrequest(object):
1200 class commandrequest(object):
1182 """Represents a request to run a command."""
1201 """Represents a request to run a command."""
1183
1202
1184 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1203 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1185 self.requestid = requestid
1204 self.requestid = requestid
1186 self.name = name
1205 self.name = name
1187 self.args = args
1206 self.args = args
1188 self.datafh = datafh
1207 self.datafh = datafh
1189 self.redirect = redirect
1208 self.redirect = redirect
1190 self.state = 'pending'
1209 self.state = 'pending'
1191
1210
1192 class clientreactor(object):
1211 class clientreactor(object):
1193 """Holds state of a client issuing frame-based protocol requests.
1212 """Holds state of a client issuing frame-based protocol requests.
1194
1213
1195 This is like ``serverreactor`` but for client-side state.
1214 This is like ``serverreactor`` but for client-side state.
1196
1215
1197 Each instance is bound to the lifetime of a connection. For persistent
1216 Each instance is bound to the lifetime of a connection. For persistent
1198 connection transports using e.g. TCP sockets and speaking the raw
1217 connection transports using e.g. TCP sockets and speaking the raw
1199 framing protocol, there will be a single instance for the lifetime of
1218 framing protocol, there will be a single instance for the lifetime of
1200 the TCP socket. For transports where there are multiple discrete
1219 the TCP socket. For transports where there are multiple discrete
1201 interactions (say tunneled within in HTTP request), there will be a
1220 interactions (say tunneled within in HTTP request), there will be a
1202 separate instance for each distinct interaction.
1221 separate instance for each distinct interaction.
1203 """
1222 """
1204 def __init__(self, hasmultiplesend=False, buffersends=True):
1223 def __init__(self, hasmultiplesend=False, buffersends=True):
1205 """Create a new instance.
1224 """Create a new instance.
1206
1225
1207 ``hasmultiplesend`` indicates whether multiple sends are supported
1226 ``hasmultiplesend`` indicates whether multiple sends are supported
1208 by the transport. When True, it is possible to send commands immediately
1227 by the transport. When True, it is possible to send commands immediately
1209 instead of buffering until the caller signals an intent to finish a
1228 instead of buffering until the caller signals an intent to finish a
1210 send operation.
1229 send operation.
1211
1230
1212 ``buffercommands`` indicates whether sends should be buffered until the
1231 ``buffercommands`` indicates whether sends should be buffered until the
1213 last request has been issued.
1232 last request has been issued.
1214 """
1233 """
1215 self._hasmultiplesend = hasmultiplesend
1234 self._hasmultiplesend = hasmultiplesend
1216 self._buffersends = buffersends
1235 self._buffersends = buffersends
1217
1236
1218 self._canissuecommands = True
1237 self._canissuecommands = True
1219 self._cansend = True
1238 self._cansend = True
1220
1239
1221 self._nextrequestid = 1
1240 self._nextrequestid = 1
1222 # We only support a single outgoing stream for now.
1241 # We only support a single outgoing stream for now.
1223 self._outgoingstream = stream(1)
1242 self._outgoingstream = stream(1)
1224 self._pendingrequests = collections.deque()
1243 self._pendingrequests = collections.deque()
1225 self._activerequests = {}
1244 self._activerequests = {}
1226 self._incomingstreams = {}
1245 self._incomingstreams = {}
1227
1246
1228 def callcommand(self, name, args, datafh=None, redirect=None):
1247 def callcommand(self, name, args, datafh=None, redirect=None):
1229 """Request that a command be executed.
1248 """Request that a command be executed.
1230
1249
1231 Receives the command name, a dict of arguments to pass to the command,
1250 Receives the command name, a dict of arguments to pass to the command,
1232 and an optional file object containing the raw data for the command.
1251 and an optional file object containing the raw data for the command.
1233
1252
1234 Returns a 3-tuple of (request, action, action data).
1253 Returns a 3-tuple of (request, action, action data).
1235 """
1254 """
1236 if not self._canissuecommands:
1255 if not self._canissuecommands:
1237 raise error.ProgrammingError('cannot issue new commands')
1256 raise error.ProgrammingError('cannot issue new commands')
1238
1257
1239 requestid = self._nextrequestid
1258 requestid = self._nextrequestid
1240 self._nextrequestid += 2
1259 self._nextrequestid += 2
1241
1260
1242 request = commandrequest(requestid, name, args, datafh=datafh,
1261 request = commandrequest(requestid, name, args, datafh=datafh,
1243 redirect=redirect)
1262 redirect=redirect)
1244
1263
1245 if self._buffersends:
1264 if self._buffersends:
1246 self._pendingrequests.append(request)
1265 self._pendingrequests.append(request)
1247 return request, 'noop', {}
1266 return request, 'noop', {}
1248 else:
1267 else:
1249 if not self._cansend:
1268 if not self._cansend:
1250 raise error.ProgrammingError('sends cannot be performed on '
1269 raise error.ProgrammingError('sends cannot be performed on '
1251 'this instance')
1270 'this instance')
1252
1271
1253 if not self._hasmultiplesend:
1272 if not self._hasmultiplesend:
1254 self._cansend = False
1273 self._cansend = False
1255 self._canissuecommands = False
1274 self._canissuecommands = False
1256
1275
1257 return request, 'sendframes', {
1276 return request, 'sendframes', {
1258 'framegen': self._makecommandframes(request),
1277 'framegen': self._makecommandframes(request),
1259 }
1278 }
1260
1279
1261 def flushcommands(self):
1280 def flushcommands(self):
1262 """Request that all queued commands be sent.
1281 """Request that all queued commands be sent.
1263
1282
1264 If any commands are buffered, this will instruct the caller to send
1283 If any commands are buffered, this will instruct the caller to send
1265 them over the wire. If no commands are buffered it instructs the client
1284 them over the wire. If no commands are buffered it instructs the client
1266 to no-op.
1285 to no-op.
1267
1286
1268 If instances aren't configured for multiple sends, no new command
1287 If instances aren't configured for multiple sends, no new command
1269 requests are allowed after this is called.
1288 requests are allowed after this is called.
1270 """
1289 """
1271 if not self._pendingrequests:
1290 if not self._pendingrequests:
1272 return 'noop', {}
1291 return 'noop', {}
1273
1292
1274 if not self._cansend:
1293 if not self._cansend:
1275 raise error.ProgrammingError('sends cannot be performed on this '
1294 raise error.ProgrammingError('sends cannot be performed on this '
1276 'instance')
1295 'instance')
1277
1296
1278 # If the instance only allows sending once, mark that we have fired
1297 # If the instance only allows sending once, mark that we have fired
1279 # our one shot.
1298 # our one shot.
1280 if not self._hasmultiplesend:
1299 if not self._hasmultiplesend:
1281 self._canissuecommands = False
1300 self._canissuecommands = False
1282 self._cansend = False
1301 self._cansend = False
1283
1302
1284 def makeframes():
1303 def makeframes():
1285 while self._pendingrequests:
1304 while self._pendingrequests:
1286 request = self._pendingrequests.popleft()
1305 request = self._pendingrequests.popleft()
1287 for frame in self._makecommandframes(request):
1306 for frame in self._makecommandframes(request):
1288 yield frame
1307 yield frame
1289
1308
1290 return 'sendframes', {
1309 return 'sendframes', {
1291 'framegen': makeframes(),
1310 'framegen': makeframes(),
1292 }
1311 }
1293
1312
1294 def _makecommandframes(self, request):
1313 def _makecommandframes(self, request):
1295 """Emit frames to issue a command request.
1314 """Emit frames to issue a command request.
1296
1315
1297 As a side-effect, update request accounting to reflect its changed
1316 As a side-effect, update request accounting to reflect its changed
1298 state.
1317 state.
1299 """
1318 """
1300 self._activerequests[request.requestid] = request
1319 self._activerequests[request.requestid] = request
1301 request.state = 'sending'
1320 request.state = 'sending'
1302
1321
1303 res = createcommandframes(self._outgoingstream,
1322 res = createcommandframes(self._outgoingstream,
1304 request.requestid,
1323 request.requestid,
1305 request.name,
1324 request.name,
1306 request.args,
1325 request.args,
1307 datafh=request.datafh,
1326 datafh=request.datafh,
1308 redirect=request.redirect)
1327 redirect=request.redirect)
1309
1328
1310 for frame in res:
1329 for frame in res:
1311 yield frame
1330 yield frame
1312
1331
1313 request.state = 'sent'
1332 request.state = 'sent'
1314
1333
1315 def onframerecv(self, frame):
1334 def onframerecv(self, frame):
1316 """Process a frame that has been received off the wire.
1335 """Process a frame that has been received off the wire.
1317
1336
1318 Returns a 2-tuple of (action, meta) describing further action the
1337 Returns a 2-tuple of (action, meta) describing further action the
1319 caller needs to take as a result of receiving this frame.
1338 caller needs to take as a result of receiving this frame.
1320 """
1339 """
1321 if frame.streamid % 2:
1340 if frame.streamid % 2:
1322 return 'error', {
1341 return 'error', {
1323 'message': (
1342 'message': (
1324 _('received frame with odd numbered stream ID: %d') %
1343 _('received frame with odd numbered stream ID: %d') %
1325 frame.streamid),
1344 frame.streamid),
1326 }
1345 }
1327
1346
1328 if frame.streamid not in self._incomingstreams:
1347 if frame.streamid not in self._incomingstreams:
1329 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1348 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1330 return 'error', {
1349 return 'error', {
1331 'message': _('received frame on unknown stream '
1350 'message': _('received frame on unknown stream '
1332 'without beginning of stream flag set'),
1351 'without beginning of stream flag set'),
1333 }
1352 }
1334
1353
1335 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1354 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1336
1355
1337 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1356 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1338 raise error.ProgrammingError('support for decoding stream '
1357 raise error.ProgrammingError('support for decoding stream '
1339 'payloads not yet implemneted')
1358 'payloads not yet implemneted')
1340
1359
1341 if frame.streamflags & STREAM_FLAG_END_STREAM:
1360 if frame.streamflags & STREAM_FLAG_END_STREAM:
1342 del self._incomingstreams[frame.streamid]
1361 del self._incomingstreams[frame.streamid]
1343
1362
1344 if frame.requestid not in self._activerequests:
1363 if frame.requestid not in self._activerequests:
1345 return 'error', {
1364 return 'error', {
1346 'message': (_('received frame for inactive request ID: %d') %
1365 'message': (_('received frame for inactive request ID: %d') %
1347 frame.requestid),
1366 frame.requestid),
1348 }
1367 }
1349
1368
1350 request = self._activerequests[frame.requestid]
1369 request = self._activerequests[frame.requestid]
1351 request.state = 'receiving'
1370 request.state = 'receiving'
1352
1371
1353 handlers = {
1372 handlers = {
1354 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1373 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1355 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1374 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1356 }
1375 }
1357
1376
1358 meth = handlers.get(frame.typeid)
1377 meth = handlers.get(frame.typeid)
1359 if not meth:
1378 if not meth:
1360 raise error.ProgrammingError('unhandled frame type: %d' %
1379 raise error.ProgrammingError('unhandled frame type: %d' %
1361 frame.typeid)
1380 frame.typeid)
1362
1381
1363 return meth(request, frame)
1382 return meth(request, frame)
1364
1383
1365 def _oncommandresponseframe(self, request, frame):
1384 def _oncommandresponseframe(self, request, frame):
1366 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1385 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1367 request.state = 'received'
1386 request.state = 'received'
1368 del self._activerequests[request.requestid]
1387 del self._activerequests[request.requestid]
1369
1388
1370 return 'responsedata', {
1389 return 'responsedata', {
1371 'request': request,
1390 'request': request,
1372 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1391 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1373 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1392 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1374 'data': frame.payload,
1393 'data': frame.payload,
1375 }
1394 }
1376
1395
1377 def _onerrorresponseframe(self, request, frame):
1396 def _onerrorresponseframe(self, request, frame):
1378 request.state = 'errored'
1397 request.state = 'errored'
1379 del self._activerequests[request.requestid]
1398 del self._activerequests[request.requestid]
1380
1399
1381 # The payload should be a CBOR map.
1400 # The payload should be a CBOR map.
1382 m = cborutil.decodeall(frame.payload)[0]
1401 m = cborutil.decodeall(frame.payload)[0]
1383
1402
1384 return 'error', {
1403 return 'error', {
1385 'request': request,
1404 'request': request,
1386 'type': m['type'],
1405 'type': m['type'],
1387 'message': m['message'],
1406 'message': m['message'],
1388 }
1407 }
General Comments 0
You need to be logged in to leave comments. Login now