##// END OF EJS Templates
wireprotov2: move response handling out of httppeer...
Gregory Szorc -
r37737:a656cba0 default
parent child Browse files
Show More
@@ -0,0 +1,135
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from __future__ import absolute_import
9
10 from .i18n import _
11 from .thirdparty import (
12 cbor,
13 )
14 from . import (
15 error,
16 util,
17 wireprotoframing,
18 )
19
20 class clienthandler(object):
21 """Object to handle higher-level client activities.
22
23 The ``clientreactor`` is used to hold low-level state about the frame-based
24 protocol, such as which requests and streams are active. This type is used
25 for higher-level operations, such as reading frames from a socket, exposing
26 and managing a higher-level primitive for representing command responses,
27 etc. This class is what peers should probably use to bridge wire activity
28 with the higher-level peer API.
29 """
30
31 def __init__(self, ui, clientreactor):
32 self._ui = ui
33 self._reactor = clientreactor
34 self._requests = {}
35 self._futures = {}
36 self._responses = {}
37
38 def callcommand(self, command, args, f):
39 """Register a request to call a command.
40
41 Returns an iterable of frames that should be sent over the wire.
42 """
43 request, action, meta = self._reactor.callcommand(command, args)
44
45 if action != 'noop':
46 raise error.ProgrammingError('%s not yet supported' % action)
47
48 rid = request.requestid
49 self._requests[rid] = request
50 self._futures[rid] = f
51 self._responses[rid] = {
52 'cbor': False,
53 'b': util.bytesio(),
54 }
55
56 return iter(())
57
58 def flushcommands(self):
59 """Flush all queued commands.
60
61 Returns an iterable of frames that should be sent over the wire.
62 """
63 action, meta = self._reactor.flushcommands()
64
65 if action != 'sendframes':
66 raise error.ProgrammingError('%s not yet supported' % action)
67
68 return meta['framegen']
69
70 def readframe(self, fh):
71 """Attempt to read and process a frame.
72
73 Returns None if no frame was read. Presumably this means EOF.
74 """
75 frame = wireprotoframing.readframe(fh)
76 if frame is None:
77 # TODO tell reactor?
78 return
79
80 self._ui.note(_('received %r\n') % frame)
81 self._processframe(frame)
82
83 return True
84
85 def _processframe(self, frame):
86 """Process a single read frame."""
87
88 action, meta = self._reactor.onframerecv(frame)
89
90 if action == 'error':
91 e = error.RepoError(meta['message'])
92
93 if frame.requestid in self._futures:
94 self._futures[frame.requestid].set_exception(e)
95 else:
96 raise e
97
98 if frame.requestid not in self._requests:
99 raise error.ProgrammingError(
100 'received frame for unknown request; this is either a bug in '
101 'the clientreactor not screening for this or this instance was '
102 'never told about this request: %r' % frame)
103
104 response = self._responses[frame.requestid]
105
106 if action == 'responsedata':
107 response['b'].write(meta['data'])
108
109 if meta['cbor']:
110 response['cbor'] = True
111
112 if meta['eos']:
113 if meta['cbor']:
114 # If CBOR, decode every object.
115 b = response['b']
116
117 size = b.tell()
118 b.seek(0)
119
120 decoder = cbor.CBORDecoder(b)
121
122 result = []
123 while b.tell() < size:
124 result.append(decoder.decode())
125 else:
126 result = [response['b'].getvalue()]
127
128 self._futures[frame.requestid].set_result(result)
129
130 del self._requests[frame.requestid]
131 del self._futures[frame.requestid]
132
133 else:
134 raise error.ProgrammingError(
135 'unhandled action from clientreactor: %s' % action)
@@ -13,7 +13,6 import io
13 13 import os
14 14 import socket
15 15 import struct
16 import sys
17 16 import tempfile
18 17 import weakref
19 18
@@ -36,6 +35,7 from . import (
36 35 wireprotoframing,
37 36 wireprototypes,
38 37 wireprotov1peer,
38 wireprotov2peer,
39 39 wireprotov2server,
40 40 )
41 41
@@ -522,6 +522,8 def sendv2request(ui, opener, requestbui
522 522 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
523 523 buffersends=True)
524 524
525 handler = wireprotov2peer.clienthandler(ui, reactor)
526
525 527 url = '%s/%s' % (apiurl, permission)
526 528
527 529 if len(requests) > 1:
@@ -529,20 +531,11 def sendv2request(ui, opener, requestbui
529 531 else:
530 532 url += '/%s' % requests[0][0]
531 533
532 # Request ID to (request, future)
533 requestmap = {}
534
535 534 for command, args, f in requests:
536 request, action, meta = reactor.callcommand(command, args)
537 assert action == 'noop'
538
539 requestmap[request.requestid] = (request, f)
540
541 action, meta = reactor.flushcommands()
542 assert action == 'sendframes'
535 assert not list(handler.callcommand(command, args, f))
543 536
544 537 # TODO stream this.
545 body = b''.join(map(bytes, meta['framegen']))
538 body = b''.join(map(bytes, handler.flushcommands()))
546 539
547 540 # TODO modify user-agent to reflect v2
548 541 headers = {
@@ -564,7 +557,7 def sendv2request(ui, opener, requestbui
564 557 ui.traceback()
565 558 raise IOError(None, e)
566 559
567 return reactor, requestmap, res
560 return handler, res
568 561
569 562 class queuedcommandfuture(pycompat.futures.Future):
570 563 """Wraps result() on command futures to trigger submission on call."""
@@ -684,7 +677,7 class httpv2executor(object):
684 677 'pull': 'ro',
685 678 }[permissions.pop()]
686 679
687 reactor, requests, resp = sendv2request(
680 handler, resp = sendv2request(
688 681 self._ui, self._opener, self._requestbuilder, self._apiurl,
689 682 permission, calls)
690 683
@@ -692,9 +685,7 class httpv2executor(object):
692 685
693 686 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
694 687 self._responsef = self._responseexecutor.submit(self._handleresponse,
695 reactor,
696 requests,
697 resp)
688 handler, resp)
698 689
699 690 def close(self):
700 691 if self._closed:
@@ -723,62 +714,11 class httpv2executor(object):
723 714
724 715 self._futures = None
725 716
726 def _handleresponse(self, reactor, requests, resp):
717 def _handleresponse(self, handler, resp):
727 718 # Called in a thread to read the response.
728 719
729 results = {k: [] for k in requests}
730
731 while True:
732 frame = wireprotoframing.readframe(resp)
733 if frame is None:
734 break
735
736 self._ui.note(_('received %r\n') % frame)
737
738 # Guard against receiving a frame with a request ID that we
739 # didn't issue. This should never happen.
740 request, f = requests.get(frame.requestid, [None, None])
741
742 action, meta = reactor.onframerecv(frame)
743
744 if action == 'responsedata':
745 assert request.requestid == meta['request'].requestid
746
747 result = results[request.requestid]
748
749 if meta['cbor']:
750 payload = util.bytesio(meta['data'])
751
752 decoder = cbor.CBORDecoder(payload)
753 while payload.tell() + 1 < len(meta['data']):
754 try:
755 result.append(decoder.decode())
756 except Exception:
757 pycompat.future_set_exception_info(
758 f, sys.exc_info()[1:])
759 continue
760 else:
761 result.append(meta['data'])
762
763 if meta['eos']:
764 f.set_result(result)
765 del results[request.requestid]
766
767 elif action == 'error':
768 e = error.RepoError(meta['message'])
769
770 if f:
771 f.set_exception(e)
772 else:
773 raise e
774
775 else:
776 e = error.ProgrammingError('unhandled action: %s' % action)
777
778 if f:
779 f.set_exception(e)
780 else:
781 raise e
720 while handler.readframe(resp):
721 pass
782 722
783 723 # TODO implement interface for version 2 peers
784 724 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
@@ -50,7 +50,7 No arguments returns something reasonabl
50 50 received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor)
51 51 s> 0\r\n
52 52 s> \r\n
53 response: []
53 response: [b'']
54 54
55 55 Single known node works
56 56
@@ -53,7 +53,7 pushkey for a bookmark works
53 53 received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob)
54 54 s> 0\r\n
55 55 s> \r\n
56 response: []
56 response: [True]
57 57
58 58 $ sendhttpv2peer << EOF
59 59 > command listkeys
General Comments 0
You need to be logged in to leave comments. Login now