Show More
@@ -0,0 +1,135 | |||||
|
1 | # wireprotov2peer.py - client side code for wire protocol version 2 | |||
|
2 | # | |||
|
3 | # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | |||
|
4 | # | |||
|
5 | # This software may be used and distributed according to the terms of the | |||
|
6 | # GNU General Public License version 2 or any later version. | |||
|
7 | ||||
|
8 | from __future__ import absolute_import | |||
|
9 | ||||
|
10 | from .i18n import _ | |||
|
11 | from .thirdparty import ( | |||
|
12 | cbor, | |||
|
13 | ) | |||
|
14 | from . import ( | |||
|
15 | error, | |||
|
16 | util, | |||
|
17 | wireprotoframing, | |||
|
18 | ) | |||
|
19 | ||||
|
20 | class clienthandler(object): | |||
|
21 | """Object to handle higher-level client activities. | |||
|
22 | ||||
|
23 | The ``clientreactor`` is used to hold low-level state about the frame-based | |||
|
24 | protocol, such as which requests and streams are active. This type is used | |||
|
25 | for higher-level operations, such as reading frames from a socket, exposing | |||
|
26 | and managing a higher-level primitive for representing command responses, | |||
|
27 | etc. This class is what peers should probably use to bridge wire activity | |||
|
28 | with the higher-level peer API. | |||
|
29 | """ | |||
|
30 | ||||
|
31 | def __init__(self, ui, clientreactor): | |||
|
32 | self._ui = ui | |||
|
33 | self._reactor = clientreactor | |||
|
34 | self._requests = {} | |||
|
35 | self._futures = {} | |||
|
36 | self._responses = {} | |||
|
37 | ||||
|
38 | def callcommand(self, command, args, f): | |||
|
39 | """Register a request to call a command. | |||
|
40 | ||||
|
41 | Returns an iterable of frames that should be sent over the wire. | |||
|
42 | """ | |||
|
43 | request, action, meta = self._reactor.callcommand(command, args) | |||
|
44 | ||||
|
45 | if action != 'noop': | |||
|
46 | raise error.ProgrammingError('%s not yet supported' % action) | |||
|
47 | ||||
|
48 | rid = request.requestid | |||
|
49 | self._requests[rid] = request | |||
|
50 | self._futures[rid] = f | |||
|
51 | self._responses[rid] = { | |||
|
52 | 'cbor': False, | |||
|
53 | 'b': util.bytesio(), | |||
|
54 | } | |||
|
55 | ||||
|
56 | return iter(()) | |||
|
57 | ||||
|
58 | def flushcommands(self): | |||
|
59 | """Flush all queued commands. | |||
|
60 | ||||
|
61 | Returns an iterable of frames that should be sent over the wire. | |||
|
62 | """ | |||
|
63 | action, meta = self._reactor.flushcommands() | |||
|
64 | ||||
|
65 | if action != 'sendframes': | |||
|
66 | raise error.ProgrammingError('%s not yet supported' % action) | |||
|
67 | ||||
|
68 | return meta['framegen'] | |||
|
69 | ||||
|
70 | def readframe(self, fh): | |||
|
71 | """Attempt to read and process a frame. | |||
|
72 | ||||
|
73 | Returns None if no frame was read. Presumably this means EOF. | |||
|
74 | """ | |||
|
75 | frame = wireprotoframing.readframe(fh) | |||
|
76 | if frame is None: | |||
|
77 | # TODO tell reactor? | |||
|
78 | return | |||
|
79 | ||||
|
80 | self._ui.note(_('received %r\n') % frame) | |||
|
81 | self._processframe(frame) | |||
|
82 | ||||
|
83 | return True | |||
|
84 | ||||
|
85 | def _processframe(self, frame): | |||
|
86 | """Process a single read frame.""" | |||
|
87 | ||||
|
88 | action, meta = self._reactor.onframerecv(frame) | |||
|
89 | ||||
|
90 | if action == 'error': | |||
|
91 | e = error.RepoError(meta['message']) | |||
|
92 | ||||
|
93 | if frame.requestid in self._futures: | |||
|
94 | self._futures[frame.requestid].set_exception(e) | |||
|
95 | else: | |||
|
96 | raise e | |||
|
97 | ||||
|
98 | if frame.requestid not in self._requests: | |||
|
99 | raise error.ProgrammingError( | |||
|
100 | 'received frame for unknown request; this is either a bug in ' | |||
|
101 | 'the clientreactor not screening for this or this instance was ' | |||
|
102 | 'never told about this request: %r' % frame) | |||
|
103 | ||||
|
104 | response = self._responses[frame.requestid] | |||
|
105 | ||||
|
106 | if action == 'responsedata': | |||
|
107 | response['b'].write(meta['data']) | |||
|
108 | ||||
|
109 | if meta['cbor']: | |||
|
110 | response['cbor'] = True | |||
|
111 | ||||
|
112 | if meta['eos']: | |||
|
113 | if meta['cbor']: | |||
|
114 | # If CBOR, decode every object. | |||
|
115 | b = response['b'] | |||
|
116 | ||||
|
117 | size = b.tell() | |||
|
118 | b.seek(0) | |||
|
119 | ||||
|
120 | decoder = cbor.CBORDecoder(b) | |||
|
121 | ||||
|
122 | result = [] | |||
|
123 | while b.tell() < size: | |||
|
124 | result.append(decoder.decode()) | |||
|
125 | else: | |||
|
126 | result = [response['b'].getvalue()] | |||
|
127 | ||||
|
128 | self._futures[frame.requestid].set_result(result) | |||
|
129 | ||||
|
130 | del self._requests[frame.requestid] | |||
|
131 | del self._futures[frame.requestid] | |||
|
132 | ||||
|
133 | else: | |||
|
134 | raise error.ProgrammingError( | |||
|
135 | 'unhandled action from clientreactor: %s' % action) |
@@ -13,7 +13,6 import io | |||||
13 | import os |
|
13 | import os | |
14 | import socket |
|
14 | import socket | |
15 | import struct |
|
15 | import struct | |
16 | import sys |
|
|||
17 | import tempfile |
|
16 | import tempfile | |
18 | import weakref |
|
17 | import weakref | |
19 |
|
18 | |||
@@ -36,6 +35,7 from . import ( | |||||
36 | wireprotoframing, |
|
35 | wireprotoframing, | |
37 | wireprototypes, |
|
36 | wireprototypes, | |
38 | wireprotov1peer, |
|
37 | wireprotov1peer, | |
|
38 | wireprotov2peer, | |||
39 | wireprotov2server, |
|
39 | wireprotov2server, | |
40 | ) |
|
40 | ) | |
41 |
|
41 | |||
@@ -522,6 +522,8 def sendv2request(ui, opener, requestbui | |||||
522 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, |
|
522 | reactor = wireprotoframing.clientreactor(hasmultiplesend=False, | |
523 | buffersends=True) |
|
523 | buffersends=True) | |
524 |
|
524 | |||
|
525 | handler = wireprotov2peer.clienthandler(ui, reactor) | |||
|
526 | ||||
525 | url = '%s/%s' % (apiurl, permission) |
|
527 | url = '%s/%s' % (apiurl, permission) | |
526 |
|
528 | |||
527 | if len(requests) > 1: |
|
529 | if len(requests) > 1: | |
@@ -529,20 +531,11 def sendv2request(ui, opener, requestbui | |||||
529 | else: |
|
531 | else: | |
530 | url += '/%s' % requests[0][0] |
|
532 | url += '/%s' % requests[0][0] | |
531 |
|
533 | |||
532 | # Request ID to (request, future) |
|
|||
533 | requestmap = {} |
|
|||
534 |
|
||||
535 | for command, args, f in requests: |
|
534 | for command, args, f in requests: | |
536 |
|
|
535 | assert not list(handler.callcommand(command, args, f)) | |
537 | assert action == 'noop' |
|
|||
538 |
|
||||
539 | requestmap[request.requestid] = (request, f) |
|
|||
540 |
|
||||
541 | action, meta = reactor.flushcommands() |
|
|||
542 | assert action == 'sendframes' |
|
|||
543 |
|
536 | |||
544 | # TODO stream this. |
|
537 | # TODO stream this. | |
545 |
body = b''.join(map(bytes, |
|
538 | body = b''.join(map(bytes, handler.flushcommands())) | |
546 |
|
539 | |||
547 | # TODO modify user-agent to reflect v2 |
|
540 | # TODO modify user-agent to reflect v2 | |
548 | headers = { |
|
541 | headers = { | |
@@ -564,7 +557,7 def sendv2request(ui, opener, requestbui | |||||
564 | ui.traceback() |
|
557 | ui.traceback() | |
565 | raise IOError(None, e) |
|
558 | raise IOError(None, e) | |
566 |
|
559 | |||
567 |
return |
|
560 | return handler, res | |
568 |
|
561 | |||
569 | class queuedcommandfuture(pycompat.futures.Future): |
|
562 | class queuedcommandfuture(pycompat.futures.Future): | |
570 | """Wraps result() on command futures to trigger submission on call.""" |
|
563 | """Wraps result() on command futures to trigger submission on call.""" | |
@@ -684,7 +677,7 class httpv2executor(object): | |||||
684 | 'pull': 'ro', |
|
677 | 'pull': 'ro', | |
685 | }[permissions.pop()] |
|
678 | }[permissions.pop()] | |
686 |
|
679 | |||
687 |
|
|
680 | handler, resp = sendv2request( | |
688 | self._ui, self._opener, self._requestbuilder, self._apiurl, |
|
681 | self._ui, self._opener, self._requestbuilder, self._apiurl, | |
689 | permission, calls) |
|
682 | permission, calls) | |
690 |
|
683 | |||
@@ -692,9 +685,7 class httpv2executor(object): | |||||
692 |
|
685 | |||
693 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) |
|
686 | self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1) | |
694 | self._responsef = self._responseexecutor.submit(self._handleresponse, |
|
687 | self._responsef = self._responseexecutor.submit(self._handleresponse, | |
695 |
|
|
688 | handler, resp) | |
696 | requests, |
|
|||
697 | resp) |
|
|||
698 |
|
689 | |||
699 | def close(self): |
|
690 | def close(self): | |
700 | if self._closed: |
|
691 | if self._closed: | |
@@ -723,62 +714,11 class httpv2executor(object): | |||||
723 |
|
714 | |||
724 | self._futures = None |
|
715 | self._futures = None | |
725 |
|
716 | |||
726 |
def _handleresponse(self, |
|
717 | def _handleresponse(self, handler, resp): | |
727 | # Called in a thread to read the response. |
|
718 | # Called in a thread to read the response. | |
728 |
|
719 | |||
729 | results = {k: [] for k in requests} |
|
720 | while handler.readframe(resp): | |
730 |
|
721 | pass | ||
731 | while True: |
|
|||
732 | frame = wireprotoframing.readframe(resp) |
|
|||
733 | if frame is None: |
|
|||
734 | break |
|
|||
735 |
|
||||
736 | self._ui.note(_('received %r\n') % frame) |
|
|||
737 |
|
||||
738 | # Guard against receiving a frame with a request ID that we |
|
|||
739 | # didn't issue. This should never happen. |
|
|||
740 | request, f = requests.get(frame.requestid, [None, None]) |
|
|||
741 |
|
||||
742 | action, meta = reactor.onframerecv(frame) |
|
|||
743 |
|
||||
744 | if action == 'responsedata': |
|
|||
745 | assert request.requestid == meta['request'].requestid |
|
|||
746 |
|
||||
747 | result = results[request.requestid] |
|
|||
748 |
|
||||
749 | if meta['cbor']: |
|
|||
750 | payload = util.bytesio(meta['data']) |
|
|||
751 |
|
||||
752 | decoder = cbor.CBORDecoder(payload) |
|
|||
753 | while payload.tell() + 1 < len(meta['data']): |
|
|||
754 | try: |
|
|||
755 | result.append(decoder.decode()) |
|
|||
756 | except Exception: |
|
|||
757 | pycompat.future_set_exception_info( |
|
|||
758 | f, sys.exc_info()[1:]) |
|
|||
759 | continue |
|
|||
760 | else: |
|
|||
761 | result.append(meta['data']) |
|
|||
762 |
|
||||
763 | if meta['eos']: |
|
|||
764 | f.set_result(result) |
|
|||
765 | del results[request.requestid] |
|
|||
766 |
|
||||
767 | elif action == 'error': |
|
|||
768 | e = error.RepoError(meta['message']) |
|
|||
769 |
|
||||
770 | if f: |
|
|||
771 | f.set_exception(e) |
|
|||
772 | else: |
|
|||
773 | raise e |
|
|||
774 |
|
||||
775 | else: |
|
|||
776 | e = error.ProgrammingError('unhandled action: %s' % action) |
|
|||
777 |
|
||||
778 | if f: |
|
|||
779 | f.set_exception(e) |
|
|||
780 | else: |
|
|||
781 | raise e |
|
|||
782 |
|
722 | |||
783 | # TODO implement interface for version 2 peers |
|
723 | # TODO implement interface for version 2 peers | |
784 | @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
|
724 | @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities, |
@@ -50,7 +50,7 No arguments returns something reasonabl | |||||
50 | received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) |
|
50 | received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) | |
51 | s> 0\r\n |
|
51 | s> 0\r\n | |
52 | s> \r\n |
|
52 | s> \r\n | |
53 | response: [] |
|
53 | response: [b''] | |
54 |
|
54 | |||
55 | Single known node works |
|
55 | Single known node works | |
56 |
|
56 |
@@ -53,7 +53,7 pushkey for a bookmark works | |||||
53 | received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob) |
|
53 | received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob) | |
54 | s> 0\r\n |
|
54 | s> 0\r\n | |
55 | s> \r\n |
|
55 | s> \r\n | |
56 | response: [] |
|
56 | response: [True] | |
57 |
|
57 | |||
58 | $ sendhttpv2peer << EOF |
|
58 | $ sendhttpv2peer << EOF | |
59 | > command listkeys |
|
59 | > command listkeys |
General Comments 0
You need to be logged in to leave comments.
Login now