##// END OF EJS Templates
wireprotov2peer: use our CBOR decoder...
Gregory Szorc -
r39481:9f51fd22 default
parent child Browse files
Show More
@@ -1,206 +1,203 b''
1 # wireprotov2peer.py - client side code for wire protocol version 2
1 # wireprotov2peer.py - client side code for wire protocol version 2
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 from .i18n import _
10 from .i18n import _
11 from .thirdparty import (
12 cbor,
13 )
14 from . import (
11 from . import (
15 encoding,
12 encoding,
16 error,
13 error,
17 util,
14 util,
18 wireprotoframing,
15 wireprotoframing,
19 )
16 )
17 from .utils import (
18 cborutil,
19 )
20
20
21 def formatrichmessage(atoms):
21 def formatrichmessage(atoms):
22 """Format an encoded message from the framing protocol."""
22 """Format an encoded message from the framing protocol."""
23
23
24 chunks = []
24 chunks = []
25
25
26 for atom in atoms:
26 for atom in atoms:
27 msg = _(atom[b'msg'])
27 msg = _(atom[b'msg'])
28
28
29 if b'args' in atom:
29 if b'args' in atom:
30 msg = msg % atom[b'args']
30 msg = msg % atom[b'args']
31
31
32 chunks.append(msg)
32 chunks.append(msg)
33
33
34 return b''.join(chunks)
34 return b''.join(chunks)
35
35
36 class commandresponse(object):
36 class commandresponse(object):
37 """Represents the response to a command request."""
37 """Represents the response to a command request."""
38
38
39 def __init__(self, requestid, command):
39 def __init__(self, requestid, command):
40 self.requestid = requestid
40 self.requestid = requestid
41 self.command = command
41 self.command = command
42
42
43 self.b = util.bytesio()
43 self.b = util.bytesio()
44
44
45 def cborobjects(self):
45 def cborobjects(self):
46 """Obtain decoded CBOR objects from this response."""
46 """Obtain decoded CBOR objects from this response."""
47 size = self.b.tell()
48 self.b.seek(0)
47 self.b.seek(0)
49
48
50 decoder = cbor.CBORDecoder(self.b)
49 for v in cborutil.decodeall(self.b.getvalue()):
51
50 yield v
52 while self.b.tell() < size:
53 yield decoder.decode()
54
51
55 class clienthandler(object):
52 class clienthandler(object):
56 """Object to handle higher-level client activities.
53 """Object to handle higher-level client activities.
57
54
58 The ``clientreactor`` is used to hold low-level state about the frame-based
55 The ``clientreactor`` is used to hold low-level state about the frame-based
59 protocol, such as which requests and streams are active. This type is used
56 protocol, such as which requests and streams are active. This type is used
60 for higher-level operations, such as reading frames from a socket, exposing
57 for higher-level operations, such as reading frames from a socket, exposing
61 and managing a higher-level primitive for representing command responses,
58 and managing a higher-level primitive for representing command responses,
62 etc. This class is what peers should probably use to bridge wire activity
59 etc. This class is what peers should probably use to bridge wire activity
63 with the higher-level peer API.
60 with the higher-level peer API.
64 """
61 """
65
62
66 def __init__(self, ui, clientreactor):
63 def __init__(self, ui, clientreactor):
67 self._ui = ui
64 self._ui = ui
68 self._reactor = clientreactor
65 self._reactor = clientreactor
69 self._requests = {}
66 self._requests = {}
70 self._futures = {}
67 self._futures = {}
71 self._responses = {}
68 self._responses = {}
72
69
73 def callcommand(self, command, args, f):
70 def callcommand(self, command, args, f):
74 """Register a request to call a command.
71 """Register a request to call a command.
75
72
76 Returns an iterable of frames that should be sent over the wire.
73 Returns an iterable of frames that should be sent over the wire.
77 """
74 """
78 request, action, meta = self._reactor.callcommand(command, args)
75 request, action, meta = self._reactor.callcommand(command, args)
79
76
80 if action != 'noop':
77 if action != 'noop':
81 raise error.ProgrammingError('%s not yet supported' % action)
78 raise error.ProgrammingError('%s not yet supported' % action)
82
79
83 rid = request.requestid
80 rid = request.requestid
84 self._requests[rid] = request
81 self._requests[rid] = request
85 self._futures[rid] = f
82 self._futures[rid] = f
86 self._responses[rid] = commandresponse(rid, command)
83 self._responses[rid] = commandresponse(rid, command)
87
84
88 return iter(())
85 return iter(())
89
86
90 def flushcommands(self):
87 def flushcommands(self):
91 """Flush all queued commands.
88 """Flush all queued commands.
92
89
93 Returns an iterable of frames that should be sent over the wire.
90 Returns an iterable of frames that should be sent over the wire.
94 """
91 """
95 action, meta = self._reactor.flushcommands()
92 action, meta = self._reactor.flushcommands()
96
93
97 if action != 'sendframes':
94 if action != 'sendframes':
98 raise error.ProgrammingError('%s not yet supported' % action)
95 raise error.ProgrammingError('%s not yet supported' % action)
99
96
100 return meta['framegen']
97 return meta['framegen']
101
98
102 def readframe(self, fh):
99 def readframe(self, fh):
103 """Attempt to read and process a frame.
100 """Attempt to read and process a frame.
104
101
105 Returns None if no frame was read. Presumably this means EOF.
102 Returns None if no frame was read. Presumably this means EOF.
106 """
103 """
107 frame = wireprotoframing.readframe(fh)
104 frame = wireprotoframing.readframe(fh)
108 if frame is None:
105 if frame is None:
109 # TODO tell reactor?
106 # TODO tell reactor?
110 return
107 return
111
108
112 self._ui.note(_('received %r\n') % frame)
109 self._ui.note(_('received %r\n') % frame)
113 self._processframe(frame)
110 self._processframe(frame)
114
111
115 return True
112 return True
116
113
117 def _processframe(self, frame):
114 def _processframe(self, frame):
118 """Process a single read frame."""
115 """Process a single read frame."""
119
116
120 action, meta = self._reactor.onframerecv(frame)
117 action, meta = self._reactor.onframerecv(frame)
121
118
122 if action == 'error':
119 if action == 'error':
123 e = error.RepoError(meta['message'])
120 e = error.RepoError(meta['message'])
124
121
125 if frame.requestid in self._futures:
122 if frame.requestid in self._futures:
126 self._futures[frame.requestid].set_exception(e)
123 self._futures[frame.requestid].set_exception(e)
127 else:
124 else:
128 raise e
125 raise e
129
126
130 if frame.requestid not in self._requests:
127 if frame.requestid not in self._requests:
131 raise error.ProgrammingError(
128 raise error.ProgrammingError(
132 'received frame for unknown request; this is either a bug in '
129 'received frame for unknown request; this is either a bug in '
133 'the clientreactor not screening for this or this instance was '
130 'the clientreactor not screening for this or this instance was '
134 'never told about this request: %r' % frame)
131 'never told about this request: %r' % frame)
135
132
136 response = self._responses[frame.requestid]
133 response = self._responses[frame.requestid]
137
134
138 if action == 'responsedata':
135 if action == 'responsedata':
139 self._processresponsedata(frame, meta, response)
136 self._processresponsedata(frame, meta, response)
140 else:
137 else:
141 raise error.ProgrammingError(
138 raise error.ProgrammingError(
142 'unhandled action from clientreactor: %s' % action)
139 'unhandled action from clientreactor: %s' % action)
143
140
144 def _processresponsedata(self, frame, meta, response):
141 def _processresponsedata(self, frame, meta, response):
145 # This buffers all data until end of stream is received. This
142 # This buffers all data until end of stream is received. This
146 # is bad for performance.
143 # is bad for performance.
147 # TODO make response data streamable
144 # TODO make response data streamable
148 response.b.write(meta['data'])
145 response.b.write(meta['data'])
149
146
150 if meta['eos']:
147 if meta['eos']:
151 # If the command has a decoder, resolve the future to the
148 # If the command has a decoder, resolve the future to the
152 # decoded value. Otherwise resolve to the rich response object.
149 # decoded value. Otherwise resolve to the rich response object.
153 decoder = COMMAND_DECODERS.get(response.command)
150 decoder = COMMAND_DECODERS.get(response.command)
154
151
155 # TODO consider always resolving the overall status map.
152 # TODO consider always resolving the overall status map.
156 if decoder:
153 if decoder:
157 objs = response.cborobjects()
154 objs = response.cborobjects()
158
155
159 overall = next(objs)
156 overall = next(objs)
160
157
161 if overall['status'] == 'ok':
158 if overall['status'] == 'ok':
162 self._futures[frame.requestid].set_result(decoder(objs))
159 self._futures[frame.requestid].set_result(decoder(objs))
163 else:
160 else:
164 e = error.RepoError(
161 e = error.RepoError(
165 formatrichmessage(overall['error']['message']))
162 formatrichmessage(overall['error']['message']))
166 self._futures[frame.requestid].set_exception(e)
163 self._futures[frame.requestid].set_exception(e)
167 else:
164 else:
168 self._futures[frame.requestid].set_result(response)
165 self._futures[frame.requestid].set_result(response)
169
166
170 del self._requests[frame.requestid]
167 del self._requests[frame.requestid]
171 del self._futures[frame.requestid]
168 del self._futures[frame.requestid]
172
169
173 def decodebranchmap(objs):
170 def decodebranchmap(objs):
174 # Response should be a single CBOR map of branch name to array of nodes.
171 # Response should be a single CBOR map of branch name to array of nodes.
175 bm = next(objs)
172 bm = next(objs)
176
173
177 return {encoding.tolocal(k): v for k, v in bm.items()}
174 return {encoding.tolocal(k): v for k, v in bm.items()}
178
175
179 def decodeheads(objs):
176 def decodeheads(objs):
180 # Array of node bytestrings.
177 # Array of node bytestrings.
181 return next(objs)
178 return next(objs)
182
179
183 def decodeknown(objs):
180 def decodeknown(objs):
184 # Bytestring where each byte is a 0 or 1.
181 # Bytestring where each byte is a 0 or 1.
185 raw = next(objs)
182 raw = next(objs)
186
183
187 return [True if c == '1' else False for c in raw]
184 return [True if c == '1' else False for c in raw]
188
185
189 def decodelistkeys(objs):
186 def decodelistkeys(objs):
190 # Map with bytestring keys and values.
187 # Map with bytestring keys and values.
191 return next(objs)
188 return next(objs)
192
189
193 def decodelookup(objs):
190 def decodelookup(objs):
194 return next(objs)
191 return next(objs)
195
192
196 def decodepushkey(objs):
193 def decodepushkey(objs):
197 return next(objs)
194 return next(objs)
198
195
199 COMMAND_DECODERS = {
196 COMMAND_DECODERS = {
200 'branchmap': decodebranchmap,
197 'branchmap': decodebranchmap,
201 'heads': decodeheads,
198 'heads': decodeheads,
202 'known': decodeknown,
199 'known': decodeknown,
203 'listkeys': decodelistkeys,
200 'listkeys': decodelistkeys,
204 'lookup': decodelookup,
201 'lookup': decodelookup,
205 'pushkey': decodepushkey,
202 'pushkey': decodepushkey,
206 }
203 }
General Comments 0
You need to be logged in to leave comments. Login now