##// END OF EJS Templates
wireproto: explicit API to create outgoing streams...
Gregory Szorc -
r37305:5fadc63a default
parent child Browse files
Show More
@@ -1,813 +1,825
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import struct
15 15
16 16 from .i18n import _
17 17 from .thirdparty import (
18 18 attr,
19 19 )
20 20 from . import (
21 21 error,
22 22 util,
23 23 )
24 24 from .utils import (
25 25 stringutil,
26 26 )
27 27
28 28 FRAME_HEADER_SIZE = 8
29 29 DEFAULT_MAX_FRAME_SIZE = 32768
30 30
31 31 STREAM_FLAG_BEGIN_STREAM = 0x01
32 32 STREAM_FLAG_END_STREAM = 0x02
33 33 STREAM_FLAG_ENCODING_APPLIED = 0x04
34 34
35 35 STREAM_FLAGS = {
36 36 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
37 37 b'stream-end': STREAM_FLAG_END_STREAM,
38 38 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
39 39 }
40 40
41 41 FRAME_TYPE_COMMAND_NAME = 0x01
42 42 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
43 43 FRAME_TYPE_COMMAND_DATA = 0x03
44 44 FRAME_TYPE_BYTES_RESPONSE = 0x04
45 45 FRAME_TYPE_ERROR_RESPONSE = 0x05
46 46 FRAME_TYPE_TEXT_OUTPUT = 0x06
47 47 FRAME_TYPE_STREAM_SETTINGS = 0x08
48 48
49 49 FRAME_TYPES = {
50 50 b'command-name': FRAME_TYPE_COMMAND_NAME,
51 51 b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
52 52 b'command-data': FRAME_TYPE_COMMAND_DATA,
53 53 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
54 54 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
55 55 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
56 56 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
57 57 }
58 58
59 59 FLAG_COMMAND_NAME_EOS = 0x01
60 60 FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
61 61 FLAG_COMMAND_NAME_HAVE_DATA = 0x04
62 62
63 63 FLAGS_COMMAND = {
64 64 b'eos': FLAG_COMMAND_NAME_EOS,
65 65 b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
66 66 b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
67 67 }
68 68
69 69 FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
70 70 FLAG_COMMAND_ARGUMENT_EOA = 0x02
71 71
72 72 FLAGS_COMMAND_ARGUMENT = {
73 73 b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
74 74 b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
75 75 }
76 76
77 77 FLAG_COMMAND_DATA_CONTINUATION = 0x01
78 78 FLAG_COMMAND_DATA_EOS = 0x02
79 79
80 80 FLAGS_COMMAND_DATA = {
81 81 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
82 82 b'eos': FLAG_COMMAND_DATA_EOS,
83 83 }
84 84
85 85 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
86 86 FLAG_BYTES_RESPONSE_EOS = 0x02
87 87
88 88 FLAGS_BYTES_RESPONSE = {
89 89 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
90 90 b'eos': FLAG_BYTES_RESPONSE_EOS,
91 91 }
92 92
93 93 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
94 94 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
95 95
96 96 FLAGS_ERROR_RESPONSE = {
97 97 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
98 98 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
99 99 }
100 100
101 101 # Maps frame types to their available flags.
102 102 FRAME_TYPE_FLAGS = {
103 103 FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
104 104 FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
105 105 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
106 106 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
107 107 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
108 108 FRAME_TYPE_TEXT_OUTPUT: {},
109 109 FRAME_TYPE_STREAM_SETTINGS: {},
110 110 }
111 111
112 112 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
113 113
114 114 @attr.s(slots=True)
115 115 class frameheader(object):
116 116 """Represents the data in a frame header."""
117 117
118 118 length = attr.ib()
119 119 requestid = attr.ib()
120 120 streamid = attr.ib()
121 121 streamflags = attr.ib()
122 122 typeid = attr.ib()
123 123 flags = attr.ib()
124 124
125 125 @attr.s(slots=True)
126 126 class frame(object):
127 127 """Represents a parsed frame."""
128 128
129 129 requestid = attr.ib()
130 130 streamid = attr.ib()
131 131 streamflags = attr.ib()
132 132 typeid = attr.ib()
133 133 flags = attr.ib()
134 134 payload = attr.ib()
135 135
136 136 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
137 137 """Assemble a frame into a byte array."""
138 138 # TODO assert size of payload.
139 139 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
140 140
141 141 # 24 bits length
142 142 # 16 bits request id
143 143 # 8 bits stream id
144 144 # 8 bits stream flags
145 145 # 4 bits type
146 146 # 4 bits flags
147 147
148 148 l = struct.pack(r'<I', len(payload))
149 149 frame[0:3] = l[0:3]
150 150 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
151 151 frame[7] = (typeid << 4) | flags
152 152 frame[8:] = payload
153 153
154 154 return frame
155 155
156 156 def makeframefromhumanstring(s):
157 157 """Create a frame from a human readable string
158 158
159 159 Strings have the form:
160 160
161 161 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
162 162
163 163 This can be used by user-facing applications and tests for creating
164 164 frames easily without having to type out a bunch of constants.
165 165
166 166 Request ID and stream IDs are integers.
167 167
168 168 Stream flags, frame type, and flags can be specified by integer or
169 169 named constant.
170 170
171 171 Flags can be delimited by `|` to bitwise OR them together.
172 172 """
173 173 fields = s.split(b' ', 5)
174 174 requestid, streamid, streamflags, frametype, frameflags, payload = fields
175 175
176 176 requestid = int(requestid)
177 177 streamid = int(streamid)
178 178
179 179 finalstreamflags = 0
180 180 for flag in streamflags.split(b'|'):
181 181 if flag in STREAM_FLAGS:
182 182 finalstreamflags |= STREAM_FLAGS[flag]
183 183 else:
184 184 finalstreamflags |= int(flag)
185 185
186 186 if frametype in FRAME_TYPES:
187 187 frametype = FRAME_TYPES[frametype]
188 188 else:
189 189 frametype = int(frametype)
190 190
191 191 finalflags = 0
192 192 validflags = FRAME_TYPE_FLAGS[frametype]
193 193 for flag in frameflags.split(b'|'):
194 194 if flag in validflags:
195 195 finalflags |= validflags[flag]
196 196 else:
197 197 finalflags |= int(flag)
198 198
199 199 payload = stringutil.unescapestr(payload)
200 200
201 201 return makeframe(requestid=requestid, streamid=streamid,
202 202 streamflags=finalstreamflags, typeid=frametype,
203 203 flags=finalflags, payload=payload)
204 204
205 205 def parseheader(data):
206 206 """Parse a unified framing protocol frame header from a buffer.
207 207
208 208 The header is expected to be in the buffer at offset 0 and the
209 209 buffer is expected to be large enough to hold a full header.
210 210 """
211 211 # 24 bits payload length (little endian)
212 212 # 16 bits request ID
213 213 # 8 bits stream ID
214 214 # 8 bits stream flags
215 215 # 4 bits frame type
216 216 # 4 bits frame flags
217 217 # ... payload
218 218 framelength = data[0] + 256 * data[1] + 16384 * data[2]
219 219 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
220 220 typeflags = data[7]
221 221
222 222 frametype = (typeflags & 0xf0) >> 4
223 223 frameflags = typeflags & 0x0f
224 224
225 225 return frameheader(framelength, requestid, streamid, streamflags,
226 226 frametype, frameflags)
227 227
228 228 def readframe(fh):
229 229 """Read a unified framing protocol frame from a file object.
230 230
231 231 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
232 232 None if no frame is available. May raise if a malformed frame is
233 233 seen.
234 234 """
235 235 header = bytearray(FRAME_HEADER_SIZE)
236 236
237 237 readcount = fh.readinto(header)
238 238
239 239 if readcount == 0:
240 240 return None
241 241
242 242 if readcount != FRAME_HEADER_SIZE:
243 243 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
244 244 (readcount, header))
245 245
246 246 h = parseheader(header)
247 247
248 248 payload = fh.read(h.length)
249 249 if len(payload) != h.length:
250 250 raise error.Abort(_('frame length error: expected %d; got %d') %
251 251 (h.length, len(payload)))
252 252
253 253 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
254 254 payload)
255 255
256 256 def createcommandframes(stream, requestid, cmd, args, datafh=None):
257 257 """Create frames necessary to transmit a request to run a command.
258 258
259 259 This is a generator of bytearrays. Each item represents a frame
260 260 ready to be sent over the wire to a peer.
261 261 """
262 262 flags = 0
263 263 if args:
264 264 flags |= FLAG_COMMAND_NAME_HAVE_ARGS
265 265 if datafh:
266 266 flags |= FLAG_COMMAND_NAME_HAVE_DATA
267 267
268 268 if not flags:
269 269 flags |= FLAG_COMMAND_NAME_EOS
270 270
271 271 yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
272 272 flags=flags, payload=cmd)
273 273
274 274 for i, k in enumerate(sorted(args)):
275 275 v = args[k]
276 276 last = i == len(args) - 1
277 277
278 278 # TODO handle splitting of argument values across frames.
279 279 payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
280 280 offset = 0
281 281 ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
282 282 offset += ARGUMENT_FRAME_HEADER.size
283 283 payload[offset:offset + len(k)] = k
284 284 offset += len(k)
285 285 payload[offset:offset + len(v)] = v
286 286
287 287 flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
288 288 yield stream.makeframe(requestid=requestid,
289 289 typeid=FRAME_TYPE_COMMAND_ARGUMENT,
290 290 flags=flags,
291 291 payload=payload)
292 292
293 293 if datafh:
294 294 while True:
295 295 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
296 296
297 297 done = False
298 298 if len(data) == DEFAULT_MAX_FRAME_SIZE:
299 299 flags = FLAG_COMMAND_DATA_CONTINUATION
300 300 else:
301 301 flags = FLAG_COMMAND_DATA_EOS
302 302 assert datafh.read(1) == b''
303 303 done = True
304 304
305 305 yield stream.makeframe(requestid=requestid,
306 306 typeid=FRAME_TYPE_COMMAND_DATA,
307 307 flags=flags,
308 308 payload=data)
309 309
310 310 if done:
311 311 break
312 312
313 313 def createbytesresponseframesfrombytes(stream, requestid, data,
314 314 maxframesize=DEFAULT_MAX_FRAME_SIZE):
315 315 """Create a raw frame to send a bytes response from static bytes input.
316 316
317 317 Returns a generator of bytearrays.
318 318 """
319 319
320 320 # Simple case of a single frame.
321 321 if len(data) <= maxframesize:
322 322 yield stream.makeframe(requestid=requestid,
323 323 typeid=FRAME_TYPE_BYTES_RESPONSE,
324 324 flags=FLAG_BYTES_RESPONSE_EOS,
325 325 payload=data)
326 326 return
327 327
328 328 offset = 0
329 329 while True:
330 330 chunk = data[offset:offset + maxframesize]
331 331 offset += len(chunk)
332 332 done = offset == len(data)
333 333
334 334 if done:
335 335 flags = FLAG_BYTES_RESPONSE_EOS
336 336 else:
337 337 flags = FLAG_BYTES_RESPONSE_CONTINUATION
338 338
339 339 yield stream.makeframe(requestid=requestid,
340 340 typeid=FRAME_TYPE_BYTES_RESPONSE,
341 341 flags=flags,
342 342 payload=chunk)
343 343
344 344 if done:
345 345 break
346 346
347 347 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
348 348 # TODO properly handle frame size limits.
349 349 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
350 350
351 351 flags = 0
352 352 if protocol:
353 353 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
354 354 if application:
355 355 flags |= FLAG_ERROR_RESPONSE_APPLICATION
356 356
357 357 yield stream.makeframe(requestid=requestid,
358 358 typeid=FRAME_TYPE_ERROR_RESPONSE,
359 359 flags=flags,
360 360 payload=msg)
361 361
362 362 def createtextoutputframe(stream, requestid, atoms):
363 363 """Create a text output frame to render text to people.
364 364
365 365 ``atoms`` is a 3-tuple of (formatting string, args, labels).
366 366
367 367 The formatting string contains ``%s`` tokens to be replaced by the
368 368 corresponding indexed entry in ``args``. ``labels`` is an iterable of
369 369 formatters to be applied at rendering time. In terms of the ``ui``
370 370 class, each atom corresponds to a ``ui.write()``.
371 371 """
372 372 bytesleft = DEFAULT_MAX_FRAME_SIZE
373 373 atomchunks = []
374 374
375 375 for (formatting, args, labels) in atoms:
376 376 if len(args) > 255:
377 377 raise ValueError('cannot use more than 255 formatting arguments')
378 378 if len(labels) > 255:
379 379 raise ValueError('cannot use more than 255 labels')
380 380
381 381 # TODO look for localstr, other types here?
382 382
383 383 if not isinstance(formatting, bytes):
384 384 raise ValueError('must use bytes formatting strings')
385 385 for arg in args:
386 386 if not isinstance(arg, bytes):
387 387 raise ValueError('must use bytes for arguments')
388 388 for label in labels:
389 389 if not isinstance(label, bytes):
390 390 raise ValueError('must use bytes for labels')
391 391
392 392 # Formatting string must be UTF-8.
393 393 formatting = formatting.decode(r'utf-8', r'replace').encode(r'utf-8')
394 394
395 395 # Arguments must be UTF-8.
396 396 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
397 397
398 398 # Labels must be ASCII.
399 399 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
400 400 for l in labels]
401 401
402 402 if len(formatting) > 65535:
403 403 raise ValueError('formatting string cannot be longer than 64k')
404 404
405 405 if any(len(a) > 65535 for a in args):
406 406 raise ValueError('argument string cannot be longer than 64k')
407 407
408 408 if any(len(l) > 255 for l in labels):
409 409 raise ValueError('label string cannot be longer than 255 bytes')
410 410
411 411 chunks = [
412 412 struct.pack(r'<H', len(formatting)),
413 413 struct.pack(r'<BB', len(labels), len(args)),
414 414 struct.pack(r'<' + r'B' * len(labels), *map(len, labels)),
415 415 struct.pack(r'<' + r'H' * len(args), *map(len, args)),
416 416 ]
417 417 chunks.append(formatting)
418 418 chunks.extend(labels)
419 419 chunks.extend(args)
420 420
421 421 atom = b''.join(chunks)
422 422 atomchunks.append(atom)
423 423 bytesleft -= len(atom)
424 424
425 425 if bytesleft < 0:
426 426 raise ValueError('cannot encode data in a single frame')
427 427
428 428 yield stream.makeframe(requestid=requestid,
429 429 typeid=FRAME_TYPE_TEXT_OUTPUT,
430 430 flags=0,
431 431 payload=b''.join(atomchunks))
432 432
433 433 class stream(object):
434 434 """Represents a logical unidirectional series of frames."""
435 435
436 436 def __init__(self, streamid, active=False):
437 437 self.streamid = streamid
438 438 self._active = False
439 439
440 440 def makeframe(self, requestid, typeid, flags, payload):
441 441 """Create a frame to be sent out over this stream.
442 442
443 443 Only returns the frame instance. Does not actually send it.
444 444 """
445 445 streamflags = 0
446 446 if not self._active:
447 447 streamflags |= STREAM_FLAG_BEGIN_STREAM
448 448 self._active = True
449 449
450 450 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
451 451 payload)
452 452
453 453 def ensureserverstream(stream):
454 454 if stream.streamid % 2:
455 455 raise error.ProgrammingError('server should only write to even '
456 456 'numbered streams; %d is not even' %
457 457 stream.streamid)
458 458
459 459 class serverreactor(object):
460 460 """Holds state of a server handling frame-based protocol requests.
461 461
462 462 This class is the "brain" of the unified frame-based protocol server
463 463 component. While the protocol is stateless from the perspective of
464 464 requests/commands, something needs to track which frames have been
465 465 received, what frames to expect, etc. This class is that thing.
466 466
467 467 Instances are modeled as a state machine of sorts. Instances are also
468 468 reactionary to external events. The point of this class is to encapsulate
469 469 the state of the connection and the exchange of frames, not to perform
470 470 work. Instead, callers tell this class when something occurs, like a
471 471 frame arriving. If that activity is worthy of a follow-up action (say
472 472 *run a command*), the return value of that handler will say so.
473 473
474 474 I/O and CPU intensive operations are purposefully delegated outside of
475 475 this class.
476 476
477 477 Consumers are expected to tell instances when events occur. They do so by
478 478 calling the various ``on*`` methods. These methods return a 2-tuple
479 479 describing any follow-up action(s) to take. The first element is the
480 480 name of an action to perform. The second is a data structure (usually
481 481 a dict) specific to that action that contains more information. e.g.
482 482 if the server wants to send frames back to the client, the data structure
483 483 will contain a reference to those frames.
484 484
485 485 Valid actions that consumers can be instructed to take are:
486 486
487 487 sendframes
488 488 Indicates that frames should be sent to the client. The ``framegen``
489 489 key contains a generator of frames that should be sent. The server
490 490 assumes that all frames are sent to the client.
491 491
492 492 error
493 493 Indicates that an error occurred. Consumer should probably abort.
494 494
495 495 runcommand
496 496 Indicates that the consumer should run a wire protocol command. Details
497 497 of the command to run are given in the data structure.
498 498
499 499 wantframe
500 500 Indicates that nothing of interest happened and the server is waiting on
501 501 more frames from the client before anything interesting can be done.
502 502
503 503 noop
504 504 Indicates no additional action is required.
505 505
506 506 Known Issues
507 507 ------------
508 508
509 509 There are no limits to the number of partially received commands or their
510 510 size. A malicious client could stream command request data and exhaust the
511 511 server's memory.
512 512
513 513 Partially received commands are not acted upon when end of input is
514 514 reached. Should the server error if it receives a partial request?
515 515 Should the client send a message to abort a partially transmitted request
516 516 to facilitate graceful shutdown?
517 517
518 518 Active requests that haven't been responded to aren't tracked. This means
519 519 that if we receive a command and instruct its dispatch, another command
520 520 with its request ID can come in over the wire and there will be a race
521 521 between who responds to what.
522 522 """
523 523
524 524 def __init__(self, deferoutput=False):
525 525 """Construct a new server reactor.
526 526
527 527 ``deferoutput`` can be used to indicate that no output frames should be
528 528 instructed to be sent until input has been exhausted. In this mode,
529 529 events that would normally generate output frames (such as a command
530 530 response being ready) will instead defer instructing the consumer to
531 531 send those frames. This is useful for half-duplex transports where the
532 532 sender cannot receive until all data has been transmitted.
533 533 """
534 534 self._deferoutput = deferoutput
535 535 self._state = 'idle'
536 self._nextoutgoingstreamid = 2
536 537 self._bufferedframegens = []
537 538 # stream id -> stream instance for all active streams from the client.
538 539 self._incomingstreams = {}
540 self._outgoingstreams = {}
539 541 # request id -> dict of commands that are actively being received.
540 542 self._receivingcommands = {}
541 543 # Request IDs that have been received and are actively being processed.
542 544 # Once all output for a request has been sent, it is removed from this
543 545 # set.
544 546 self._activecommands = set()
545 547
546 548 def onframerecv(self, frame):
547 549 """Process a frame that has been received off the wire.
548 550
549 551 Returns a dict with an ``action`` key that details what action,
550 552 if any, the consumer should take next.
551 553 """
552 554 if not frame.streamid % 2:
553 555 self._state = 'errored'
554 556 return self._makeerrorresult(
555 557 _('received frame with even numbered stream ID: %d') %
556 558 frame.streamid)
557 559
558 560 if frame.streamid not in self._incomingstreams:
559 561 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
560 562 self._state = 'errored'
561 563 return self._makeerrorresult(
562 564 _('received frame on unknown inactive stream without '
563 565 'beginning of stream flag set'))
564 566
565 567 self._incomingstreams[frame.streamid] = stream(frame.streamid)
566 568
567 569 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
568 570 # TODO handle decoding frames
569 571 self._state = 'errored'
570 572 raise error.ProgrammingError('support for decoding stream payloads '
571 573 'not yet implemented')
572 574
573 575 if frame.streamflags & STREAM_FLAG_END_STREAM:
574 576 del self._incomingstreams[frame.streamid]
575 577
576 578 handlers = {
577 579 'idle': self._onframeidle,
578 580 'command-receiving': self._onframecommandreceiving,
579 581 'errored': self._onframeerrored,
580 582 }
581 583
582 584 meth = handlers.get(self._state)
583 585 if not meth:
584 586 raise error.ProgrammingError('unhandled state: %s' % self._state)
585 587
586 588 return meth(frame)
587 589
588 590 def onbytesresponseready(self, stream, requestid, data):
589 591 """Signal that a bytes response is ready to be sent to the client.
590 592
591 593 The raw bytes response is passed as an argument.
592 594 """
593 595 ensureserverstream(stream)
594 596
595 597 def sendframes():
596 598 for frame in createbytesresponseframesfrombytes(stream, requestid,
597 599 data):
598 600 yield frame
599 601
600 602 self._activecommands.remove(requestid)
601 603
602 604 result = sendframes()
603 605
604 606 if self._deferoutput:
605 607 self._bufferedframegens.append(result)
606 608 return 'noop', {}
607 609 else:
608 610 return 'sendframes', {
609 611 'framegen': result,
610 612 }
611 613
612 614 def oninputeof(self):
613 615 """Signals that end of input has been received.
614 616
615 617 No more frames will be received. All pending activity should be
616 618 completed.
617 619 """
618 620 # TODO should we do anything about in-flight commands?
619 621
620 622 if not self._deferoutput or not self._bufferedframegens:
621 623 return 'noop', {}
622 624
623 625 # If we buffered all our responses, emit those.
624 626 def makegen():
625 627 for gen in self._bufferedframegens:
626 628 for frame in gen:
627 629 yield frame
628 630
629 631 return 'sendframes', {
630 632 'framegen': makegen(),
631 633 }
632 634
633 635 def onapplicationerror(self, stream, requestid, msg):
634 636 ensureserverstream(stream)
635 637
636 638 return 'sendframes', {
637 639 'framegen': createerrorframe(stream, requestid, msg,
638 640 application=True),
639 641 }
640 642
643 def makeoutputstream(self):
644 """Create a stream to be used for sending data to the client."""
645 streamid = self._nextoutgoingstreamid
646 self._nextoutgoingstreamid += 2
647
648 s = stream(streamid)
649 self._outgoingstreams[streamid] = s
650
651 return s
652
641 653 def _makeerrorresult(self, msg):
642 654 return 'error', {
643 655 'message': msg,
644 656 }
645 657
646 658 def _makeruncommandresult(self, requestid):
647 659 entry = self._receivingcommands[requestid]
648 660 del self._receivingcommands[requestid]
649 661
650 662 if self._receivingcommands:
651 663 self._state = 'command-receiving'
652 664 else:
653 665 self._state = 'idle'
654 666
655 667 assert requestid not in self._activecommands
656 668 self._activecommands.add(requestid)
657 669
658 670 return 'runcommand', {
659 671 'requestid': requestid,
660 672 'command': entry['command'],
661 673 'args': entry['args'],
662 674 'data': entry['data'].getvalue() if entry['data'] else None,
663 675 }
664 676
665 677 def _makewantframeresult(self):
666 678 return 'wantframe', {
667 679 'state': self._state,
668 680 }
669 681
670 682 def _onframeidle(self, frame):
671 683 # The only frame type that should be received in this state is a
672 684 # command request.
673 685 if frame.typeid != FRAME_TYPE_COMMAND_NAME:
674 686 self._state = 'errored'
675 687 return self._makeerrorresult(
676 688 _('expected command frame; got %d') % frame.typeid)
677 689
678 690 if frame.requestid in self._receivingcommands:
679 691 self._state = 'errored'
680 692 return self._makeerrorresult(
681 693 _('request with ID %d already received') % frame.requestid)
682 694
683 695 if frame.requestid in self._activecommands:
684 696 self._state = 'errored'
685 697 return self._makeerrorresult((
686 698 _('request with ID %d is already active') % frame.requestid))
687 699
688 700 expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
689 701 expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
690 702
691 703 self._receivingcommands[frame.requestid] = {
692 704 'command': frame.payload,
693 705 'args': {},
694 706 'data': None,
695 707 'expectingargs': expectingargs,
696 708 'expectingdata': expectingdata,
697 709 }
698 710
699 711 if frame.flags & FLAG_COMMAND_NAME_EOS:
700 712 return self._makeruncommandresult(frame.requestid)
701 713
702 714 if expectingargs or expectingdata:
703 715 self._state = 'command-receiving'
704 716 return self._makewantframeresult()
705 717 else:
706 718 self._state = 'errored'
707 719 return self._makeerrorresult(_('missing frame flags on '
708 720 'command frame'))
709 721
710 722 def _onframecommandreceiving(self, frame):
711 723 # It could be a new command request. Process it as such.
712 724 if frame.typeid == FRAME_TYPE_COMMAND_NAME:
713 725 return self._onframeidle(frame)
714 726
715 727 # All other frames should be related to a command that is currently
716 728 # receiving but is not active.
717 729 if frame.requestid in self._activecommands:
718 730 self._state = 'errored'
719 731 return self._makeerrorresult(
720 732 _('received frame for request that is still active: %d') %
721 733 frame.requestid)
722 734
723 735 if frame.requestid not in self._receivingcommands:
724 736 self._state = 'errored'
725 737 return self._makeerrorresult(
726 738 _('received frame for request that is not receiving: %d') %
727 739 frame.requestid)
728 740
729 741 entry = self._receivingcommands[frame.requestid]
730 742
731 743 if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
732 744 if not entry['expectingargs']:
733 745 self._state = 'errored'
734 746 return self._makeerrorresult(_(
735 747 'received command argument frame for request that is not '
736 748 'expecting arguments: %d') % frame.requestid)
737 749
738 750 return self._handlecommandargsframe(frame, entry)
739 751
740 752 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
741 753 if not entry['expectingdata']:
742 754 self._state = 'errored'
743 755 return self._makeerrorresult(_(
744 756 'received command data frame for request that is not '
745 757 'expecting data: %d') % frame.requestid)
746 758
747 759 if entry['data'] is None:
748 760 entry['data'] = util.bytesio()
749 761
750 762 return self._handlecommanddataframe(frame, entry)
751 763
752 764 def _handlecommandargsframe(self, frame, entry):
753 765 # The frame and state of command should have already been validated.
754 766 assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
755 767
756 768 offset = 0
757 769 namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
758 770 offset += ARGUMENT_FRAME_HEADER.size
759 771
760 772 # The argument name MUST fit inside the frame.
761 773 argname = bytes(frame.payload[offset:offset + namesize])
762 774 offset += namesize
763 775
764 776 if len(argname) != namesize:
765 777 self._state = 'errored'
766 778 return self._makeerrorresult(_('malformed argument frame: '
767 779 'partial argument name'))
768 780
769 781 argvalue = bytes(frame.payload[offset:])
770 782
771 783 # Argument value spans multiple frames. Record our active state
772 784 # and wait for the next frame.
773 785 if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
774 786 raise error.ProgrammingError('not yet implemented')
775 787
776 788 # Common case: the argument value is completely contained in this
777 789 # frame.
778 790
779 791 if len(argvalue) != valuesize:
780 792 self._state = 'errored'
781 793 return self._makeerrorresult(_('malformed argument frame: '
782 794 'partial argument value'))
783 795
784 796 entry['args'][argname] = argvalue
785 797
786 798 if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
787 799 if entry['expectingdata']:
788 800 # TODO signal request to run a command once we don't
789 801 # buffer data frames.
790 802 return self._makewantframeresult()
791 803 else:
792 804 return self._makeruncommandresult(frame.requestid)
793 805 else:
794 806 return self._makewantframeresult()
795 807
796 808 def _handlecommanddataframe(self, frame, entry):
797 809 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
798 810
799 811 # TODO support streaming data instead of buffering it.
800 812 entry['data'].write(frame.payload)
801 813
802 814 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
803 815 return self._makewantframeresult()
804 816 elif frame.flags & FLAG_COMMAND_DATA_EOS:
805 817 entry['data'].seek(0)
806 818 return self._makeruncommandresult(frame.requestid)
807 819 else:
808 820 self._state = 'errored'
809 821 return self._makeerrorresult(_('command data frame without '
810 822 'flags'))
811 823
812 824 def _onframeerrored(self, frame):
813 825 return self._makeerrorresult(_('server already errored'))
@@ -1,1049 +1,1050
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import struct
11 11 import sys
12 12 import threading
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 encoding,
17 17 error,
18 18 hook,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 wireprotoframing,
23 23 wireprototypes,
24 24 )
25 25 from .utils import (
26 26 procutil,
27 27 )
28 28
29 29 stringio = util.stringio
30 30
31 31 urlerr = util.urlerr
32 32 urlreq = util.urlreq
33 33
34 34 HTTP_OK = 200
35 35
36 36 HGTYPE = 'application/mercurial-0.1'
37 37 HGTYPE2 = 'application/mercurial-0.2'
38 38 HGERRTYPE = 'application/hg-error'
39 39 FRAMINGTYPE = b'application/mercurial-exp-framing-0002'
40 40
41 41 HTTPV2 = wireprototypes.HTTPV2
42 42 SSHV1 = wireprototypes.SSHV1
43 43 SSHV2 = wireprototypes.SSHV2
44 44
45 45 def decodevaluefromheaders(req, headerprefix):
46 46 """Decode a long value from multiple HTTP request headers.
47 47
48 48 Returns the value as a bytes, not a str.
49 49 """
50 50 chunks = []
51 51 i = 1
52 52 while True:
53 53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
54 54 if v is None:
55 55 break
56 56 chunks.append(pycompat.bytesurl(v))
57 57 i += 1
58 58
59 59 return ''.join(chunks)
60 60
61 61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
62 62 def __init__(self, req, ui, checkperm):
63 63 self._req = req
64 64 self._ui = ui
65 65 self._checkperm = checkperm
66 66
67 67 @property
68 68 def name(self):
69 69 return 'http-v1'
70 70
71 71 def getargs(self, args):
72 72 knownargs = self._args()
73 73 data = {}
74 74 keys = args.split()
75 75 for k in keys:
76 76 if k == '*':
77 77 star = {}
78 78 for key in knownargs.keys():
79 79 if key != 'cmd' and key not in keys:
80 80 star[key] = knownargs[key][0]
81 81 data['*'] = star
82 82 else:
83 83 data[k] = knownargs[k][0]
84 84 return [data[k] for k in keys]
85 85
86 86 def _args(self):
87 87 args = self._req.qsparams.asdictoflists()
88 88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
89 89 if postlen:
90 90 args.update(urlreq.parseqs(
91 91 self._req.bodyfh.read(postlen), keep_blank_values=True))
92 92 return args
93 93
94 94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
95 95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
96 96 return args
97 97
98 98 def forwardpayload(self, fp):
99 99 # Existing clients *always* send Content-Length.
100 100 length = int(self._req.headers[b'Content-Length'])
101 101
102 102 # If httppostargs is used, we need to read Content-Length
103 103 # minus the amount that was consumed by args.
104 104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
105 105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
106 106 fp.write(s)
107 107
108 108 @contextlib.contextmanager
109 109 def mayberedirectstdio(self):
110 110 oldout = self._ui.fout
111 111 olderr = self._ui.ferr
112 112
113 113 out = util.stringio()
114 114
115 115 try:
116 116 self._ui.fout = out
117 117 self._ui.ferr = out
118 118 yield out
119 119 finally:
120 120 self._ui.fout = oldout
121 121 self._ui.ferr = olderr
122 122
123 123 def client(self):
124 124 return 'remote:%s:%s:%s' % (
125 125 self._req.urlscheme,
126 126 urlreq.quote(self._req.remotehost or ''),
127 127 urlreq.quote(self._req.remoteuser or ''))
128 128
129 129 def addcapabilities(self, repo, caps):
130 130 caps.append(b'batch')
131 131
132 132 caps.append('httpheader=%d' %
133 133 repo.ui.configint('server', 'maxhttpheaderlen'))
134 134 if repo.ui.configbool('experimental', 'httppostargs'):
135 135 caps.append('httppostargs')
136 136
137 137 # FUTURE advertise 0.2rx once support is implemented
138 138 # FUTURE advertise minrx and mintx after consulting config option
139 139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
140 140
141 141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
142 142 if compengines:
143 143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
144 144 for e in compengines)
145 145 caps.append('compression=%s' % comptypes)
146 146
147 147 return caps
148 148
149 149 def checkperm(self, perm):
150 150 return self._checkperm(perm)
151 151
152 152 # This method exists mostly so that extensions like remotefilelog can
153 153 # disable a kludgey legacy method only over http. As of early 2018,
154 154 # there are no other known users, so with any luck we can discard this
155 155 # hook if remotefilelog becomes a first-party extension.
156 156 def iscmd(cmd):
157 157 return cmd in wireproto.commands
158 158
159 159 def handlewsgirequest(rctx, req, res, checkperm):
160 160 """Possibly process a wire protocol request.
161 161
162 162 If the current request is a wire protocol request, the request is
163 163 processed by this function.
164 164
165 165 ``req`` is a ``parsedrequest`` instance.
166 166 ``res`` is a ``wsgiresponse`` instance.
167 167
168 168 Returns a bool indicating if the request was serviced. If set, the caller
169 169 should stop processing the request, as a response has already been issued.
170 170 """
171 171 # Avoid cycle involving hg module.
172 172 from .hgweb import common as hgwebcommon
173 173
174 174 repo = rctx.repo
175 175
176 176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
177 177 # string parameter. If it isn't present, this isn't a wire protocol
178 178 # request.
179 179 if 'cmd' not in req.qsparams:
180 180 return False
181 181
182 182 cmd = req.qsparams['cmd']
183 183
184 184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
185 185 # While not all wire protocol commands are available for all transports,
186 186 # if we see a "cmd" value that resembles a known wire protocol command, we
187 187 # route it to a protocol handler. This is better than routing possible
188 188 # wire protocol requests to hgweb because it prevents hgweb from using
189 189 # known wire protocol commands and it is less confusing for machine
190 190 # clients.
191 191 if not iscmd(cmd):
192 192 return False
193 193
194 194 # The "cmd" query string argument is only valid on the root path of the
195 195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
196 196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
197 197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
198 198 if req.dispatchpath:
199 199 res.status = hgwebcommon.statusmessage(404)
200 200 res.headers['Content-Type'] = HGTYPE
201 201 # TODO This is not a good response to issue for this request. This
202 202 # is mostly for BC for now.
203 203 res.setbodybytes('0\n%s\n' % b'Not Found')
204 204 return True
205 205
206 206 proto = httpv1protocolhandler(req, repo.ui,
207 207 lambda perm: checkperm(rctx, req, perm))
208 208
209 209 # The permissions checker should be the only thing that can raise an
210 210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
211 211 # exception here. So consider refactoring into a exception type that
212 212 # is associated with the wire protocol.
213 213 try:
214 214 _callhttp(repo, req, res, proto, cmd)
215 215 except hgwebcommon.ErrorResponse as e:
216 216 for k, v in e.headers:
217 217 res.headers[k] = v
218 218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
219 219 # TODO This response body assumes the failed command was
220 220 # "unbundle." That assumption is not always valid.
221 221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
222 222
223 223 return True
224 224
225 225 def handlewsgiapirequest(rctx, req, res, checkperm):
226 226 """Handle requests to /api/*."""
227 227 assert req.dispatchparts[0] == b'api'
228 228
229 229 repo = rctx.repo
230 230
231 231 # This whole URL space is experimental for now. But we want to
232 232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
233 233 if not repo.ui.configbool('experimental', 'web.apiserver'):
234 234 res.status = b'404 Not Found'
235 235 res.headers[b'Content-Type'] = b'text/plain'
236 236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
237 237 return
238 238
239 239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
240 240 # by <protocol>.
241 241
242 242 # Registered APIs are made available via config options of the name of
243 243 # the protocol.
244 244 availableapis = set()
245 245 for k, v in API_HANDLERS.items():
246 246 section, option = v['config']
247 247 if repo.ui.configbool(section, option):
248 248 availableapis.add(k)
249 249
250 250 # Requests to /api/ list available APIs.
251 251 if req.dispatchparts == [b'api']:
252 252 res.status = b'200 OK'
253 253 res.headers[b'Content-Type'] = b'text/plain'
254 254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
255 255 'one of the following:\n')]
256 256 if availableapis:
257 257 lines.extend(sorted(availableapis))
258 258 else:
259 259 lines.append(_('(no available APIs)\n'))
260 260 res.setbodybytes(b'\n'.join(lines))
261 261 return
262 262
263 263 proto = req.dispatchparts[1]
264 264
265 265 if proto not in API_HANDLERS:
266 266 res.status = b'404 Not Found'
267 267 res.headers[b'Content-Type'] = b'text/plain'
268 268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
269 269 proto, b', '.join(sorted(availableapis))))
270 270 return
271 271
272 272 if proto not in availableapis:
273 273 res.status = b'404 Not Found'
274 274 res.headers[b'Content-Type'] = b'text/plain'
275 275 res.setbodybytes(_('API %s not enabled\n') % proto)
276 276 return
277 277
278 278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
279 279 req.dispatchparts[2:])
280 280
281 281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
282 282 from .hgweb import common as hgwebcommon
283 283
284 284 # URL space looks like: <permissions>/<command>, where <permission> can
285 285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
286 286
287 287 # Root URL does nothing meaningful... yet.
288 288 if not urlparts:
289 289 res.status = b'200 OK'
290 290 res.headers[b'Content-Type'] = b'text/plain'
291 291 res.setbodybytes(_('HTTP version 2 API handler'))
292 292 return
293 293
294 294 if len(urlparts) == 1:
295 295 res.status = b'404 Not Found'
296 296 res.headers[b'Content-Type'] = b'text/plain'
297 297 res.setbodybytes(_('do not know how to process %s\n') %
298 298 req.dispatchpath)
299 299 return
300 300
301 301 permission, command = urlparts[0:2]
302 302
303 303 if permission not in (b'ro', b'rw'):
304 304 res.status = b'404 Not Found'
305 305 res.headers[b'Content-Type'] = b'text/plain'
306 306 res.setbodybytes(_('unknown permission: %s') % permission)
307 307 return
308 308
309 309 if req.method != 'POST':
310 310 res.status = b'405 Method Not Allowed'
311 311 res.headers[b'Allow'] = b'POST'
312 312 res.setbodybytes(_('commands require POST requests'))
313 313 return
314 314
315 315 # At some point we'll want to use our own API instead of recycling the
316 316 # behavior of version 1 of the wire protocol...
317 317 # TODO return reasonable responses - not responses that overload the
318 318 # HTTP status line message for error reporting.
319 319 try:
320 320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
321 321 except hgwebcommon.ErrorResponse as e:
322 322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
323 323 for k, v in e.headers:
324 324 res.headers[k] = v
325 325 res.setbodybytes('permission denied')
326 326 return
327 327
328 328 # We have a special endpoint to reflect the request back at the client.
329 329 if command == b'debugreflect':
330 330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
331 331 return
332 332
333 333 # Extra commands that we handle that aren't really wire protocol
334 334 # commands. Think extra hard before making this hackery available to
335 335 # extension.
336 336 extracommands = {'multirequest'}
337 337
338 338 if command not in wireproto.commands and command not in extracommands:
339 339 res.status = b'404 Not Found'
340 340 res.headers[b'Content-Type'] = b'text/plain'
341 341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
342 342 return
343 343
344 344 repo = rctx.repo
345 345 ui = repo.ui
346 346
347 347 proto = httpv2protocolhandler(req, ui)
348 348
349 349 if (not wireproto.commands.commandavailable(command, proto)
350 350 and command not in extracommands):
351 351 res.status = b'404 Not Found'
352 352 res.headers[b'Content-Type'] = b'text/plain'
353 353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
354 354 return
355 355
356 356 # TODO consider cases where proxies may add additional Accept headers.
357 357 if req.headers.get(b'Accept') != FRAMINGTYPE:
358 358 res.status = b'406 Not Acceptable'
359 359 res.headers[b'Content-Type'] = b'text/plain'
360 360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
361 361 % FRAMINGTYPE)
362 362 return
363 363
364 364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
365 365 res.status = b'415 Unsupported Media Type'
366 366 # TODO we should send a response with appropriate media type,
367 367 # since client does Accept it.
368 368 res.headers[b'Content-Type'] = b'text/plain'
369 369 res.setbodybytes(_('client MUST send Content-Type header with '
370 370 'value: %s\n') % FRAMINGTYPE)
371 371 return
372 372
373 373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
374 374
375 375 def _processhttpv2reflectrequest(ui, repo, req, res):
376 376 """Reads unified frame protocol request and dumps out state to client.
377 377
378 378 This special endpoint can be used to help debug the wire protocol.
379 379
380 380 Instead of routing the request through the normal dispatch mechanism,
381 381 we instead read all frames, decode them, and feed them into our state
382 382 tracker. We then dump the log of all that activity back out to the
383 383 client.
384 384 """
385 385 import json
386 386
387 387 # Reflection APIs have a history of being abused, accidentally disclosing
388 388 # sensitive data, etc. So we have a config knob.
389 389 if not ui.configbool('experimental', 'web.api.debugreflect'):
390 390 res.status = b'404 Not Found'
391 391 res.headers[b'Content-Type'] = b'text/plain'
392 392 res.setbodybytes(_('debugreflect service not available'))
393 393 return
394 394
395 395 # We assume we have a unified framing protocol request body.
396 396
397 397 reactor = wireprotoframing.serverreactor()
398 398 states = []
399 399
400 400 while True:
401 401 frame = wireprotoframing.readframe(req.bodyfh)
402 402
403 403 if not frame:
404 404 states.append(b'received: <no frame>')
405 405 break
406 406
407 407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
408 408 frame.requestid,
409 409 frame.payload))
410 410
411 411 action, meta = reactor.onframerecv(frame)
412 412 states.append(json.dumps((action, meta), sort_keys=True,
413 413 separators=(', ', ': ')))
414 414
415 415 action, meta = reactor.oninputeof()
416 416 meta['action'] = action
417 417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
418 418
419 419 res.status = b'200 OK'
420 420 res.headers[b'Content-Type'] = b'text/plain'
421 421 res.setbodybytes(b'\n'.join(states))
422 422
423 423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
424 424 """Post-validation handler for HTTPv2 requests.
425 425
426 426 Called when the HTTP request contains unified frame-based protocol
427 427 frames for evaluation.
428 428 """
429 429 # TODO Some HTTP clients are full duplex and can receive data before
430 430 # the entire request is transmitted. Figure out a way to indicate support
431 431 # for that so we can opt into full duplex mode.
432 432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 433 seencommand = False
434 434
435 outstream = reactor.makeoutputstream()
436
435 437 while True:
436 438 frame = wireprotoframing.readframe(req.bodyfh)
437 439 if not frame:
438 440 break
439 441
440 442 action, meta = reactor.onframerecv(frame)
441 443
442 444 if action == 'wantframe':
443 445 # Need more data before we can do anything.
444 446 continue
445 447 elif action == 'runcommand':
446 448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 reqcommand, reactor, meta,
448 issubsequent=seencommand)
449 reqcommand, reactor, outstream,
450 meta, issubsequent=seencommand)
449 451
450 452 if sentoutput:
451 453 return
452 454
453 455 seencommand = True
454 456
455 457 elif action == 'error':
456 458 # TODO define proper error mechanism.
457 459 res.status = b'200 OK'
458 460 res.headers[b'Content-Type'] = b'text/plain'
459 461 res.setbodybytes(meta['message'] + b'\n')
460 462 return
461 463 else:
462 464 raise error.ProgrammingError(
463 465 'unhandled action from frame processor: %s' % action)
464 466
465 467 action, meta = reactor.oninputeof()
466 468 if action == 'sendframes':
467 469 # We assume we haven't started sending the response yet. If we're
468 470 # wrong, the response type will raise an exception.
469 471 res.status = b'200 OK'
470 472 res.headers[b'Content-Type'] = FRAMINGTYPE
471 473 res.setbodygen(meta['framegen'])
472 474 elif action == 'noop':
473 475 pass
474 476 else:
475 477 raise error.ProgrammingError('unhandled action from frame processor: %s'
476 478 % action)
477 479
478 480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 command, issubsequent):
481 outstream, command, issubsequent):
480 482 """Dispatch a wire protocol command made from HTTPv2 requests.
481 483
482 484 The authenticated permission (``authedperm``) along with the original
483 485 command from the URL (``reqcommand``) are passed in.
484 486 """
485 487 # We already validated that the session has permissions to perform the
486 488 # actions in ``authedperm``. In the unified frame protocol, the canonical
487 489 # command to run is expressed in a frame. However, the URL also requested
488 490 # to run a specific command. We need to be careful that the command we
489 491 # run doesn't have permissions requirements greater than what was granted
490 492 # by ``authedperm``.
491 493 #
492 494 # Our rule for this is we only allow one command per HTTP request and
493 495 # that command must match the command in the URL. However, we make
494 496 # an exception for the ``multirequest`` URL. This URL is allowed to
495 497 # execute multiple commands. We double check permissions of each command
496 498 # as it is invoked to ensure there is no privilege escalation.
497 499 # TODO consider allowing multiple commands to regular command URLs
498 500 # iff each command is the same.
499 501
500 502 proto = httpv2protocolhandler(req, ui, args=command['args'])
501 503
502 504 if reqcommand == b'multirequest':
503 505 if not wireproto.commands.commandavailable(command['command'], proto):
504 506 # TODO proper error mechanism
505 507 res.status = b'200 OK'
506 508 res.headers[b'Content-Type'] = b'text/plain'
507 509 res.setbodybytes(_('wire protocol command not available: %s') %
508 510 command['command'])
509 511 return True
510 512
511 513 # TODO don't use assert here, since it may be elided by -O.
512 514 assert authedperm in (b'ro', b'rw')
513 515 wirecommand = wireproto.commands[command['command']]
514 516 assert wirecommand.permission in ('push', 'pull')
515 517
516 518 if authedperm == b'ro' and wirecommand.permission != 'pull':
517 519 # TODO proper error mechanism
518 520 res.status = b'403 Forbidden'
519 521 res.headers[b'Content-Type'] = b'text/plain'
520 522 res.setbodybytes(_('insufficient permissions to execute '
521 523 'command: %s') % command['command'])
522 524 return True
523 525
524 526 # TODO should we also call checkperm() here? Maybe not if we're going
525 527 # to overhaul that API. The granted scope from the URL check should
526 528 # be good enough.
527 529
528 530 else:
529 531 # Don't allow multiple commands outside of ``multirequest`` URL.
530 532 if issubsequent:
531 533 # TODO proper error mechanism
532 534 res.status = b'200 OK'
533 535 res.headers[b'Content-Type'] = b'text/plain'
534 536 res.setbodybytes(_('multiple commands cannot be issued to this '
535 537 'URL'))
536 538 return True
537 539
538 540 if reqcommand != command['command']:
539 541 # TODO define proper error mechanism
540 542 res.status = b'200 OK'
541 543 res.headers[b'Content-Type'] = b'text/plain'
542 544 res.setbodybytes(_('command in frame must match command in URL'))
543 545 return True
544 546
545 547 rsp = wireproto.dispatch(repo, proto, command['command'])
546 548
547 549 res.status = b'200 OK'
548 550 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream(2)
550 551
551 552 if isinstance(rsp, wireprototypes.bytesresponse):
552 action, meta = reactor.onbytesresponseready(stream,
553 action, meta = reactor.onbytesresponseready(outstream,
553 554 command['requestid'],
554 555 rsp.data)
555 556 else:
556 557 action, meta = reactor.onapplicationerror(
557 558 _('unhandled response type from wire proto command'))
558 559
559 560 if action == 'sendframes':
560 561 res.setbodygen(meta['framegen'])
561 562 return True
562 563 elif action == 'noop':
563 564 return False
564 565 else:
565 566 raise error.ProgrammingError('unhandled event from reactor: %s' %
566 567 action)
567 568
568 569 # Maps API name to metadata so custom API can be registered.
569 570 API_HANDLERS = {
570 571 HTTPV2: {
571 572 'config': ('experimental', 'web.api.http-v2'),
572 573 'handler': _handlehttpv2request,
573 574 },
574 575 }
575 576
576 577 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
577 578 def __init__(self, req, ui, args=None):
578 579 self._req = req
579 580 self._ui = ui
580 581 self._args = args
581 582
582 583 @property
583 584 def name(self):
584 585 return HTTPV2
585 586
586 587 def getargs(self, args):
587 588 data = {}
588 589 for k in args.split():
589 590 if k == '*':
590 591 raise NotImplementedError('do not support * args')
591 592 else:
592 593 data[k] = self._args[k]
593 594
594 595 return [data[k] for k in args.split()]
595 596
596 597 def forwardpayload(self, fp):
597 598 raise NotImplementedError
598 599
599 600 @contextlib.contextmanager
600 601 def mayberedirectstdio(self):
601 602 raise NotImplementedError
602 603
603 604 def client(self):
604 605 raise NotImplementedError
605 606
606 607 def addcapabilities(self, repo, caps):
607 608 return caps
608 609
609 610 def checkperm(self, perm):
610 611 raise NotImplementedError
611 612
612 613 def _httpresponsetype(ui, req, prefer_uncompressed):
613 614 """Determine the appropriate response type and compression settings.
614 615
615 616 Returns a tuple of (mediatype, compengine, engineopts).
616 617 """
617 618 # Determine the response media type and compression engine based
618 619 # on the request parameters.
619 620 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
620 621
621 622 if '0.2' in protocaps:
622 623 # All clients are expected to support uncompressed data.
623 624 if prefer_uncompressed:
624 625 return HGTYPE2, util._noopengine(), {}
625 626
626 627 # Default as defined by wire protocol spec.
627 628 compformats = ['zlib', 'none']
628 629 for cap in protocaps:
629 630 if cap.startswith('comp='):
630 631 compformats = cap[5:].split(',')
631 632 break
632 633
633 634 # Now find an agreed upon compression format.
634 635 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
635 636 if engine.wireprotosupport().name in compformats:
636 637 opts = {}
637 638 level = ui.configint('server', '%slevel' % engine.name())
638 639 if level is not None:
639 640 opts['level'] = level
640 641
641 642 return HGTYPE2, engine, opts
642 643
643 644 # No mutually supported compression format. Fall back to the
644 645 # legacy protocol.
645 646
646 647 # Don't allow untrusted settings because disabling compression or
647 648 # setting a very high compression level could lead to flooding
648 649 # the server's network or CPU.
649 650 opts = {'level': ui.configint('server', 'zliblevel')}
650 651 return HGTYPE, util.compengines['zlib'], opts
651 652
652 653 def _callhttp(repo, req, res, proto, cmd):
653 654 # Avoid cycle involving hg module.
654 655 from .hgweb import common as hgwebcommon
655 656
656 657 def genversion2(gen, engine, engineopts):
657 658 # application/mercurial-0.2 always sends a payload header
658 659 # identifying the compression engine.
659 660 name = engine.wireprotosupport().name
660 661 assert 0 < len(name) < 256
661 662 yield struct.pack('B', len(name))
662 663 yield name
663 664
664 665 for chunk in gen:
665 666 yield chunk
666 667
667 668 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
668 669 if code == HTTP_OK:
669 670 res.status = '200 Script output follows'
670 671 else:
671 672 res.status = hgwebcommon.statusmessage(code)
672 673
673 674 res.headers['Content-Type'] = contenttype
674 675
675 676 if bodybytes is not None:
676 677 res.setbodybytes(bodybytes)
677 678 if bodygen is not None:
678 679 res.setbodygen(bodygen)
679 680
680 681 if not wireproto.commands.commandavailable(cmd, proto):
681 682 setresponse(HTTP_OK, HGERRTYPE,
682 683 _('requested wire protocol command is not available over '
683 684 'HTTP'))
684 685 return
685 686
686 687 proto.checkperm(wireproto.commands[cmd].permission)
687 688
688 689 rsp = wireproto.dispatch(repo, proto, cmd)
689 690
690 691 if isinstance(rsp, bytes):
691 692 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
692 693 elif isinstance(rsp, wireprototypes.bytesresponse):
693 694 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
694 695 elif isinstance(rsp, wireprototypes.streamreslegacy):
695 696 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
696 697 elif isinstance(rsp, wireprototypes.streamres):
697 698 gen = rsp.gen
698 699
699 700 # This code for compression should not be streamres specific. It
700 701 # is here because we only compress streamres at the moment.
701 702 mediatype, engine, engineopts = _httpresponsetype(
702 703 repo.ui, req, rsp.prefer_uncompressed)
703 704 gen = engine.compressstream(gen, engineopts)
704 705
705 706 if mediatype == HGTYPE2:
706 707 gen = genversion2(gen, engine, engineopts)
707 708
708 709 setresponse(HTTP_OK, mediatype, bodygen=gen)
709 710 elif isinstance(rsp, wireprototypes.pushres):
710 711 rsp = '%d\n%s' % (rsp.res, rsp.output)
711 712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
712 713 elif isinstance(rsp, wireprototypes.pusherr):
713 714 rsp = '0\n%s\n' % rsp.res
714 715 res.drain = True
715 716 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
716 717 elif isinstance(rsp, wireprototypes.ooberror):
717 718 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
718 719 else:
719 720 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
720 721
721 722 def _sshv1respondbytes(fout, value):
722 723 """Send a bytes response for protocol version 1."""
723 724 fout.write('%d\n' % len(value))
724 725 fout.write(value)
725 726 fout.flush()
726 727
727 728 def _sshv1respondstream(fout, source):
728 729 write = fout.write
729 730 for chunk in source.gen:
730 731 write(chunk)
731 732 fout.flush()
732 733
733 734 def _sshv1respondooberror(fout, ferr, rsp):
734 735 ferr.write(b'%s\n-\n' % rsp)
735 736 ferr.flush()
736 737 fout.write(b'\n')
737 738 fout.flush()
738 739
739 740 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
740 741 """Handler for requests services via version 1 of SSH protocol."""
741 742 def __init__(self, ui, fin, fout):
742 743 self._ui = ui
743 744 self._fin = fin
744 745 self._fout = fout
745 746
746 747 @property
747 748 def name(self):
748 749 return wireprototypes.SSHV1
749 750
750 751 def getargs(self, args):
751 752 data = {}
752 753 keys = args.split()
753 754 for n in xrange(len(keys)):
754 755 argline = self._fin.readline()[:-1]
755 756 arg, l = argline.split()
756 757 if arg not in keys:
757 758 raise error.Abort(_("unexpected parameter %r") % arg)
758 759 if arg == '*':
759 760 star = {}
760 761 for k in xrange(int(l)):
761 762 argline = self._fin.readline()[:-1]
762 763 arg, l = argline.split()
763 764 val = self._fin.read(int(l))
764 765 star[arg] = val
765 766 data['*'] = star
766 767 else:
767 768 val = self._fin.read(int(l))
768 769 data[arg] = val
769 770 return [data[k] for k in keys]
770 771
771 772 def forwardpayload(self, fpout):
772 773 # We initially send an empty response. This tells the client it is
773 774 # OK to start sending data. If a client sees any other response, it
774 775 # interprets it as an error.
775 776 _sshv1respondbytes(self._fout, b'')
776 777
777 778 # The file is in the form:
778 779 #
779 780 # <chunk size>\n<chunk>
780 781 # ...
781 782 # 0\n
782 783 count = int(self._fin.readline())
783 784 while count:
784 785 fpout.write(self._fin.read(count))
785 786 count = int(self._fin.readline())
786 787
787 788 @contextlib.contextmanager
788 789 def mayberedirectstdio(self):
789 790 yield None
790 791
791 792 def client(self):
792 793 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
793 794 return 'remote:ssh:' + client
794 795
795 796 def addcapabilities(self, repo, caps):
796 797 caps.append(b'batch')
797 798 return caps
798 799
799 800 def checkperm(self, perm):
800 801 pass
801 802
802 803 class sshv2protocolhandler(sshv1protocolhandler):
803 804 """Protocol handler for version 2 of the SSH protocol."""
804 805
805 806 @property
806 807 def name(self):
807 808 return wireprototypes.SSHV2
808 809
809 810 def _runsshserver(ui, repo, fin, fout, ev):
810 811 # This function operates like a state machine of sorts. The following
811 812 # states are defined:
812 813 #
813 814 # protov1-serving
814 815 # Server is in protocol version 1 serving mode. Commands arrive on
815 816 # new lines. These commands are processed in this state, one command
816 817 # after the other.
817 818 #
818 819 # protov2-serving
819 820 # Server is in protocol version 2 serving mode.
820 821 #
821 822 # upgrade-initial
822 823 # The server is going to process an upgrade request.
823 824 #
824 825 # upgrade-v2-filter-legacy-handshake
825 826 # The protocol is being upgraded to version 2. The server is expecting
826 827 # the legacy handshake from version 1.
827 828 #
828 829 # upgrade-v2-finish
829 830 # The upgrade to version 2 of the protocol is imminent.
830 831 #
831 832 # shutdown
832 833 # The server is shutting down, possibly in reaction to a client event.
833 834 #
834 835 # And here are their transitions:
835 836 #
836 837 # protov1-serving -> shutdown
837 838 # When server receives an empty request or encounters another
838 839 # error.
839 840 #
840 841 # protov1-serving -> upgrade-initial
841 842 # An upgrade request line was seen.
842 843 #
843 844 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
844 845 # Upgrade to version 2 in progress. Server is expecting to
845 846 # process a legacy handshake.
846 847 #
847 848 # upgrade-v2-filter-legacy-handshake -> shutdown
848 849 # Client did not fulfill upgrade handshake requirements.
849 850 #
850 851 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
851 852 # Client fulfilled version 2 upgrade requirements. Finishing that
852 853 # upgrade.
853 854 #
854 855 # upgrade-v2-finish -> protov2-serving
855 856 # Protocol upgrade to version 2 complete. Server can now speak protocol
856 857 # version 2.
857 858 #
858 859 # protov2-serving -> protov1-serving
859 860 # Ths happens by default since protocol version 2 is the same as
860 861 # version 1 except for the handshake.
861 862
862 863 state = 'protov1-serving'
863 864 proto = sshv1protocolhandler(ui, fin, fout)
864 865 protoswitched = False
865 866
866 867 while not ev.is_set():
867 868 if state == 'protov1-serving':
868 869 # Commands are issued on new lines.
869 870 request = fin.readline()[:-1]
870 871
871 872 # Empty lines signal to terminate the connection.
872 873 if not request:
873 874 state = 'shutdown'
874 875 continue
875 876
876 877 # It looks like a protocol upgrade request. Transition state to
877 878 # handle it.
878 879 if request.startswith(b'upgrade '):
879 880 if protoswitched:
880 881 _sshv1respondooberror(fout, ui.ferr,
881 882 b'cannot upgrade protocols multiple '
882 883 b'times')
883 884 state = 'shutdown'
884 885 continue
885 886
886 887 state = 'upgrade-initial'
887 888 continue
888 889
889 890 available = wireproto.commands.commandavailable(request, proto)
890 891
891 892 # This command isn't available. Send an empty response and go
892 893 # back to waiting for a new command.
893 894 if not available:
894 895 _sshv1respondbytes(fout, b'')
895 896 continue
896 897
897 898 rsp = wireproto.dispatch(repo, proto, request)
898 899
899 900 if isinstance(rsp, bytes):
900 901 _sshv1respondbytes(fout, rsp)
901 902 elif isinstance(rsp, wireprototypes.bytesresponse):
902 903 _sshv1respondbytes(fout, rsp.data)
903 904 elif isinstance(rsp, wireprototypes.streamres):
904 905 _sshv1respondstream(fout, rsp)
905 906 elif isinstance(rsp, wireprototypes.streamreslegacy):
906 907 _sshv1respondstream(fout, rsp)
907 908 elif isinstance(rsp, wireprototypes.pushres):
908 909 _sshv1respondbytes(fout, b'')
909 910 _sshv1respondbytes(fout, b'%d' % rsp.res)
910 911 elif isinstance(rsp, wireprototypes.pusherr):
911 912 _sshv1respondbytes(fout, rsp.res)
912 913 elif isinstance(rsp, wireprototypes.ooberror):
913 914 _sshv1respondooberror(fout, ui.ferr, rsp.message)
914 915 else:
915 916 raise error.ProgrammingError('unhandled response type from '
916 917 'wire protocol command: %s' % rsp)
917 918
918 919 # For now, protocol version 2 serving just goes back to version 1.
919 920 elif state == 'protov2-serving':
920 921 state = 'protov1-serving'
921 922 continue
922 923
923 924 elif state == 'upgrade-initial':
924 925 # We should never transition into this state if we've switched
925 926 # protocols.
926 927 assert not protoswitched
927 928 assert proto.name == wireprototypes.SSHV1
928 929
929 930 # Expected: upgrade <token> <capabilities>
930 931 # If we get something else, the request is malformed. It could be
931 932 # from a future client that has altered the upgrade line content.
932 933 # We treat this as an unknown command.
933 934 try:
934 935 token, caps = request.split(b' ')[1:]
935 936 except ValueError:
936 937 _sshv1respondbytes(fout, b'')
937 938 state = 'protov1-serving'
938 939 continue
939 940
940 941 # Send empty response if we don't support upgrading protocols.
941 942 if not ui.configbool('experimental', 'sshserver.support-v2'):
942 943 _sshv1respondbytes(fout, b'')
943 944 state = 'protov1-serving'
944 945 continue
945 946
946 947 try:
947 948 caps = urlreq.parseqs(caps)
948 949 except ValueError:
949 950 _sshv1respondbytes(fout, b'')
950 951 state = 'protov1-serving'
951 952 continue
952 953
953 954 # We don't see an upgrade request to protocol version 2. Ignore
954 955 # the upgrade request.
955 956 wantedprotos = caps.get(b'proto', [b''])[0]
956 957 if SSHV2 not in wantedprotos:
957 958 _sshv1respondbytes(fout, b'')
958 959 state = 'protov1-serving'
959 960 continue
960 961
961 962 # It looks like we can honor this upgrade request to protocol 2.
962 963 # Filter the rest of the handshake protocol request lines.
963 964 state = 'upgrade-v2-filter-legacy-handshake'
964 965 continue
965 966
966 967 elif state == 'upgrade-v2-filter-legacy-handshake':
967 968 # Client should have sent legacy handshake after an ``upgrade``
968 969 # request. Expected lines:
969 970 #
970 971 # hello
971 972 # between
972 973 # pairs 81
973 974 # 0000...-0000...
974 975
975 976 ok = True
976 977 for line in (b'hello', b'between', b'pairs 81'):
977 978 request = fin.readline()[:-1]
978 979
979 980 if request != line:
980 981 _sshv1respondooberror(fout, ui.ferr,
981 982 b'malformed handshake protocol: '
982 983 b'missing %s' % line)
983 984 ok = False
984 985 state = 'shutdown'
985 986 break
986 987
987 988 if not ok:
988 989 continue
989 990
990 991 request = fin.read(81)
991 992 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
992 993 _sshv1respondooberror(fout, ui.ferr,
993 994 b'malformed handshake protocol: '
994 995 b'missing between argument value')
995 996 state = 'shutdown'
996 997 continue
997 998
998 999 state = 'upgrade-v2-finish'
999 1000 continue
1000 1001
1001 1002 elif state == 'upgrade-v2-finish':
1002 1003 # Send the upgrade response.
1003 1004 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1004 1005 servercaps = wireproto.capabilities(repo, proto)
1005 1006 rsp = b'capabilities: %s' % servercaps.data
1006 1007 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1007 1008 fout.flush()
1008 1009
1009 1010 proto = sshv2protocolhandler(ui, fin, fout)
1010 1011 protoswitched = True
1011 1012
1012 1013 state = 'protov2-serving'
1013 1014 continue
1014 1015
1015 1016 elif state == 'shutdown':
1016 1017 break
1017 1018
1018 1019 else:
1019 1020 raise error.ProgrammingError('unhandled ssh server state: %s' %
1020 1021 state)
1021 1022
1022 1023 class sshserver(object):
1023 1024 def __init__(self, ui, repo, logfh=None):
1024 1025 self._ui = ui
1025 1026 self._repo = repo
1026 1027 self._fin = ui.fin
1027 1028 self._fout = ui.fout
1028 1029
1029 1030 # Log write I/O to stdout and stderr if configured.
1030 1031 if logfh:
1031 1032 self._fout = util.makeloggingfileobject(
1032 1033 logfh, self._fout, 'o', logdata=True)
1033 1034 ui.ferr = util.makeloggingfileobject(
1034 1035 logfh, ui.ferr, 'e', logdata=True)
1035 1036
1036 1037 hook.redirect(True)
1037 1038 ui.fout = repo.ui.fout = ui.ferr
1038 1039
1039 1040 # Prevent insertion/deletion of CRs
1040 1041 procutil.setbinary(self._fin)
1041 1042 procutil.setbinary(self._fout)
1042 1043
1043 1044 def serve_forever(self):
1044 1045 self.serveuntil(threading.Event())
1045 1046 sys.exit(0)
1046 1047
1047 1048 def serveuntil(self, ev):
1048 1049 """Serve until a threading.Event is set."""
1049 1050 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,564 +1,564
1 1 $ HTTPV2=exp-http-v2-0001
2 2 $ MEDIATYPE=application/mercurial-exp-framing-0002
3 3
4 4 $ send() {
5 5 > hg --verbose debugwireproto --peer raw http://$LOCALIP:$HGPORT/
6 6 > }
7 7
8 8 $ cat > dummycommands.py << EOF
9 9 > from mercurial import wireprototypes, wireproto
10 10 > @wireproto.wireprotocommand('customreadonly', permission='pull')
11 11 > def customreadonly(repo, proto):
12 12 > return wireprototypes.bytesresponse(b'customreadonly bytes response')
13 13 > @wireproto.wireprotocommand('customreadwrite', permission='push')
14 14 > def customreadwrite(repo, proto):
15 15 > return wireprototypes.bytesresponse(b'customreadwrite bytes response')
16 16 > EOF
17 17
18 18 $ cat >> $HGRCPATH << EOF
19 19 > [extensions]
20 20 > dummycommands = $TESTTMP/dummycommands.py
21 21 > EOF
22 22
23 23 $ hg init server
24 24 $ cat > server/.hg/hgrc << EOF
25 25 > [experimental]
26 26 > web.apiserver = true
27 27 > EOF
28 28 $ hg -R server serve -p $HGPORT -d --pid-file hg.pid
29 29 $ cat hg.pid > $DAEMON_PIDS
30 30
31 31 HTTP v2 protocol not enabled by default
32 32
33 33 $ send << EOF
34 34 > httprequest GET api/$HTTPV2
35 35 > user-agent: test
36 36 > EOF
37 37 using raw connection to peer
38 38 s> GET /api/exp-http-v2-0001 HTTP/1.1\r\n
39 39 s> Accept-Encoding: identity\r\n
40 40 s> user-agent: test\r\n
41 41 s> host: $LOCALIP:$HGPORT\r\n (glob)
42 42 s> \r\n
43 43 s> makefile('rb', None)
44 44 s> HTTP/1.1 404 Not Found\r\n
45 45 s> Server: testing stub value\r\n
46 46 s> Date: $HTTP_DATE$\r\n
47 47 s> Content-Type: text/plain\r\n
48 48 s> Content-Length: 33\r\n
49 49 s> \r\n
50 50 s> API exp-http-v2-0001 not enabled\n
51 51
52 52 Restart server with support for HTTP v2 API
53 53
54 54 $ killdaemons.py
55 55 $ cat > server/.hg/hgrc << EOF
56 56 > [experimental]
57 57 > web.apiserver = true
58 58 > web.api.http-v2 = true
59 59 > EOF
60 60
61 61 $ hg -R server serve -p $HGPORT -d --pid-file hg.pid
62 62 $ cat hg.pid > $DAEMON_PIDS
63 63
64 64 Request to unknown command yields 404
65 65
66 66 $ send << EOF
67 67 > httprequest POST api/$HTTPV2/ro/badcommand
68 68 > user-agent: test
69 69 > EOF
70 70 using raw connection to peer
71 71 s> POST /api/exp-http-v2-0001/ro/badcommand HTTP/1.1\r\n
72 72 s> Accept-Encoding: identity\r\n
73 73 s> user-agent: test\r\n
74 74 s> host: $LOCALIP:$HGPORT\r\n (glob)
75 75 s> \r\n
76 76 s> makefile('rb', None)
77 77 s> HTTP/1.1 404 Not Found\r\n
78 78 s> Server: testing stub value\r\n
79 79 s> Date: $HTTP_DATE$\r\n
80 80 s> Content-Type: text/plain\r\n
81 81 s> Content-Length: 42\r\n
82 82 s> \r\n
83 83 s> unknown wire protocol command: badcommand\n
84 84
85 85 GET to read-only command yields a 405
86 86
87 87 $ send << EOF
88 88 > httprequest GET api/$HTTPV2/ro/customreadonly
89 89 > user-agent: test
90 90 > EOF
91 91 using raw connection to peer
92 92 s> GET /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
93 93 s> Accept-Encoding: identity\r\n
94 94 s> user-agent: test\r\n
95 95 s> host: $LOCALIP:$HGPORT\r\n (glob)
96 96 s> \r\n
97 97 s> makefile('rb', None)
98 98 s> HTTP/1.1 405 Method Not Allowed\r\n
99 99 s> Server: testing stub value\r\n
100 100 s> Date: $HTTP_DATE$\r\n
101 101 s> Allow: POST\r\n
102 102 s> Content-Length: 30\r\n
103 103 s> \r\n
104 104 s> commands require POST requests
105 105
106 106 Missing Accept header results in 406
107 107
108 108 $ send << EOF
109 109 > httprequest POST api/$HTTPV2/ro/customreadonly
110 110 > user-agent: test
111 111 > EOF
112 112 using raw connection to peer
113 113 s> POST /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
114 114 s> Accept-Encoding: identity\r\n
115 115 s> user-agent: test\r\n
116 116 s> host: $LOCALIP:$HGPORT\r\n (glob)
117 117 s> \r\n
118 118 s> makefile('rb', None)
119 119 s> HTTP/1.1 406 Not Acceptable\r\n
120 120 s> Server: testing stub value\r\n
121 121 s> Date: $HTTP_DATE$\r\n
122 122 s> Content-Type: text/plain\r\n
123 123 s> Content-Length: 85\r\n
124 124 s> \r\n
125 125 s> client MUST specify Accept header with value: application/mercurial-exp-framing-0002\n
126 126
127 127 Bad Accept header results in 406
128 128
129 129 $ send << EOF
130 130 > httprequest POST api/$HTTPV2/ro/customreadonly
131 131 > accept: invalid
132 132 > user-agent: test
133 133 > EOF
134 134 using raw connection to peer
135 135 s> POST /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
136 136 s> Accept-Encoding: identity\r\n
137 137 s> accept: invalid\r\n
138 138 s> user-agent: test\r\n
139 139 s> host: $LOCALIP:$HGPORT\r\n (glob)
140 140 s> \r\n
141 141 s> makefile('rb', None)
142 142 s> HTTP/1.1 406 Not Acceptable\r\n
143 143 s> Server: testing stub value\r\n
144 144 s> Date: $HTTP_DATE$\r\n
145 145 s> Content-Type: text/plain\r\n
146 146 s> Content-Length: 85\r\n
147 147 s> \r\n
148 148 s> client MUST specify Accept header with value: application/mercurial-exp-framing-0002\n
149 149
150 150 Bad Content-Type header results in 415
151 151
152 152 $ send << EOF
153 153 > httprequest POST api/$HTTPV2/ro/customreadonly
154 154 > accept: $MEDIATYPE
155 155 > user-agent: test
156 156 > content-type: badmedia
157 157 > EOF
158 158 using raw connection to peer
159 159 s> POST /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
160 160 s> Accept-Encoding: identity\r\n
161 161 s> accept: application/mercurial-exp-framing-0002\r\n
162 162 s> content-type: badmedia\r\n
163 163 s> user-agent: test\r\n
164 164 s> host: $LOCALIP:$HGPORT\r\n (glob)
165 165 s> \r\n
166 166 s> makefile('rb', None)
167 167 s> HTTP/1.1 415 Unsupported Media Type\r\n
168 168 s> Server: testing stub value\r\n
169 169 s> Date: $HTTP_DATE$\r\n
170 170 s> Content-Type: text/plain\r\n
171 171 s> Content-Length: 88\r\n
172 172 s> \r\n
173 173 s> client MUST send Content-Type header with value: application/mercurial-exp-framing-0002\n
174 174
175 175 Request to read-only command works out of the box
176 176
177 177 $ send << EOF
178 178 > httprequest POST api/$HTTPV2/ro/customreadonly
179 179 > accept: $MEDIATYPE
180 180 > content-type: $MEDIATYPE
181 181 > user-agent: test
182 182 > frame 1 1 stream-begin command-name eos customreadonly
183 183 > EOF
184 184 using raw connection to peer
185 185 s> POST /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
186 186 s> Accept-Encoding: identity\r\n
187 187 s> accept: application/mercurial-exp-framing-0002\r\n
188 188 s> content-type: application/mercurial-exp-framing-0002\r\n
189 189 s> user-agent: test\r\n
190 190 s> *\r\n (glob)
191 191 s> host: $LOCALIP:$HGPORT\r\n (glob)
192 192 s> \r\n
193 193 s> \x0e\x00\x00\x01\x00\x01\x01\x11customreadonly
194 194 s> makefile('rb', None)
195 195 s> HTTP/1.1 200 OK\r\n
196 196 s> Server: testing stub value\r\n
197 197 s> Date: $HTTP_DATE$\r\n
198 198 s> Content-Type: application/mercurial-exp-framing-0002\r\n
199 199 s> Transfer-Encoding: chunked\r\n
200 200 s> \r\n
201 201 s> 25\r\n
202 202 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
203 203 s> \r\n
204 204 s> 0\r\n
205 205 s> \r\n
206 206
207 207 Request to read-write command fails because server is read-only by default
208 208
209 209 GET to read-write request yields 405
210 210
211 211 $ send << EOF
212 212 > httprequest GET api/$HTTPV2/rw/customreadonly
213 213 > user-agent: test
214 214 > EOF
215 215 using raw connection to peer
216 216 s> GET /api/exp-http-v2-0001/rw/customreadonly HTTP/1.1\r\n
217 217 s> Accept-Encoding: identity\r\n
218 218 s> user-agent: test\r\n
219 219 s> host: $LOCALIP:$HGPORT\r\n (glob)
220 220 s> \r\n
221 221 s> makefile('rb', None)
222 222 s> HTTP/1.1 405 Method Not Allowed\r\n
223 223 s> Server: testing stub value\r\n
224 224 s> Date: $HTTP_DATE$\r\n
225 225 s> Allow: POST\r\n
226 226 s> Content-Length: 30\r\n
227 227 s> \r\n
228 228 s> commands require POST requests
229 229
230 230 Even for unknown commands
231 231
232 232 $ send << EOF
233 233 > httprequest GET api/$HTTPV2/rw/badcommand
234 234 > user-agent: test
235 235 > EOF
236 236 using raw connection to peer
237 237 s> GET /api/exp-http-v2-0001/rw/badcommand HTTP/1.1\r\n
238 238 s> Accept-Encoding: identity\r\n
239 239 s> user-agent: test\r\n
240 240 s> host: $LOCALIP:$HGPORT\r\n (glob)
241 241 s> \r\n
242 242 s> makefile('rb', None)
243 243 s> HTTP/1.1 405 Method Not Allowed\r\n
244 244 s> Server: testing stub value\r\n
245 245 s> Date: $HTTP_DATE$\r\n
246 246 s> Allow: POST\r\n
247 247 s> Content-Length: 30\r\n
248 248 s> \r\n
249 249 s> commands require POST requests
250 250
251 251 SSL required by default
252 252
253 253 $ send << EOF
254 254 > httprequest POST api/$HTTPV2/rw/customreadonly
255 255 > user-agent: test
256 256 > EOF
257 257 using raw connection to peer
258 258 s> POST /api/exp-http-v2-0001/rw/customreadonly HTTP/1.1\r\n
259 259 s> Accept-Encoding: identity\r\n
260 260 s> user-agent: test\r\n
261 261 s> host: $LOCALIP:$HGPORT\r\n (glob)
262 262 s> \r\n
263 263 s> makefile('rb', None)
264 264 s> HTTP/1.1 403 ssl required\r\n
265 265 s> Server: testing stub value\r\n
266 266 s> Date: $HTTP_DATE$\r\n
267 267 s> Content-Length: 17\r\n
268 268 s> \r\n
269 269 s> permission denied
270 270
271 271 Restart server to allow non-ssl read-write operations
272 272
273 273 $ killdaemons.py
274 274 $ cat > server/.hg/hgrc << EOF
275 275 > [experimental]
276 276 > web.apiserver = true
277 277 > web.api.http-v2 = true
278 278 > [web]
279 279 > push_ssl = false
280 280 > allow-push = *
281 281 > EOF
282 282
283 283 $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
284 284 $ cat hg.pid > $DAEMON_PIDS
285 285
286 286 Authorized request for valid read-write command works
287 287
288 288 $ send << EOF
289 289 > httprequest POST api/$HTTPV2/rw/customreadonly
290 290 > user-agent: test
291 291 > accept: $MEDIATYPE
292 292 > content-type: $MEDIATYPE
293 293 > frame 1 1 stream-begin command-name eos customreadonly
294 294 > EOF
295 295 using raw connection to peer
296 296 s> POST /api/exp-http-v2-0001/rw/customreadonly HTTP/1.1\r\n
297 297 s> Accept-Encoding: identity\r\n
298 298 s> accept: application/mercurial-exp-framing-0002\r\n
299 299 s> content-type: application/mercurial-exp-framing-0002\r\n
300 300 s> user-agent: test\r\n
301 301 s> content-length: 22\r\n
302 302 s> host: $LOCALIP:$HGPORT\r\n (glob)
303 303 s> \r\n
304 304 s> \x0e\x00\x00\x01\x00\x01\x01\x11customreadonly
305 305 s> makefile('rb', None)
306 306 s> HTTP/1.1 200 OK\r\n
307 307 s> Server: testing stub value\r\n
308 308 s> Date: $HTTP_DATE$\r\n
309 309 s> Content-Type: application/mercurial-exp-framing-0002\r\n
310 310 s> Transfer-Encoding: chunked\r\n
311 311 s> \r\n
312 312 s> 25\r\n
313 313 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
314 314 s> \r\n
315 315 s> 0\r\n
316 316 s> \r\n
317 317
318 318 Authorized request for unknown command is rejected
319 319
320 320 $ send << EOF
321 321 > httprequest POST api/$HTTPV2/rw/badcommand
322 322 > user-agent: test
323 323 > accept: $MEDIATYPE
324 324 > EOF
325 325 using raw connection to peer
326 326 s> POST /api/exp-http-v2-0001/rw/badcommand HTTP/1.1\r\n
327 327 s> Accept-Encoding: identity\r\n
328 328 s> accept: application/mercurial-exp-framing-0002\r\n
329 329 s> user-agent: test\r\n
330 330 s> host: $LOCALIP:$HGPORT\r\n (glob)
331 331 s> \r\n
332 332 s> makefile('rb', None)
333 333 s> HTTP/1.1 404 Not Found\r\n
334 334 s> Server: testing stub value\r\n
335 335 s> Date: $HTTP_DATE$\r\n
336 336 s> Content-Type: text/plain\r\n
337 337 s> Content-Length: 42\r\n
338 338 s> \r\n
339 339 s> unknown wire protocol command: badcommand\n
340 340
341 341 debugreflect isn't enabled by default
342 342
343 343 $ send << EOF
344 344 > httprequest POST api/$HTTPV2/ro/debugreflect
345 345 > user-agent: test
346 346 > EOF
347 347 using raw connection to peer
348 348 s> POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
349 349 s> Accept-Encoding: identity\r\n
350 350 s> user-agent: test\r\n
351 351 s> host: $LOCALIP:$HGPORT\r\n (glob)
352 352 s> \r\n
353 353 s> makefile('rb', None)
354 354 s> HTTP/1.1 404 Not Found\r\n
355 355 s> Server: testing stub value\r\n
356 356 s> Date: $HTTP_DATE$\r\n
357 357 s> Content-Type: text/plain\r\n
358 358 s> Content-Length: 34\r\n
359 359 s> \r\n
360 360 s> debugreflect service not available
361 361
362 362 Restart server to get debugreflect endpoint
363 363
364 364 $ killdaemons.py
365 365 $ cat > server/.hg/hgrc << EOF
366 366 > [experimental]
367 367 > web.apiserver = true
368 368 > web.api.debugreflect = true
369 369 > web.api.http-v2 = true
370 370 > [web]
371 371 > push_ssl = false
372 372 > allow-push = *
373 373 > EOF
374 374
375 375 $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
376 376 $ cat hg.pid > $DAEMON_PIDS
377 377
378 378 Command frames can be reflected via debugreflect
379 379
380 380 $ send << EOF
381 381 > httprequest POST api/$HTTPV2/ro/debugreflect
382 382 > accept: $MEDIATYPE
383 383 > content-type: $MEDIATYPE
384 384 > user-agent: test
385 385 > frame 1 1 stream-begin command-name have-args command1
386 386 > frame 1 1 0 command-argument 0 \x03\x00\x04\x00fooval1
387 387 > frame 1 1 0 command-argument eoa \x04\x00\x03\x00bar1val
388 388 > EOF
389 389 using raw connection to peer
390 390 s> POST /api/exp-http-v2-0001/ro/debugreflect HTTP/1.1\r\n
391 391 s> Accept-Encoding: identity\r\n
392 392 s> accept: application/mercurial-exp-framing-0002\r\n
393 393 s> content-type: application/mercurial-exp-framing-0002\r\n
394 394 s> user-agent: test\r\n
395 395 s> content-length: 54\r\n
396 396 s> host: $LOCALIP:$HGPORT\r\n (glob)
397 397 s> \r\n
398 398 s> \x08\x00\x00\x01\x00\x01\x01\x12command1\x0b\x00\x00\x01\x00\x01\x00 \x03\x00\x04\x00fooval1\x0b\x00\x00\x01\x00\x01\x00"\x04\x00\x03\x00bar1val
399 399 s> makefile('rb', None)
400 400 s> HTTP/1.1 200 OK\r\n
401 401 s> Server: testing stub value\r\n
402 402 s> Date: $HTTP_DATE$\r\n
403 403 s> Content-Type: text/plain\r\n
404 404 s> Content-Length: 322\r\n
405 405 s> \r\n
406 406 s> received: 1 2 1 command1\n
407 407 s> ["wantframe", {"state": "command-receiving"}]\n
408 408 s> received: 2 0 1 \x03\x00\x04\x00fooval1\n
409 409 s> ["wantframe", {"state": "command-receiving"}]\n
410 410 s> received: 2 2 1 \x04\x00\x03\x00bar1val\n
411 411 s> ["runcommand", {"args": {"bar1": "val", "foo": "val1"}, "command": "command1", "data": null, "requestid": 1}]\n
412 412 s> received: <no frame>\n
413 413 s> {"action": "noop"}
414 414
415 415 Multiple requests to regular command URL are not allowed
416 416
417 417 $ send << EOF
418 418 > httprequest POST api/$HTTPV2/ro/customreadonly
419 419 > accept: $MEDIATYPE
420 420 > content-type: $MEDIATYPE
421 421 > user-agent: test
422 422 > frame 1 1 stream-begin command-name eos customreadonly
423 423 > frame 3 1 0 command-name eos customreadonly
424 424 > EOF
425 425 using raw connection to peer
426 426 s> POST /api/exp-http-v2-0001/ro/customreadonly HTTP/1.1\r\n
427 427 s> Accept-Encoding: identity\r\n
428 428 s> accept: application/mercurial-exp-framing-0002\r\n
429 429 s> content-type: application/mercurial-exp-framing-0002\r\n
430 430 s> user-agent: test\r\n
431 431 s> content-length: 44\r\n
432 432 s> host: $LOCALIP:$HGPORT\r\n (glob)
433 433 s> \r\n
434 434 s> \x0e\x00\x00\x01\x00\x01\x01\x11customreadonly\x0e\x00\x00\x03\x00\x01\x00\x11customreadonly
435 435 s> makefile('rb', None)
436 436 s> HTTP/1.1 200 OK\r\n
437 437 s> Server: testing stub value\r\n
438 438 s> Date: $HTTP_DATE$\r\n
439 439 s> Content-Type: text/plain\r\n
440 440 s> Content-Length: 46\r\n
441 441 s> \r\n
442 442 s> multiple commands cannot be issued to this URL
443 443
444 444 Multiple requests to "multirequest" URL are allowed
445 445
446 446 $ send << EOF
447 447 > httprequest POST api/$HTTPV2/ro/multirequest
448 448 > accept: $MEDIATYPE
449 449 > content-type: $MEDIATYPE
450 450 > user-agent: test
451 451 > frame 1 1 stream-begin command-name eos customreadonly
452 452 > frame 3 1 0 command-name eos customreadonly
453 453 > EOF
454 454 using raw connection to peer
455 455 s> POST /api/exp-http-v2-0001/ro/multirequest HTTP/1.1\r\n
456 456 s> Accept-Encoding: identity\r\n
457 457 s> accept: application/mercurial-exp-framing-0002\r\n
458 458 s> content-type: application/mercurial-exp-framing-0002\r\n
459 459 s> user-agent: test\r\n
460 460 s> *\r\n (glob)
461 461 s> host: $LOCALIP:$HGPORT\r\n (glob)
462 462 s> \r\n
463 463 s> \x0e\x00\x00\x01\x00\x01\x01\x11customreadonly\x0e\x00\x00\x03\x00\x01\x00\x11customreadonly
464 464 s> makefile('rb', None)
465 465 s> HTTP/1.1 200 OK\r\n
466 466 s> Server: testing stub value\r\n
467 467 s> Date: $HTTP_DATE$\r\n
468 468 s> Content-Type: application/mercurial-exp-framing-0002\r\n
469 469 s> Transfer-Encoding: chunked\r\n
470 470 s> \r\n
471 471 s> *\r\n (glob)
472 472 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
473 473 s> \r\n
474 474 s> 25\r\n
475 s> \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response
475 s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response
476 476 s> \r\n
477 477 s> 0\r\n
478 478 s> \r\n
479 479
480 480 Interleaved requests to "multirequest" are processed
481 481
482 482 $ send << EOF
483 483 > httprequest POST api/$HTTPV2/ro/multirequest
484 484 > accept: $MEDIATYPE
485 485 > content-type: $MEDIATYPE
486 486 > user-agent: test
487 487 > frame 1 1 stream-begin command-name have-args listkeys
488 488 > frame 3 1 0 command-name have-args listkeys
489 489 > frame 3 1 0 command-argument eoa \x09\x00\x09\x00namespacebookmarks
490 490 > frame 1 1 0 command-argument eoa \x09\x00\x0a\x00namespacenamespaces
491 491 > EOF
492 492 using raw connection to peer
493 493 s> POST /api/exp-http-v2-0001/ro/multirequest HTTP/1.1\r\n
494 494 s> Accept-Encoding: identity\r\n
495 495 s> accept: application/mercurial-exp-framing-0002\r\n
496 496 s> content-type: application/mercurial-exp-framing-0002\r\n
497 497 s> user-agent: test\r\n
498 498 s> content-length: 93\r\n
499 499 s> host: $LOCALIP:$HGPORT\r\n (glob)
500 500 s> \r\n
501 501 s> \x08\x00\x00\x01\x00\x01\x01\x12listkeys\x08\x00\x00\x03\x00\x01\x00\x12listkeys\x16\x00\x00\x03\x00\x01\x00" \x00 \x00namespacebookmarks\x17\x00\x00\x01\x00\x01\x00" \x00\n
502 502 s> \x00namespacenamespaces
503 503 s> makefile('rb', None)
504 504 s> HTTP/1.1 200 OK\r\n
505 505 s> Server: testing stub value\r\n
506 506 s> Date: $HTTP_DATE$\r\n
507 507 s> Content-Type: application/mercurial-exp-framing-0002\r\n
508 508 s> Transfer-Encoding: chunked\r\n
509 509 s> \r\n
510 510 s> 8\r\n
511 511 s> \x00\x00\x00\x03\x00\x02\x01B
512 512 s> \r\n
513 513 s> 26\r\n
514 s> \x1e\x00\x00\x01\x00\x02\x01Bbookmarks \n
514 s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n
515 515 s> namespaces \n
516 516 s> phases
517 517 s> \r\n
518 518 s> 0\r\n
519 519 s> \r\n
520 520
521 521 Restart server to disable read-write access
522 522
523 523 $ killdaemons.py
524 524 $ cat > server/.hg/hgrc << EOF
525 525 > [experimental]
526 526 > web.apiserver = true
527 527 > web.api.debugreflect = true
528 528 > web.api.http-v2 = true
529 529 > [web]
530 530 > push_ssl = false
531 531 > EOF
532 532
533 533 $ hg -R server serve -p $HGPORT -d --pid-file hg.pid -E error.log
534 534 $ cat hg.pid > $DAEMON_PIDS
535 535
536 536 Attempting to run a read-write command via multirequest on read-only URL is not allowed
537 537
538 538 $ send << EOF
539 539 > httprequest POST api/$HTTPV2/ro/multirequest
540 540 > accept: $MEDIATYPE
541 541 > content-type: $MEDIATYPE
542 542 > user-agent: test
543 543 > frame 1 1 stream-begin command-name eos unbundle
544 544 > EOF
545 545 using raw connection to peer
546 546 s> POST /api/exp-http-v2-0001/ro/multirequest HTTP/1.1\r\n
547 547 s> Accept-Encoding: identity\r\n
548 548 s> accept: application/mercurial-exp-framing-0002\r\n
549 549 s> content-type: application/mercurial-exp-framing-0002\r\n
550 550 s> user-agent: test\r\n
551 551 s> content-length: 16\r\n
552 552 s> host: $LOCALIP:$HGPORT\r\n (glob)
553 553 s> \r\n
554 554 s> \x08\x00\x00\x01\x00\x01\x01\x11unbundle
555 555 s> makefile('rb', None)
556 556 s> HTTP/1.1 403 Forbidden\r\n
557 557 s> Server: testing stub value\r\n
558 558 s> Date: $HTTP_DATE$\r\n
559 559 s> Content-Type: text/plain\r\n
560 560 s> Content-Length: 53\r\n
561 561 s> \r\n
562 562 s> insufficient permissions to execute command: unbundle
563 563
564 564 $ cat error.log
@@ -1,684 +1,684
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial import (
6 6 util,
7 7 wireprotoframing as framing,
8 8 )
9 9
10 10 ffs = framing.makeframefromhumanstring
11 11
12 12 def makereactor(deferoutput=False):
13 13 return framing.serverreactor(deferoutput=deferoutput)
14 14
15 15 def sendframes(reactor, gen):
16 16 """Send a generator of frame bytearray to a reactor.
17 17
18 18 Emits a generator of results from ``onframerecv()`` calls.
19 19 """
20 20 for frame in gen:
21 21 header = framing.parseheader(frame)
22 22 payload = frame[framing.FRAME_HEADER_SIZE:]
23 23 assert len(payload) == header.length
24 24
25 25 yield reactor.onframerecv(framing.frame(header.requestid,
26 26 header.streamid,
27 27 header.streamflags,
28 28 header.typeid,
29 29 header.flags,
30 30 payload))
31 31
32 32 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
33 33 """Generate frames to run a command and send them to a reactor."""
34 34 return sendframes(reactor,
35 35 framing.createcommandframes(stream, rid, cmd, args,
36 36 datafh))
37 37
38 38 class FrameTests(unittest.TestCase):
39 39 def testdataexactframesize(self):
40 40 data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
41 41
42 42 stream = framing.stream(1)
43 43 frames = list(framing.createcommandframes(stream, 1, b'command',
44 44 {}, data))
45 45 self.assertEqual(frames, [
46 46 ffs(b'1 1 stream-begin command-name have-data command'),
47 47 ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
48 48 ffs(b'1 1 0 command-data eos ')
49 49 ])
50 50
51 51 def testdatamultipleframes(self):
52 52 data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
53 53
54 54 stream = framing.stream(1)
55 55 frames = list(framing.createcommandframes(stream, 1, b'command', {},
56 56 data))
57 57 self.assertEqual(frames, [
58 58 ffs(b'1 1 stream-begin command-name have-data command'),
59 59 ffs(b'1 1 0 command-data continuation %s' % (
60 60 b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
61 61 ffs(b'1 1 0 command-data eos x'),
62 62 ])
63 63
64 64 def testargsanddata(self):
65 65 data = util.bytesio(b'x' * 100)
66 66
67 67 stream = framing.stream(1)
68 68 frames = list(framing.createcommandframes(stream, 1, b'command', {
69 69 b'key1': b'key1value',
70 70 b'key2': b'key2value',
71 71 b'key3': b'key3value',
72 72 }, data))
73 73
74 74 self.assertEqual(frames, [
75 75 ffs(b'1 1 stream-begin command-name have-args|have-data command'),
76 76 ffs(br'1 1 0 command-argument 0 \x04\x00\x09\x00key1key1value'),
77 77 ffs(br'1 1 0 command-argument 0 \x04\x00\x09\x00key2key2value'),
78 78 ffs(br'1 1 0 command-argument eoa \x04\x00\x09\x00key3key3value'),
79 79 ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
80 80 ])
81 81
82 82 def testtextoutputexcessiveargs(self):
83 83 """At most 255 formatting arguments are allowed."""
84 84 with self.assertRaisesRegexp(ValueError,
85 85 'cannot use more than 255 formatting'):
86 86 args = [b'x' for i in range(256)]
87 87 list(framing.createtextoutputframe(None, 1,
88 88 [(b'bleh', args, [])]))
89 89
90 90 def testtextoutputexcessivelabels(self):
91 91 """At most 255 labels are allowed."""
92 92 with self.assertRaisesRegexp(ValueError,
93 93 'cannot use more than 255 labels'):
94 94 labels = [b'l' for i in range(256)]
95 95 list(framing.createtextoutputframe(None, 1,
96 96 [(b'bleh', [], labels)]))
97 97
98 98 def testtextoutputformattingstringtype(self):
99 99 """Formatting string must be bytes."""
100 100 with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
101 101 list(framing.createtextoutputframe(None, 1, [
102 102 (b'foo'.decode('ascii'), [], [])]))
103 103
104 104 def testtextoutputargumentbytes(self):
105 105 with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
106 106 list(framing.createtextoutputframe(None, 1, [
107 107 (b'foo', [b'foo'.decode('ascii')], [])]))
108 108
109 109 def testtextoutputlabelbytes(self):
110 110 with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
111 111 list(framing.createtextoutputframe(None, 1, [
112 112 (b'foo', [], [b'foo'.decode('ascii')])]))
113 113
114 114 def testtextoutputtoolongformatstring(self):
115 115 with self.assertRaisesRegexp(ValueError,
116 116 'formatting string cannot be longer than'):
117 117 list(framing.createtextoutputframe(None, 1, [
118 118 (b'x' * 65536, [], [])]))
119 119
120 120 def testtextoutputtoolongargumentstring(self):
121 121 with self.assertRaisesRegexp(ValueError,
122 122 'argument string cannot be longer than'):
123 123 list(framing.createtextoutputframe(None, 1, [
124 124 (b'bleh', [b'x' * 65536], [])]))
125 125
126 126 def testtextoutputtoolonglabelstring(self):
127 127 with self.assertRaisesRegexp(ValueError,
128 128 'label string cannot be longer than'):
129 129 list(framing.createtextoutputframe(None, 1, [
130 130 (b'bleh', [], [b'x' * 65536])]))
131 131
132 132 def testtextoutput1simpleatom(self):
133 133 stream = framing.stream(1)
134 134 val = list(framing.createtextoutputframe(stream, 1, [
135 135 (b'foo', [], [])]))
136 136
137 137 self.assertEqual(val, [
138 138 ffs(br'1 1 stream-begin text-output 0 \x03\x00\x00\x00foo'),
139 139 ])
140 140
141 141 def testtextoutput2simpleatoms(self):
142 142 stream = framing.stream(1)
143 143 val = list(framing.createtextoutputframe(stream, 1, [
144 144 (b'foo', [], []),
145 145 (b'bar', [], []),
146 146 ]))
147 147
148 148 self.assertEqual(val, [
149 149 ffs(br'1 1 stream-begin text-output 0 '
150 150 br'\x03\x00\x00\x00foo\x03\x00\x00\x00bar'),
151 151 ])
152 152
153 153 def testtextoutput1arg(self):
154 154 stream = framing.stream(1)
155 155 val = list(framing.createtextoutputframe(stream, 1, [
156 156 (b'foo %s', [b'val1'], []),
157 157 ]))
158 158
159 159 self.assertEqual(val, [
160 160 ffs(br'1 1 stream-begin text-output 0 '
161 161 br'\x06\x00\x00\x01\x04\x00foo %sval1'),
162 162 ])
163 163
164 164 def testtextoutput2arg(self):
165 165 stream = framing.stream(1)
166 166 val = list(framing.createtextoutputframe(stream, 1, [
167 167 (b'foo %s %s', [b'val', b'value'], []),
168 168 ]))
169 169
170 170 self.assertEqual(val, [
171 171 ffs(br'1 1 stream-begin text-output 0 '
172 172 br'\x09\x00\x00\x02\x03\x00\x05\x00foo %s %svalvalue'),
173 173 ])
174 174
175 175 def testtextoutput1label(self):
176 176 stream = framing.stream(1)
177 177 val = list(framing.createtextoutputframe(stream, 1, [
178 178 (b'foo', [], [b'label']),
179 179 ]))
180 180
181 181 self.assertEqual(val, [
182 182 ffs(br'1 1 stream-begin text-output 0 '
183 183 br'\x03\x00\x01\x00\x05foolabel'),
184 184 ])
185 185
186 186 def testargandlabel(self):
187 187 stream = framing.stream(1)
188 188 val = list(framing.createtextoutputframe(stream, 1, [
189 189 (b'foo %s', [b'arg'], [b'label']),
190 190 ]))
191 191
192 192 self.assertEqual(val, [
193 193 ffs(br'1 1 stream-begin text-output 0 '
194 194 br'\x06\x00\x01\x01\x05\x03\x00foo %slabelarg'),
195 195 ])
196 196
197 197 class ServerReactorTests(unittest.TestCase):
198 198 def _sendsingleframe(self, reactor, f):
199 199 results = list(sendframes(reactor, [f]))
200 200 self.assertEqual(len(results), 1)
201 201
202 202 return results[0]
203 203
204 204 def assertaction(self, res, expected):
205 205 self.assertIsInstance(res, tuple)
206 206 self.assertEqual(len(res), 2)
207 207 self.assertIsInstance(res[1], dict)
208 208 self.assertEqual(res[0], expected)
209 209
210 210 def assertframesequal(self, frames, framestrings):
211 211 expected = [ffs(s) for s in framestrings]
212 212 self.assertEqual(list(frames), expected)
213 213
214 214 def test1framecommand(self):
215 215 """Receiving a command in a single frame yields request to run it."""
216 216 reactor = makereactor()
217 217 stream = framing.stream(1)
218 218 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
219 219 self.assertEqual(len(results), 1)
220 220 self.assertaction(results[0], 'runcommand')
221 221 self.assertEqual(results[0][1], {
222 222 'requestid': 1,
223 223 'command': b'mycommand',
224 224 'args': {},
225 225 'data': None,
226 226 })
227 227
228 228 result = reactor.oninputeof()
229 229 self.assertaction(result, 'noop')
230 230
231 231 def test1argument(self):
232 232 reactor = makereactor()
233 233 stream = framing.stream(1)
234 234 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
235 235 {b'foo': b'bar'}))
236 236 self.assertEqual(len(results), 2)
237 237 self.assertaction(results[0], 'wantframe')
238 238 self.assertaction(results[1], 'runcommand')
239 239 self.assertEqual(results[1][1], {
240 240 'requestid': 41,
241 241 'command': b'mycommand',
242 242 'args': {b'foo': b'bar'},
243 243 'data': None,
244 244 })
245 245
246 246 def testmultiarguments(self):
247 247 reactor = makereactor()
248 248 stream = framing.stream(1)
249 249 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
250 250 {b'foo': b'bar', b'biz': b'baz'}))
251 251 self.assertEqual(len(results), 3)
252 252 self.assertaction(results[0], 'wantframe')
253 253 self.assertaction(results[1], 'wantframe')
254 254 self.assertaction(results[2], 'runcommand')
255 255 self.assertEqual(results[2][1], {
256 256 'requestid': 1,
257 257 'command': b'mycommand',
258 258 'args': {b'foo': b'bar', b'biz': b'baz'},
259 259 'data': None,
260 260 })
261 261
262 262 def testsimplecommanddata(self):
263 263 reactor = makereactor()
264 264 stream = framing.stream(1)
265 265 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
266 266 util.bytesio(b'data!')))
267 267 self.assertEqual(len(results), 2)
268 268 self.assertaction(results[0], 'wantframe')
269 269 self.assertaction(results[1], 'runcommand')
270 270 self.assertEqual(results[1][1], {
271 271 'requestid': 1,
272 272 'command': b'mycommand',
273 273 'args': {},
274 274 'data': b'data!',
275 275 })
276 276
277 277 def testmultipledataframes(self):
278 278 frames = [
279 279 ffs(b'1 1 stream-begin command-name have-data mycommand'),
280 280 ffs(b'1 1 0 command-data continuation data1'),
281 281 ffs(b'1 1 0 command-data continuation data2'),
282 282 ffs(b'1 1 0 command-data eos data3'),
283 283 ]
284 284
285 285 reactor = makereactor()
286 286 results = list(sendframes(reactor, frames))
287 287 self.assertEqual(len(results), 4)
288 288 for i in range(3):
289 289 self.assertaction(results[i], 'wantframe')
290 290 self.assertaction(results[3], 'runcommand')
291 291 self.assertEqual(results[3][1], {
292 292 'requestid': 1,
293 293 'command': b'mycommand',
294 294 'args': {},
295 295 'data': b'data1data2data3',
296 296 })
297 297
298 298 def testargumentanddata(self):
299 299 frames = [
300 300 ffs(b'1 1 stream-begin command-name have-args|have-data command'),
301 301 ffs(br'1 1 0 command-argument 0 \x03\x00\x03\x00keyval'),
302 302 ffs(br'1 1 0 command-argument eoa \x03\x00\x03\x00foobar'),
303 303 ffs(b'1 1 0 command-data continuation value1'),
304 304 ffs(b'1 1 0 command-data eos value2'),
305 305 ]
306 306
307 307 reactor = makereactor()
308 308 results = list(sendframes(reactor, frames))
309 309
310 310 self.assertaction(results[-1], 'runcommand')
311 311 self.assertEqual(results[-1][1], {
312 312 'requestid': 1,
313 313 'command': b'command',
314 314 'args': {
315 315 b'key': b'val',
316 316 b'foo': b'bar',
317 317 },
318 318 'data': b'value1value2',
319 319 })
320 320
321 321 def testunexpectedcommandargument(self):
322 322 """Command argument frame when not running a command is an error."""
323 323 result = self._sendsingleframe(
324 324 makereactor(), ffs(b'1 1 stream-begin command-argument 0 ignored'))
325 325 self.assertaction(result, 'error')
326 326 self.assertEqual(result[1], {
327 327 'message': b'expected command frame; got 2',
328 328 })
329 329
330 330 def testunexpectedcommandargumentreceiving(self):
331 331 """Same as above but the command is receiving."""
332 332 results = list(sendframes(makereactor(), [
333 333 ffs(b'1 1 stream-begin command-name have-data command'),
334 334 ffs(b'1 1 0 command-argument eoa ignored'),
335 335 ]))
336 336
337 337 self.assertaction(results[1], 'error')
338 338 self.assertEqual(results[1][1], {
339 339 'message': b'received command argument frame for request that is '
340 340 b'not expecting arguments: 1',
341 341 })
342 342
343 343 def testunexpectedcommanddata(self):
344 344 """Command argument frame when not running a command is an error."""
345 345 result = self._sendsingleframe(
346 346 makereactor(), ffs(b'1 1 stream-begin command-data 0 ignored'))
347 347 self.assertaction(result, 'error')
348 348 self.assertEqual(result[1], {
349 349 'message': b'expected command frame; got 3',
350 350 })
351 351
352 352 def testunexpectedcommanddatareceiving(self):
353 353 """Same as above except the command is receiving."""
354 354 results = list(sendframes(makereactor(), [
355 355 ffs(b'1 1 stream-begin command-name have-args command'),
356 356 ffs(b'1 1 0 command-data eos ignored'),
357 357 ]))
358 358
359 359 self.assertaction(results[1], 'error')
360 360 self.assertEqual(results[1][1], {
361 361 'message': b'received command data frame for request that is not '
362 362 b'expecting data: 1',
363 363 })
364 364
365 365 def testmissingcommandframeflags(self):
366 366 """Command name frame must have flags set."""
367 367 result = self._sendsingleframe(
368 368 makereactor(), ffs(b'1 1 stream-begin command-name 0 command'))
369 369 self.assertaction(result, 'error')
370 370 self.assertEqual(result[1], {
371 371 'message': b'missing frame flags on command frame',
372 372 })
373 373
374 374 def testconflictingrequestidallowed(self):
375 375 """Multiple fully serviced commands with same request ID is allowed."""
376 376 reactor = makereactor()
377 377 results = []
378 outstream = framing.stream(2)
378 outstream = reactor.makeoutputstream()
379 379 results.append(self._sendsingleframe(
380 380 reactor, ffs(b'1 1 stream-begin command-name eos command')))
381 381 result = reactor.onbytesresponseready(outstream, 1, b'response1')
382 382 self.assertaction(result, 'sendframes')
383 383 list(result[1]['framegen'])
384 384 results.append(self._sendsingleframe(
385 385 reactor, ffs(b'1 1 0 command-name eos command')))
386 386 result = reactor.onbytesresponseready(outstream, 1, b'response2')
387 387 self.assertaction(result, 'sendframes')
388 388 list(result[1]['framegen'])
389 389 results.append(self._sendsingleframe(
390 390 reactor, ffs(b'1 1 0 command-name eos command')))
391 391 result = reactor.onbytesresponseready(outstream, 1, b'response3')
392 392 self.assertaction(result, 'sendframes')
393 393 list(result[1]['framegen'])
394 394
395 395 for i in range(3):
396 396 self.assertaction(results[i], 'runcommand')
397 397 self.assertEqual(results[i][1], {
398 398 'requestid': 1,
399 399 'command': b'command',
400 400 'args': {},
401 401 'data': None,
402 402 })
403 403
404 404 def testconflictingrequestid(self):
405 405 """Request ID for new command matching in-flight command is illegal."""
406 406 results = list(sendframes(makereactor(), [
407 407 ffs(b'1 1 stream-begin command-name have-args command'),
408 408 ffs(b'1 1 0 command-name eos command'),
409 409 ]))
410 410
411 411 self.assertaction(results[0], 'wantframe')
412 412 self.assertaction(results[1], 'error')
413 413 self.assertEqual(results[1][1], {
414 414 'message': b'request with ID 1 already received',
415 415 })
416 416
417 417 def testinterleavedcommands(self):
418 418 results = list(sendframes(makereactor(), [
419 419 ffs(b'1 1 stream-begin command-name have-args command1'),
420 420 ffs(b'3 1 0 command-name have-args command3'),
421 421 ffs(br'1 1 0 command-argument 0 \x03\x00\x03\x00foobar'),
422 422 ffs(br'3 1 0 command-argument 0 \x03\x00\x03\x00bizbaz'),
423 423 ffs(br'3 1 0 command-argument eoa \x03\x00\x03\x00keyval'),
424 424 ffs(br'1 1 0 command-argument eoa \x04\x00\x03\x00key1val'),
425 425 ]))
426 426
427 427 self.assertEqual([t[0] for t in results], [
428 428 'wantframe',
429 429 'wantframe',
430 430 'wantframe',
431 431 'wantframe',
432 432 'runcommand',
433 433 'runcommand',
434 434 ])
435 435
436 436 self.assertEqual(results[4][1], {
437 437 'requestid': 3,
438 438 'command': 'command3',
439 439 'args': {b'biz': b'baz', b'key': b'val'},
440 440 'data': None,
441 441 })
442 442 self.assertEqual(results[5][1], {
443 443 'requestid': 1,
444 444 'command': 'command1',
445 445 'args': {b'foo': b'bar', b'key1': b'val'},
446 446 'data': None,
447 447 })
448 448
449 449 def testmissingargumentframe(self):
450 450 # This test attempts to test behavior when reactor has an incomplete
451 451 # command request waiting on argument data. But it doesn't handle that
452 452 # scenario yet. So this test does nothing of value.
453 453 frames = [
454 454 ffs(b'1 1 stream-begin command-name have-args command'),
455 455 ]
456 456
457 457 results = list(sendframes(makereactor(), frames))
458 458 self.assertaction(results[0], 'wantframe')
459 459
460 460 def testincompleteargumentname(self):
461 461 """Argument frame with incomplete name."""
462 462 frames = [
463 463 ffs(b'1 1 stream-begin command-name have-args command1'),
464 464 ffs(br'1 1 0 command-argument eoa \x04\x00\xde\xadfoo'),
465 465 ]
466 466
467 467 results = list(sendframes(makereactor(), frames))
468 468 self.assertEqual(len(results), 2)
469 469 self.assertaction(results[0], 'wantframe')
470 470 self.assertaction(results[1], 'error')
471 471 self.assertEqual(results[1][1], {
472 472 'message': b'malformed argument frame: partial argument name',
473 473 })
474 474
475 475 def testincompleteargumentvalue(self):
476 476 """Argument frame with incomplete value."""
477 477 frames = [
478 478 ffs(b'1 1 stream-begin command-name have-args command'),
479 479 ffs(br'1 1 0 command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
480 480 ]
481 481
482 482 results = list(sendframes(makereactor(), frames))
483 483 self.assertEqual(len(results), 2)
484 484 self.assertaction(results[0], 'wantframe')
485 485 self.assertaction(results[1], 'error')
486 486 self.assertEqual(results[1][1], {
487 487 'message': b'malformed argument frame: partial argument value',
488 488 })
489 489
490 490 def testmissingcommanddataframe(self):
491 491 # The reactor doesn't currently handle partially received commands.
492 492 # So this test is failing to do anything with request 1.
493 493 frames = [
494 494 ffs(b'1 1 stream-begin command-name have-data command1'),
495 495 ffs(b'3 1 0 command-name eos command2'),
496 496 ]
497 497 results = list(sendframes(makereactor(), frames))
498 498 self.assertEqual(len(results), 2)
499 499 self.assertaction(results[0], 'wantframe')
500 500 self.assertaction(results[1], 'runcommand')
501 501
502 502 def testmissingcommanddataframeflags(self):
503 503 frames = [
504 504 ffs(b'1 1 stream-begin command-name have-data command1'),
505 505 ffs(b'1 1 0 command-data 0 data'),
506 506 ]
507 507 results = list(sendframes(makereactor(), frames))
508 508 self.assertEqual(len(results), 2)
509 509 self.assertaction(results[0], 'wantframe')
510 510 self.assertaction(results[1], 'error')
511 511 self.assertEqual(results[1][1], {
512 512 'message': b'command data frame without flags',
513 513 })
514 514
515 515 def testframefornonreceivingrequest(self):
516 516 """Receiving a frame for a command that is not receiving is illegal."""
517 517 results = list(sendframes(makereactor(), [
518 518 ffs(b'1 1 stream-begin command-name eos command1'),
519 519 ffs(b'3 1 0 command-name have-data command3'),
520 520 ffs(b'5 1 0 command-argument eoa ignored'),
521 521 ]))
522 522 self.assertaction(results[2], 'error')
523 523 self.assertEqual(results[2][1], {
524 524 'message': b'received frame for request that is not receiving: 5',
525 525 })
526 526
527 527 def testsimpleresponse(self):
528 528 """Bytes response to command sends result frames."""
529 529 reactor = makereactor()
530 530 instream = framing.stream(1)
531 531 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
532 532
533 outstream = framing.stream(2)
533 outstream = reactor.makeoutputstream()
534 534 result = reactor.onbytesresponseready(outstream, 1, b'response')
535 535 self.assertaction(result, 'sendframes')
536 536 self.assertframesequal(result[1]['framegen'], [
537 537 b'1 2 stream-begin bytes-response eos response',
538 538 ])
539 539
540 540 def testmultiframeresponse(self):
541 541 """Bytes response spanning multiple frames is handled."""
542 542 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
543 543 second = b'y' * 100
544 544
545 545 reactor = makereactor()
546 546 instream = framing.stream(1)
547 547 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
548 548
549 outstream = framing.stream(2)
549 outstream = reactor.makeoutputstream()
550 550 result = reactor.onbytesresponseready(outstream, 1, first + second)
551 551 self.assertaction(result, 'sendframes')
552 552 self.assertframesequal(result[1]['framegen'], [
553 553 b'1 2 stream-begin bytes-response continuation %s' % first,
554 554 b'1 2 0 bytes-response eos %s' % second,
555 555 ])
556 556
557 557 def testapplicationerror(self):
558 558 reactor = makereactor()
559 559 instream = framing.stream(1)
560 560 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
561 561
562 outstream = framing.stream(2)
562 outstream = reactor.makeoutputstream()
563 563 result = reactor.onapplicationerror(outstream, 1, b'some message')
564 564 self.assertaction(result, 'sendframes')
565 565 self.assertframesequal(result[1]['framegen'], [
566 566 b'1 2 stream-begin error-response application some message',
567 567 ])
568 568
569 569 def test1commanddeferresponse(self):
570 570 """Responses when in deferred output mode are delayed until EOF."""
571 571 reactor = makereactor(deferoutput=True)
572 572 instream = framing.stream(1)
573 573 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
574 574 {}))
575 575 self.assertEqual(len(results), 1)
576 576 self.assertaction(results[0], 'runcommand')
577 577
578 outstream = framing.stream(2)
578 outstream = reactor.makeoutputstream()
579 579 result = reactor.onbytesresponseready(outstream, 1, b'response')
580 580 self.assertaction(result, 'noop')
581 581 result = reactor.oninputeof()
582 582 self.assertaction(result, 'sendframes')
583 583 self.assertframesequal(result[1]['framegen'], [
584 584 b'1 2 stream-begin bytes-response eos response',
585 585 ])
586 586
587 587 def testmultiplecommanddeferresponse(self):
588 588 reactor = makereactor(deferoutput=True)
589 589 instream = framing.stream(1)
590 590 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
591 591 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
592 592
593 outstream = framing.stream(2)
593 outstream = reactor.makeoutputstream()
594 594 result = reactor.onbytesresponseready(outstream, 1, b'response1')
595 595 self.assertaction(result, 'noop')
596 596 result = reactor.onbytesresponseready(outstream, 3, b'response2')
597 597 self.assertaction(result, 'noop')
598 598 result = reactor.oninputeof()
599 599 self.assertaction(result, 'sendframes')
600 600 self.assertframesequal(result[1]['framegen'], [
601 601 b'1 2 stream-begin bytes-response eos response1',
602 602 b'3 2 0 bytes-response eos response2'
603 603 ])
604 604
605 605 def testrequestidtracking(self):
606 606 reactor = makereactor(deferoutput=True)
607 607 instream = framing.stream(1)
608 608 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
609 609 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
610 610 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
611 611
612 612 # Register results for commands out of order.
613 outstream = framing.stream(2)
613 outstream = reactor.makeoutputstream()
614 614 reactor.onbytesresponseready(outstream, 3, b'response3')
615 615 reactor.onbytesresponseready(outstream, 1, b'response1')
616 616 reactor.onbytesresponseready(outstream, 5, b'response5')
617 617
618 618 result = reactor.oninputeof()
619 619 self.assertaction(result, 'sendframes')
620 620 self.assertframesequal(result[1]['framegen'], [
621 621 b'3 2 stream-begin bytes-response eos response3',
622 622 b'1 2 0 bytes-response eos response1',
623 623 b'5 2 0 bytes-response eos response5',
624 624 ])
625 625
626 626 def testduplicaterequestonactivecommand(self):
627 627 """Receiving a request ID that matches a request that isn't finished."""
628 628 reactor = makereactor()
629 629 stream = framing.stream(1)
630 630 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
631 631 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
632 632
633 633 self.assertaction(results[0], 'error')
634 634 self.assertEqual(results[0][1], {
635 635 'message': b'request with ID 1 is already active',
636 636 })
637 637
638 638 def testduplicaterequestonactivecommandnosend(self):
639 639 """Same as above but we've registered a response but haven't sent it."""
640 640 reactor = makereactor()
641 641 instream = framing.stream(1)
642 642 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
643 outstream = framing.stream(2)
643 outstream = reactor.makeoutputstream()
644 644 reactor.onbytesresponseready(outstream, 1, b'response')
645 645
646 646 # We've registered the response but haven't sent it. From the
647 647 # perspective of the reactor, the command is still active.
648 648
649 649 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
650 650 self.assertaction(results[0], 'error')
651 651 self.assertEqual(results[0][1], {
652 652 'message': b'request with ID 1 is already active',
653 653 })
654 654
655 655 def testduplicaterequestargumentframe(self):
656 656 """Variant on above except we sent an argument frame instead of name."""
657 657 reactor = makereactor()
658 658 stream = framing.stream(1)
659 659 list(sendcommandframes(reactor, stream, 1, b'command', {}))
660 660 results = list(sendframes(reactor, [
661 661 ffs(b'3 1 stream-begin command-name have-args command'),
662 662 ffs(b'1 1 0 command-argument 0 ignored'),
663 663 ]))
664 664 self.assertaction(results[0], 'wantframe')
665 665 self.assertaction(results[1], 'error')
666 666 self.assertEqual(results[1][1], {
667 667 'message': 'received frame for request that is still active: 1',
668 668 })
669 669
670 670 def testduplicaterequestaftersend(self):
671 671 """We can use a duplicate request ID after we've sent the response."""
672 672 reactor = makereactor()
673 673 instream = framing.stream(1)
674 674 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
675 outstream = framing.stream(2)
675 outstream = reactor.makeoutputstream()
676 676 res = reactor.onbytesresponseready(outstream, 1, b'response')
677 677 list(res[1]['framegen'])
678 678
679 679 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
680 680 self.assertaction(results[0], 'runcommand')
681 681
682 682 if __name__ == '__main__':
683 683 import silenttestrunner
684 684 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now