Show More
@@ -1,206 +1,203 b'' | |||||
1 | # wireprotov2peer.py - client side code for wire protocol version 2 |
|
1 | # wireprotov2peer.py - client side code for wire protocol version 2 | |
2 | # |
|
2 | # | |
3 | # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> |
|
3 | # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> | |
4 | # |
|
4 | # | |
5 | # This software may be used and distributed according to the terms of the |
|
5 | # This software may be used and distributed according to the terms of the | |
6 | # GNU General Public License version 2 or any later version. |
|
6 | # GNU General Public License version 2 or any later version. | |
7 |
|
7 | |||
8 | from __future__ import absolute_import |
|
8 | from __future__ import absolute_import | |
9 |
|
9 | |||
10 | from .i18n import _ |
|
10 | from .i18n import _ | |
11 | from .thirdparty import ( |
|
|||
12 | cbor, |
|
|||
13 | ) |
|
|||
14 | from . import ( |
|
11 | from . import ( | |
15 | encoding, |
|
12 | encoding, | |
16 | error, |
|
13 | error, | |
17 | util, |
|
14 | util, | |
18 | wireprotoframing, |
|
15 | wireprotoframing, | |
19 | ) |
|
16 | ) | |
|
17 | from .utils import ( | |||
|
18 | cborutil, | |||
|
19 | ) | |||
20 |
|
20 | |||
21 | def formatrichmessage(atoms): |
|
21 | def formatrichmessage(atoms): | |
22 | """Format an encoded message from the framing protocol.""" |
|
22 | """Format an encoded message from the framing protocol.""" | |
23 |
|
23 | |||
24 | chunks = [] |
|
24 | chunks = [] | |
25 |
|
25 | |||
26 | for atom in atoms: |
|
26 | for atom in atoms: | |
27 | msg = _(atom[b'msg']) |
|
27 | msg = _(atom[b'msg']) | |
28 |
|
28 | |||
29 | if b'args' in atom: |
|
29 | if b'args' in atom: | |
30 | msg = msg % atom[b'args'] |
|
30 | msg = msg % atom[b'args'] | |
31 |
|
31 | |||
32 | chunks.append(msg) |
|
32 | chunks.append(msg) | |
33 |
|
33 | |||
34 | return b''.join(chunks) |
|
34 | return b''.join(chunks) | |
35 |
|
35 | |||
36 | class commandresponse(object): |
|
36 | class commandresponse(object): | |
37 | """Represents the response to a command request.""" |
|
37 | """Represents the response to a command request.""" | |
38 |
|
38 | |||
39 | def __init__(self, requestid, command): |
|
39 | def __init__(self, requestid, command): | |
40 | self.requestid = requestid |
|
40 | self.requestid = requestid | |
41 | self.command = command |
|
41 | self.command = command | |
42 |
|
42 | |||
43 | self.b = util.bytesio() |
|
43 | self.b = util.bytesio() | |
44 |
|
44 | |||
45 | def cborobjects(self): |
|
45 | def cborobjects(self): | |
46 | """Obtain decoded CBOR objects from this response.""" |
|
46 | """Obtain decoded CBOR objects from this response.""" | |
47 | size = self.b.tell() |
|
|||
48 | self.b.seek(0) |
|
47 | self.b.seek(0) | |
49 |
|
48 | |||
50 | decoder = cbor.CBORDecoder(self.b) |
|
49 | for v in cborutil.decodeall(self.b.getvalue()): | |
51 |
|
50 | yield v | ||
52 | while self.b.tell() < size: |
|
|||
53 | yield decoder.decode() |
|
|||
54 |
|
51 | |||
55 | class clienthandler(object): |
|
52 | class clienthandler(object): | |
56 | """Object to handle higher-level client activities. |
|
53 | """Object to handle higher-level client activities. | |
57 |
|
54 | |||
58 | The ``clientreactor`` is used to hold low-level state about the frame-based |
|
55 | The ``clientreactor`` is used to hold low-level state about the frame-based | |
59 | protocol, such as which requests and streams are active. This type is used |
|
56 | protocol, such as which requests and streams are active. This type is used | |
60 | for higher-level operations, such as reading frames from a socket, exposing |
|
57 | for higher-level operations, such as reading frames from a socket, exposing | |
61 | and managing a higher-level primitive for representing command responses, |
|
58 | and managing a higher-level primitive for representing command responses, | |
62 | etc. This class is what peers should probably use to bridge wire activity |
|
59 | etc. This class is what peers should probably use to bridge wire activity | |
63 | with the higher-level peer API. |
|
60 | with the higher-level peer API. | |
64 | """ |
|
61 | """ | |
65 |
|
62 | |||
66 | def __init__(self, ui, clientreactor): |
|
63 | def __init__(self, ui, clientreactor): | |
67 | self._ui = ui |
|
64 | self._ui = ui | |
68 | self._reactor = clientreactor |
|
65 | self._reactor = clientreactor | |
69 | self._requests = {} |
|
66 | self._requests = {} | |
70 | self._futures = {} |
|
67 | self._futures = {} | |
71 | self._responses = {} |
|
68 | self._responses = {} | |
72 |
|
69 | |||
73 | def callcommand(self, command, args, f): |
|
70 | def callcommand(self, command, args, f): | |
74 | """Register a request to call a command. |
|
71 | """Register a request to call a command. | |
75 |
|
72 | |||
76 | Returns an iterable of frames that should be sent over the wire. |
|
73 | Returns an iterable of frames that should be sent over the wire. | |
77 | """ |
|
74 | """ | |
78 | request, action, meta = self._reactor.callcommand(command, args) |
|
75 | request, action, meta = self._reactor.callcommand(command, args) | |
79 |
|
76 | |||
80 | if action != 'noop': |
|
77 | if action != 'noop': | |
81 | raise error.ProgrammingError('%s not yet supported' % action) |
|
78 | raise error.ProgrammingError('%s not yet supported' % action) | |
82 |
|
79 | |||
83 | rid = request.requestid |
|
80 | rid = request.requestid | |
84 | self._requests[rid] = request |
|
81 | self._requests[rid] = request | |
85 | self._futures[rid] = f |
|
82 | self._futures[rid] = f | |
86 | self._responses[rid] = commandresponse(rid, command) |
|
83 | self._responses[rid] = commandresponse(rid, command) | |
87 |
|
84 | |||
88 | return iter(()) |
|
85 | return iter(()) | |
89 |
|
86 | |||
90 | def flushcommands(self): |
|
87 | def flushcommands(self): | |
91 | """Flush all queued commands. |
|
88 | """Flush all queued commands. | |
92 |
|
89 | |||
93 | Returns an iterable of frames that should be sent over the wire. |
|
90 | Returns an iterable of frames that should be sent over the wire. | |
94 | """ |
|
91 | """ | |
95 | action, meta = self._reactor.flushcommands() |
|
92 | action, meta = self._reactor.flushcommands() | |
96 |
|
93 | |||
97 | if action != 'sendframes': |
|
94 | if action != 'sendframes': | |
98 | raise error.ProgrammingError('%s not yet supported' % action) |
|
95 | raise error.ProgrammingError('%s not yet supported' % action) | |
99 |
|
96 | |||
100 | return meta['framegen'] |
|
97 | return meta['framegen'] | |
101 |
|
98 | |||
102 | def readframe(self, fh): |
|
99 | def readframe(self, fh): | |
103 | """Attempt to read and process a frame. |
|
100 | """Attempt to read and process a frame. | |
104 |
|
101 | |||
105 | Returns None if no frame was read. Presumably this means EOF. |
|
102 | Returns None if no frame was read. Presumably this means EOF. | |
106 | """ |
|
103 | """ | |
107 | frame = wireprotoframing.readframe(fh) |
|
104 | frame = wireprotoframing.readframe(fh) | |
108 | if frame is None: |
|
105 | if frame is None: | |
109 | # TODO tell reactor? |
|
106 | # TODO tell reactor? | |
110 | return |
|
107 | return | |
111 |
|
108 | |||
112 | self._ui.note(_('received %r\n') % frame) |
|
109 | self._ui.note(_('received %r\n') % frame) | |
113 | self._processframe(frame) |
|
110 | self._processframe(frame) | |
114 |
|
111 | |||
115 | return True |
|
112 | return True | |
116 |
|
113 | |||
117 | def _processframe(self, frame): |
|
114 | def _processframe(self, frame): | |
118 | """Process a single read frame.""" |
|
115 | """Process a single read frame.""" | |
119 |
|
116 | |||
120 | action, meta = self._reactor.onframerecv(frame) |
|
117 | action, meta = self._reactor.onframerecv(frame) | |
121 |
|
118 | |||
122 | if action == 'error': |
|
119 | if action == 'error': | |
123 | e = error.RepoError(meta['message']) |
|
120 | e = error.RepoError(meta['message']) | |
124 |
|
121 | |||
125 | if frame.requestid in self._futures: |
|
122 | if frame.requestid in self._futures: | |
126 | self._futures[frame.requestid].set_exception(e) |
|
123 | self._futures[frame.requestid].set_exception(e) | |
127 | else: |
|
124 | else: | |
128 | raise e |
|
125 | raise e | |
129 |
|
126 | |||
130 | if frame.requestid not in self._requests: |
|
127 | if frame.requestid not in self._requests: | |
131 | raise error.ProgrammingError( |
|
128 | raise error.ProgrammingError( | |
132 | 'received frame for unknown request; this is either a bug in ' |
|
129 | 'received frame for unknown request; this is either a bug in ' | |
133 | 'the clientreactor not screening for this or this instance was ' |
|
130 | 'the clientreactor not screening for this or this instance was ' | |
134 | 'never told about this request: %r' % frame) |
|
131 | 'never told about this request: %r' % frame) | |
135 |
|
132 | |||
136 | response = self._responses[frame.requestid] |
|
133 | response = self._responses[frame.requestid] | |
137 |
|
134 | |||
138 | if action == 'responsedata': |
|
135 | if action == 'responsedata': | |
139 | self._processresponsedata(frame, meta, response) |
|
136 | self._processresponsedata(frame, meta, response) | |
140 | else: |
|
137 | else: | |
141 | raise error.ProgrammingError( |
|
138 | raise error.ProgrammingError( | |
142 | 'unhandled action from clientreactor: %s' % action) |
|
139 | 'unhandled action from clientreactor: %s' % action) | |
143 |
|
140 | |||
144 | def _processresponsedata(self, frame, meta, response): |
|
141 | def _processresponsedata(self, frame, meta, response): | |
145 | # This buffers all data until end of stream is received. This |
|
142 | # This buffers all data until end of stream is received. This | |
146 | # is bad for performance. |
|
143 | # is bad for performance. | |
147 | # TODO make response data streamable |
|
144 | # TODO make response data streamable | |
148 | response.b.write(meta['data']) |
|
145 | response.b.write(meta['data']) | |
149 |
|
146 | |||
150 | if meta['eos']: |
|
147 | if meta['eos']: | |
151 | # If the command has a decoder, resolve the future to the |
|
148 | # If the command has a decoder, resolve the future to the | |
152 | # decoded value. Otherwise resolve to the rich response object. |
|
149 | # decoded value. Otherwise resolve to the rich response object. | |
153 | decoder = COMMAND_DECODERS.get(response.command) |
|
150 | decoder = COMMAND_DECODERS.get(response.command) | |
154 |
|
151 | |||
155 | # TODO consider always resolving the overall status map. |
|
152 | # TODO consider always resolving the overall status map. | |
156 | if decoder: |
|
153 | if decoder: | |
157 | objs = response.cborobjects() |
|
154 | objs = response.cborobjects() | |
158 |
|
155 | |||
159 | overall = next(objs) |
|
156 | overall = next(objs) | |
160 |
|
157 | |||
161 | if overall['status'] == 'ok': |
|
158 | if overall['status'] == 'ok': | |
162 | self._futures[frame.requestid].set_result(decoder(objs)) |
|
159 | self._futures[frame.requestid].set_result(decoder(objs)) | |
163 | else: |
|
160 | else: | |
164 | e = error.RepoError( |
|
161 | e = error.RepoError( | |
165 | formatrichmessage(overall['error']['message'])) |
|
162 | formatrichmessage(overall['error']['message'])) | |
166 | self._futures[frame.requestid].set_exception(e) |
|
163 | self._futures[frame.requestid].set_exception(e) | |
167 | else: |
|
164 | else: | |
168 | self._futures[frame.requestid].set_result(response) |
|
165 | self._futures[frame.requestid].set_result(response) | |
169 |
|
166 | |||
170 | del self._requests[frame.requestid] |
|
167 | del self._requests[frame.requestid] | |
171 | del self._futures[frame.requestid] |
|
168 | del self._futures[frame.requestid] | |
172 |
|
169 | |||
173 | def decodebranchmap(objs): |
|
170 | def decodebranchmap(objs): | |
174 | # Response should be a single CBOR map of branch name to array of nodes. |
|
171 | # Response should be a single CBOR map of branch name to array of nodes. | |
175 | bm = next(objs) |
|
172 | bm = next(objs) | |
176 |
|
173 | |||
177 | return {encoding.tolocal(k): v for k, v in bm.items()} |
|
174 | return {encoding.tolocal(k): v for k, v in bm.items()} | |
178 |
|
175 | |||
179 | def decodeheads(objs): |
|
176 | def decodeheads(objs): | |
180 | # Array of node bytestrings. |
|
177 | # Array of node bytestrings. | |
181 | return next(objs) |
|
178 | return next(objs) | |
182 |
|
179 | |||
183 | def decodeknown(objs): |
|
180 | def decodeknown(objs): | |
184 | # Bytestring where each byte is a 0 or 1. |
|
181 | # Bytestring where each byte is a 0 or 1. | |
185 | raw = next(objs) |
|
182 | raw = next(objs) | |
186 |
|
183 | |||
187 | return [True if c == '1' else False for c in raw] |
|
184 | return [True if c == '1' else False for c in raw] | |
188 |
|
185 | |||
189 | def decodelistkeys(objs): |
|
186 | def decodelistkeys(objs): | |
190 | # Map with bytestring keys and values. |
|
187 | # Map with bytestring keys and values. | |
191 | return next(objs) |
|
188 | return next(objs) | |
192 |
|
189 | |||
193 | def decodelookup(objs): |
|
190 | def decodelookup(objs): | |
194 | return next(objs) |
|
191 | return next(objs) | |
195 |
|
192 | |||
196 | def decodepushkey(objs): |
|
193 | def decodepushkey(objs): | |
197 | return next(objs) |
|
194 | return next(objs) | |
198 |
|
195 | |||
199 | COMMAND_DECODERS = { |
|
196 | COMMAND_DECODERS = { | |
200 | 'branchmap': decodebranchmap, |
|
197 | 'branchmap': decodebranchmap, | |
201 | 'heads': decodeheads, |
|
198 | 'heads': decodeheads, | |
202 | 'known': decodeknown, |
|
199 | 'known': decodeknown, | |
203 | 'listkeys': decodelistkeys, |
|
200 | 'listkeys': decodelistkeys, | |
204 | 'lookup': decodelookup, |
|
201 | 'lookup': decodelookup, | |
205 | 'pushkey': decodepushkey, |
|
202 | 'pushkey': decodepushkey, | |
206 | } |
|
203 | } |
General Comments 0
You need to be logged in to leave comments.
Login now