##// END OF EJS Templates
wireproto: introduce a reactor for client-side state...
Gregory Szorc -
r37561:01361be9 default
parent child Browse files
Show More
@@ -0,0 +1,66 b''
1 from __future__ import absolute_import
2
3 import unittest
4
5 from mercurial import (
6 error,
7 wireprotoframing as framing,
8 )
9
10 class SingleSendTests(unittest.TestCase):
11 """A reactor that can only send once rejects subsequent sends."""
12 def testbasic(self):
13 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
14
15 request, action, meta = reactor.callcommand(b'foo', {})
16 self.assertEqual(request.state, 'pending')
17 self.assertEqual(action, 'noop')
18
19 action, meta = reactor.flushcommands()
20 self.assertEqual(action, 'sendframes')
21
22 for frame in meta['framegen']:
23 self.assertEqual(request.state, 'sending')
24
25 self.assertEqual(request.state, 'sent')
26
27 with self.assertRaisesRegexp(error.ProgrammingError,
28 'cannot issue new commands'):
29 reactor.callcommand(b'foo', {})
30
31 with self.assertRaisesRegexp(error.ProgrammingError,
32 'cannot issue new commands'):
33 reactor.callcommand(b'foo', {})
34
35 class NoBufferTests(unittest.TestCase):
36 """A reactor without send buffering sends requests immediately."""
37 def testbasic(self):
38 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
39
40 request, action, meta = reactor.callcommand(b'command1', {})
41 self.assertEqual(request.requestid, 1)
42 self.assertEqual(action, 'sendframes')
43
44 self.assertEqual(request.state, 'pending')
45
46 for frame in meta['framegen']:
47 self.assertEqual(request.state, 'sending')
48
49 self.assertEqual(request.state, 'sent')
50
51 action, meta = reactor.flushcommands()
52 self.assertEqual(action, 'noop')
53
54 # And we can send another command.
55 request, action, meta = reactor.callcommand(b'command2', {})
56 self.assertEqual(request.requestid, 3)
57 self.assertEqual(action, 'sendframes')
58
59 for frame in meta['framegen']:
60 self.assertEqual(request.state, 'sending')
61
62 self.assertEqual(request.state, 'sent')
63
64 if __name__ == '__main__':
65 import silenttestrunner
66 silenttestrunner.main(__name__)
@@ -1,596 +1,601 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import errno
11 import errno
12 import io
12 import io
13 import os
13 import os
14 import socket
14 import socket
15 import struct
15 import struct
16 import tempfile
16 import tempfile
17
17
18 from .i18n import _
18 from .i18n import _
19 from .thirdparty import (
19 from .thirdparty import (
20 cbor,
20 cbor,
21 )
21 )
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 error,
24 error,
25 httpconnection,
25 httpconnection,
26 pycompat,
26 pycompat,
27 statichttprepo,
27 statichttprepo,
28 url as urlmod,
28 url as urlmod,
29 util,
29 util,
30 wireproto,
30 wireproto,
31 wireprotoframing,
31 wireprotoframing,
32 wireprotoserver,
32 wireprotoserver,
33 )
33 )
34
34
35 httplib = util.httplib
35 httplib = util.httplib
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 def encodevalueinheaders(value, header, limit):
39 def encodevalueinheaders(value, header, limit):
40 """Encode a string value into multiple HTTP headers.
40 """Encode a string value into multiple HTTP headers.
41
41
42 ``value`` will be encoded into 1 or more HTTP headers with the names
42 ``value`` will be encoded into 1 or more HTTP headers with the names
43 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
43 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
44 name + value will be at most ``limit`` bytes long.
44 name + value will be at most ``limit`` bytes long.
45
45
46 Returns an iterable of 2-tuples consisting of header names and
46 Returns an iterable of 2-tuples consisting of header names and
47 values as native strings.
47 values as native strings.
48 """
48 """
49 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
49 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
50 # not bytes. This function always takes bytes in as arguments.
50 # not bytes. This function always takes bytes in as arguments.
51 fmt = pycompat.strurl(header) + r'-%s'
51 fmt = pycompat.strurl(header) + r'-%s'
52 # Note: it is *NOT* a bug that the last bit here is a bytestring
52 # Note: it is *NOT* a bug that the last bit here is a bytestring
53 # and not a unicode: we're just getting the encoded length anyway,
53 # and not a unicode: we're just getting the encoded length anyway,
54 # and using an r-string to make it portable between Python 2 and 3
54 # and using an r-string to make it portable between Python 2 and 3
55 # doesn't work because then the \r is a literal backslash-r
55 # doesn't work because then the \r is a literal backslash-r
56 # instead of a carriage return.
56 # instead of a carriage return.
57 valuelen = limit - len(fmt % r'000') - len(': \r\n')
57 valuelen = limit - len(fmt % r'000') - len(': \r\n')
58 result = []
58 result = []
59
59
60 n = 0
60 n = 0
61 for i in xrange(0, len(value), valuelen):
61 for i in xrange(0, len(value), valuelen):
62 n += 1
62 n += 1
63 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
63 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
64
64
65 return result
65 return result
66
66
67 def _wraphttpresponse(resp):
67 def _wraphttpresponse(resp):
68 """Wrap an HTTPResponse with common error handlers.
68 """Wrap an HTTPResponse with common error handlers.
69
69
70 This ensures that any I/O from any consumer raises the appropriate
70 This ensures that any I/O from any consumer raises the appropriate
71 error and messaging.
71 error and messaging.
72 """
72 """
73 origread = resp.read
73 origread = resp.read
74
74
75 class readerproxy(resp.__class__):
75 class readerproxy(resp.__class__):
76 def read(self, size=None):
76 def read(self, size=None):
77 try:
77 try:
78 return origread(size)
78 return origread(size)
79 except httplib.IncompleteRead as e:
79 except httplib.IncompleteRead as e:
80 # e.expected is an integer if length known or None otherwise.
80 # e.expected is an integer if length known or None otherwise.
81 if e.expected:
81 if e.expected:
82 msg = _('HTTP request error (incomplete response; '
82 msg = _('HTTP request error (incomplete response; '
83 'expected %d bytes got %d)') % (e.expected,
83 'expected %d bytes got %d)') % (e.expected,
84 len(e.partial))
84 len(e.partial))
85 else:
85 else:
86 msg = _('HTTP request error (incomplete response)')
86 msg = _('HTTP request error (incomplete response)')
87
87
88 raise error.PeerTransportError(
88 raise error.PeerTransportError(
89 msg,
89 msg,
90 hint=_('this may be an intermittent network failure; '
90 hint=_('this may be an intermittent network failure; '
91 'if the error persists, consider contacting the '
91 'if the error persists, consider contacting the '
92 'network or server operator'))
92 'network or server operator'))
93 except httplib.HTTPException as e:
93 except httplib.HTTPException as e:
94 raise error.PeerTransportError(
94 raise error.PeerTransportError(
95 _('HTTP request error (%s)') % e,
95 _('HTTP request error (%s)') % e,
96 hint=_('this may be an intermittent network failure; '
96 hint=_('this may be an intermittent network failure; '
97 'if the error persists, consider contacting the '
97 'if the error persists, consider contacting the '
98 'network or server operator'))
98 'network or server operator'))
99
99
100 resp.__class__ = readerproxy
100 resp.__class__ = readerproxy
101
101
102 class _multifile(object):
102 class _multifile(object):
103 def __init__(self, *fileobjs):
103 def __init__(self, *fileobjs):
104 for f in fileobjs:
104 for f in fileobjs:
105 if not util.safehasattr(f, 'length'):
105 if not util.safehasattr(f, 'length'):
106 raise ValueError(
106 raise ValueError(
107 '_multifile only supports file objects that '
107 '_multifile only supports file objects that '
108 'have a length but this one does not:', type(f), f)
108 'have a length but this one does not:', type(f), f)
109 self._fileobjs = fileobjs
109 self._fileobjs = fileobjs
110 self._index = 0
110 self._index = 0
111
111
112 @property
112 @property
113 def length(self):
113 def length(self):
114 return sum(f.length for f in self._fileobjs)
114 return sum(f.length for f in self._fileobjs)
115
115
116 def read(self, amt=None):
116 def read(self, amt=None):
117 if amt <= 0:
117 if amt <= 0:
118 return ''.join(f.read() for f in self._fileobjs)
118 return ''.join(f.read() for f in self._fileobjs)
119 parts = []
119 parts = []
120 while amt and self._index < len(self._fileobjs):
120 while amt and self._index < len(self._fileobjs):
121 parts.append(self._fileobjs[self._index].read(amt))
121 parts.append(self._fileobjs[self._index].read(amt))
122 got = len(parts[-1])
122 got = len(parts[-1])
123 if got < amt:
123 if got < amt:
124 self._index += 1
124 self._index += 1
125 amt -= got
125 amt -= got
126 return ''.join(parts)
126 return ''.join(parts)
127
127
128 def seek(self, offset, whence=os.SEEK_SET):
128 def seek(self, offset, whence=os.SEEK_SET):
129 if whence != os.SEEK_SET:
129 if whence != os.SEEK_SET:
130 raise NotImplementedError(
130 raise NotImplementedError(
131 '_multifile does not support anything other'
131 '_multifile does not support anything other'
132 ' than os.SEEK_SET for whence on seek()')
132 ' than os.SEEK_SET for whence on seek()')
133 if offset != 0:
133 if offset != 0:
134 raise NotImplementedError(
134 raise NotImplementedError(
135 '_multifile only supports seeking to start, but that '
135 '_multifile only supports seeking to start, but that '
136 'could be fixed if you need it')
136 'could be fixed if you need it')
137 for f in self._fileobjs:
137 for f in self._fileobjs:
138 f.seek(0)
138 f.seek(0)
139 self._index = 0
139 self._index = 0
140
140
141 class httppeer(wireproto.wirepeer):
141 class httppeer(wireproto.wirepeer):
142 def __init__(self, ui, path, url, opener):
142 def __init__(self, ui, path, url, opener):
143 self.ui = ui
143 self.ui = ui
144 self._path = path
144 self._path = path
145 self._url = url
145 self._url = url
146 self._caps = None
146 self._caps = None
147 self._urlopener = opener
147 self._urlopener = opener
148 # This is an its own attribute to facilitate extensions overriding
148 # This is an its own attribute to facilitate extensions overriding
149 # the default type.
149 # the default type.
150 self._requestbuilder = urlreq.request
150 self._requestbuilder = urlreq.request
151
151
152 def __del__(self):
152 def __del__(self):
153 for h in self._urlopener.handlers:
153 for h in self._urlopener.handlers:
154 h.close()
154 h.close()
155 getattr(h, "close_all", lambda: None)()
155 getattr(h, "close_all", lambda: None)()
156
156
157 def _openurl(self, req):
157 def _openurl(self, req):
158 if (self.ui.debugflag
158 if (self.ui.debugflag
159 and self.ui.configbool('devel', 'debug.peer-request')):
159 and self.ui.configbool('devel', 'debug.peer-request')):
160 dbg = self.ui.debug
160 dbg = self.ui.debug
161 line = 'devel-peer-request: %s\n'
161 line = 'devel-peer-request: %s\n'
162 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
162 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
163 hgargssize = None
163 hgargssize = None
164
164
165 for header, value in sorted(req.header_items()):
165 for header, value in sorted(req.header_items()):
166 if header.startswith('X-hgarg-'):
166 if header.startswith('X-hgarg-'):
167 if hgargssize is None:
167 if hgargssize is None:
168 hgargssize = 0
168 hgargssize = 0
169 hgargssize += len(value)
169 hgargssize += len(value)
170 else:
170 else:
171 dbg(line % ' %s %s' % (header, value))
171 dbg(line % ' %s %s' % (header, value))
172
172
173 if hgargssize is not None:
173 if hgargssize is not None:
174 dbg(line % ' %d bytes of commands arguments in headers'
174 dbg(line % ' %d bytes of commands arguments in headers'
175 % hgargssize)
175 % hgargssize)
176
176
177 if req.has_data():
177 if req.has_data():
178 data = req.get_data()
178 data = req.get_data()
179 length = getattr(data, 'length', None)
179 length = getattr(data, 'length', None)
180 if length is None:
180 if length is None:
181 length = len(data)
181 length = len(data)
182 dbg(line % ' %d bytes of data' % length)
182 dbg(line % ' %d bytes of data' % length)
183
183
184 start = util.timer()
184 start = util.timer()
185
185
186 ret = self._urlopener.open(req)
186 ret = self._urlopener.open(req)
187 if self.ui.configbool('devel', 'debug.peer-request'):
187 if self.ui.configbool('devel', 'debug.peer-request'):
188 dbg(line % ' finished in %.4f seconds (%s)'
188 dbg(line % ' finished in %.4f seconds (%s)'
189 % (util.timer() - start, ret.code))
189 % (util.timer() - start, ret.code))
190 return ret
190 return ret
191
191
192 # Begin of ipeerconnection interface.
192 # Begin of ipeerconnection interface.
193
193
194 def url(self):
194 def url(self):
195 return self._path
195 return self._path
196
196
197 def local(self):
197 def local(self):
198 return None
198 return None
199
199
200 def peer(self):
200 def peer(self):
201 return self
201 return self
202
202
203 def canpush(self):
203 def canpush(self):
204 return True
204 return True
205
205
206 def close(self):
206 def close(self):
207 pass
207 pass
208
208
209 # End of ipeerconnection interface.
209 # End of ipeerconnection interface.
210
210
211 # Begin of ipeercommands interface.
211 # Begin of ipeercommands interface.
212
212
213 def capabilities(self):
213 def capabilities(self):
214 # self._fetchcaps() should have been called as part of peer
214 # self._fetchcaps() should have been called as part of peer
215 # handshake. So self._caps should always be set.
215 # handshake. So self._caps should always be set.
216 assert self._caps is not None
216 assert self._caps is not None
217 return self._caps
217 return self._caps
218
218
219 # End of ipeercommands interface.
219 # End of ipeercommands interface.
220
220
221 # look up capabilities only when needed
221 # look up capabilities only when needed
222
222
223 def _fetchcaps(self):
223 def _fetchcaps(self):
224 self._caps = set(self._call('capabilities').split())
224 self._caps = set(self._call('capabilities').split())
225
225
226 def _callstream(self, cmd, _compressible=False, **args):
226 def _callstream(self, cmd, _compressible=False, **args):
227 args = pycompat.byteskwargs(args)
227 args = pycompat.byteskwargs(args)
228 if cmd == 'pushkey':
228 if cmd == 'pushkey':
229 args['data'] = ''
229 args['data'] = ''
230 data = args.pop('data', None)
230 data = args.pop('data', None)
231 headers = args.pop('headers', {})
231 headers = args.pop('headers', {})
232
232
233 self.ui.debug("sending %s command\n" % cmd)
233 self.ui.debug("sending %s command\n" % cmd)
234 q = [('cmd', cmd)]
234 q = [('cmd', cmd)]
235 headersize = 0
235 headersize = 0
236 varyheaders = []
236 varyheaders = []
237 # Important: don't use self.capable() here or else you end up
237 # Important: don't use self.capable() here or else you end up
238 # with infinite recursion when trying to look up capabilities
238 # with infinite recursion when trying to look up capabilities
239 # for the first time.
239 # for the first time.
240 postargsok = self._caps is not None and 'httppostargs' in self._caps
240 postargsok = self._caps is not None and 'httppostargs' in self._caps
241
241
242 # Send arguments via POST.
242 # Send arguments via POST.
243 if postargsok and args:
243 if postargsok and args:
244 strargs = urlreq.urlencode(sorted(args.items()))
244 strargs = urlreq.urlencode(sorted(args.items()))
245 if not data:
245 if not data:
246 data = strargs
246 data = strargs
247 else:
247 else:
248 if isinstance(data, bytes):
248 if isinstance(data, bytes):
249 i = io.BytesIO(data)
249 i = io.BytesIO(data)
250 i.length = len(data)
250 i.length = len(data)
251 data = i
251 data = i
252 argsio = io.BytesIO(strargs)
252 argsio = io.BytesIO(strargs)
253 argsio.length = len(strargs)
253 argsio.length = len(strargs)
254 data = _multifile(argsio, data)
254 data = _multifile(argsio, data)
255 headers[r'X-HgArgs-Post'] = len(strargs)
255 headers[r'X-HgArgs-Post'] = len(strargs)
256 elif args:
256 elif args:
257 # Calling self.capable() can infinite loop if we are calling
257 # Calling self.capable() can infinite loop if we are calling
258 # "capabilities". But that command should never accept wire
258 # "capabilities". But that command should never accept wire
259 # protocol arguments. So this should never happen.
259 # protocol arguments. So this should never happen.
260 assert cmd != 'capabilities'
260 assert cmd != 'capabilities'
261 httpheader = self.capable('httpheader')
261 httpheader = self.capable('httpheader')
262 if httpheader:
262 if httpheader:
263 headersize = int(httpheader.split(',', 1)[0])
263 headersize = int(httpheader.split(',', 1)[0])
264
264
265 # Send arguments via HTTP headers.
265 # Send arguments via HTTP headers.
266 if headersize > 0:
266 if headersize > 0:
267 # The headers can typically carry more data than the URL.
267 # The headers can typically carry more data than the URL.
268 encargs = urlreq.urlencode(sorted(args.items()))
268 encargs = urlreq.urlencode(sorted(args.items()))
269 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
269 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
270 headersize):
270 headersize):
271 headers[header] = value
271 headers[header] = value
272 varyheaders.append(header)
272 varyheaders.append(header)
273 # Send arguments via query string (Mercurial <1.9).
273 # Send arguments via query string (Mercurial <1.9).
274 else:
274 else:
275 q += sorted(args.items())
275 q += sorted(args.items())
276
276
277 qs = '?%s' % urlreq.urlencode(q)
277 qs = '?%s' % urlreq.urlencode(q)
278 cu = "%s%s" % (self._url, qs)
278 cu = "%s%s" % (self._url, qs)
279 size = 0
279 size = 0
280 if util.safehasattr(data, 'length'):
280 if util.safehasattr(data, 'length'):
281 size = data.length
281 size = data.length
282 elif data is not None:
282 elif data is not None:
283 size = len(data)
283 size = len(data)
284 if data is not None and r'Content-Type' not in headers:
284 if data is not None and r'Content-Type' not in headers:
285 headers[r'Content-Type'] = r'application/mercurial-0.1'
285 headers[r'Content-Type'] = r'application/mercurial-0.1'
286
286
287 # Tell the server we accept application/mercurial-0.2 and multiple
287 # Tell the server we accept application/mercurial-0.2 and multiple
288 # compression formats if the server is capable of emitting those
288 # compression formats if the server is capable of emitting those
289 # payloads.
289 # payloads.
290 protoparams = {'partial-pull'}
290 protoparams = {'partial-pull'}
291
291
292 mediatypes = set()
292 mediatypes = set()
293 if self._caps is not None:
293 if self._caps is not None:
294 mt = self.capable('httpmediatype')
294 mt = self.capable('httpmediatype')
295 if mt:
295 if mt:
296 protoparams.add('0.1')
296 protoparams.add('0.1')
297 mediatypes = set(mt.split(','))
297 mediatypes = set(mt.split(','))
298
298
299 if '0.2tx' in mediatypes:
299 if '0.2tx' in mediatypes:
300 protoparams.add('0.2')
300 protoparams.add('0.2')
301
301
302 if '0.2tx' in mediatypes and self.capable('compression'):
302 if '0.2tx' in mediatypes and self.capable('compression'):
303 # We /could/ compare supported compression formats and prune
303 # We /could/ compare supported compression formats and prune
304 # non-mutually supported or error if nothing is mutually supported.
304 # non-mutually supported or error if nothing is mutually supported.
305 # For now, send the full list to the server and have it error.
305 # For now, send the full list to the server and have it error.
306 comps = [e.wireprotosupport().name for e in
306 comps = [e.wireprotosupport().name for e in
307 util.compengines.supportedwireengines(util.CLIENTROLE)]
307 util.compengines.supportedwireengines(util.CLIENTROLE)]
308 protoparams.add('comp=%s' % ','.join(comps))
308 protoparams.add('comp=%s' % ','.join(comps))
309
309
310 if protoparams:
310 if protoparams:
311 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
311 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
312 'X-HgProto',
312 'X-HgProto',
313 headersize or 1024)
313 headersize or 1024)
314 for header, value in protoheaders:
314 for header, value in protoheaders:
315 headers[header] = value
315 headers[header] = value
316 varyheaders.append(header)
316 varyheaders.append(header)
317
317
318 if varyheaders:
318 if varyheaders:
319 headers[r'Vary'] = r','.join(varyheaders)
319 headers[r'Vary'] = r','.join(varyheaders)
320
320
321 req = self._requestbuilder(pycompat.strurl(cu), data, headers)
321 req = self._requestbuilder(pycompat.strurl(cu), data, headers)
322
322
323 if data is not None:
323 if data is not None:
324 self.ui.debug("sending %d bytes\n" % size)
324 self.ui.debug("sending %d bytes\n" % size)
325 req.add_unredirected_header(r'Content-Length', r'%d' % size)
325 req.add_unredirected_header(r'Content-Length', r'%d' % size)
326 try:
326 try:
327 resp = self._openurl(req)
327 resp = self._openurl(req)
328 except urlerr.httperror as inst:
328 except urlerr.httperror as inst:
329 if inst.code == 401:
329 if inst.code == 401:
330 raise error.Abort(_('authorization failed'))
330 raise error.Abort(_('authorization failed'))
331 raise
331 raise
332 except httplib.HTTPException as inst:
332 except httplib.HTTPException as inst:
333 self.ui.debug('http error while sending %s command\n' % cmd)
333 self.ui.debug('http error while sending %s command\n' % cmd)
334 self.ui.traceback()
334 self.ui.traceback()
335 raise IOError(None, inst)
335 raise IOError(None, inst)
336
336
337 # Insert error handlers for common I/O failures.
337 # Insert error handlers for common I/O failures.
338 _wraphttpresponse(resp)
338 _wraphttpresponse(resp)
339
339
340 # record the url we got redirected to
340 # record the url we got redirected to
341 resp_url = pycompat.bytesurl(resp.geturl())
341 resp_url = pycompat.bytesurl(resp.geturl())
342 if resp_url.endswith(qs):
342 if resp_url.endswith(qs):
343 resp_url = resp_url[:-len(qs)]
343 resp_url = resp_url[:-len(qs)]
344 if self._url.rstrip('/') != resp_url.rstrip('/'):
344 if self._url.rstrip('/') != resp_url.rstrip('/'):
345 if not self.ui.quiet:
345 if not self.ui.quiet:
346 self.ui.warn(_('real URL is %s\n') % resp_url)
346 self.ui.warn(_('real URL is %s\n') % resp_url)
347 self._url = resp_url
347 self._url = resp_url
348 try:
348 try:
349 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
349 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
350 except AttributeError:
350 except AttributeError:
351 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
351 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
352
352
353 safeurl = util.hidepassword(self._url)
353 safeurl = util.hidepassword(self._url)
354 if proto.startswith('application/hg-error'):
354 if proto.startswith('application/hg-error'):
355 raise error.OutOfBandError(resp.read())
355 raise error.OutOfBandError(resp.read())
356 # accept old "text/plain" and "application/hg-changegroup" for now
356 # accept old "text/plain" and "application/hg-changegroup" for now
357 if not (proto.startswith('application/mercurial-') or
357 if not (proto.startswith('application/mercurial-') or
358 (proto.startswith('text/plain')
358 (proto.startswith('text/plain')
359 and not resp.headers.get('content-length')) or
359 and not resp.headers.get('content-length')) or
360 proto.startswith('application/hg-changegroup')):
360 proto.startswith('application/hg-changegroup')):
361 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
361 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
362 raise error.RepoError(
362 raise error.RepoError(
363 _("'%s' does not appear to be an hg repository:\n"
363 _("'%s' does not appear to be an hg repository:\n"
364 "---%%<--- (%s)\n%s\n---%%<---\n")
364 "---%%<--- (%s)\n%s\n---%%<---\n")
365 % (safeurl, proto or 'no content-type', resp.read(1024)))
365 % (safeurl, proto or 'no content-type', resp.read(1024)))
366
366
367 if proto.startswith('application/mercurial-'):
367 if proto.startswith('application/mercurial-'):
368 try:
368 try:
369 version = proto.split('-', 1)[1]
369 version = proto.split('-', 1)[1]
370 version_info = tuple([int(n) for n in version.split('.')])
370 version_info = tuple([int(n) for n in version.split('.')])
371 except ValueError:
371 except ValueError:
372 raise error.RepoError(_("'%s' sent a broken Content-Type "
372 raise error.RepoError(_("'%s' sent a broken Content-Type "
373 "header (%s)") % (safeurl, proto))
373 "header (%s)") % (safeurl, proto))
374
374
375 # TODO consider switching to a decompression reader that uses
375 # TODO consider switching to a decompression reader that uses
376 # generators.
376 # generators.
377 if version_info == (0, 1):
377 if version_info == (0, 1):
378 if _compressible:
378 if _compressible:
379 return util.compengines['zlib'].decompressorreader(resp)
379 return util.compengines['zlib'].decompressorreader(resp)
380 return resp
380 return resp
381 elif version_info == (0, 2):
381 elif version_info == (0, 2):
382 # application/mercurial-0.2 always identifies the compression
382 # application/mercurial-0.2 always identifies the compression
383 # engine in the payload header.
383 # engine in the payload header.
384 elen = struct.unpack('B', resp.read(1))[0]
384 elen = struct.unpack('B', resp.read(1))[0]
385 ename = resp.read(elen)
385 ename = resp.read(elen)
386 engine = util.compengines.forwiretype(ename)
386 engine = util.compengines.forwiretype(ename)
387 return engine.decompressorreader(resp)
387 return engine.decompressorreader(resp)
388 else:
388 else:
389 raise error.RepoError(_("'%s' uses newer protocol %s") %
389 raise error.RepoError(_("'%s' uses newer protocol %s") %
390 (safeurl, version))
390 (safeurl, version))
391
391
392 if _compressible:
392 if _compressible:
393 return util.compengines['zlib'].decompressorreader(resp)
393 return util.compengines['zlib'].decompressorreader(resp)
394
394
395 return resp
395 return resp
396
396
397 def _call(self, cmd, **args):
397 def _call(self, cmd, **args):
398 fp = self._callstream(cmd, **args)
398 fp = self._callstream(cmd, **args)
399 try:
399 try:
400 return fp.read()
400 return fp.read()
401 finally:
401 finally:
402 # if using keepalive, allow connection to be reused
402 # if using keepalive, allow connection to be reused
403 fp.close()
403 fp.close()
404
404
405 def _callpush(self, cmd, cg, **args):
405 def _callpush(self, cmd, cg, **args):
406 # have to stream bundle to a temp file because we do not have
406 # have to stream bundle to a temp file because we do not have
407 # http 1.1 chunked transfer.
407 # http 1.1 chunked transfer.
408
408
409 types = self.capable('unbundle')
409 types = self.capable('unbundle')
410 try:
410 try:
411 types = types.split(',')
411 types = types.split(',')
412 except AttributeError:
412 except AttributeError:
413 # servers older than d1b16a746db6 will send 'unbundle' as a
413 # servers older than d1b16a746db6 will send 'unbundle' as a
414 # boolean capability. They only support headerless/uncompressed
414 # boolean capability. They only support headerless/uncompressed
415 # bundles.
415 # bundles.
416 types = [""]
416 types = [""]
417 for x in types:
417 for x in types:
418 if x in bundle2.bundletypes:
418 if x in bundle2.bundletypes:
419 type = x
419 type = x
420 break
420 break
421
421
422 tempname = bundle2.writebundle(self.ui, cg, None, type)
422 tempname = bundle2.writebundle(self.ui, cg, None, type)
423 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
423 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
424 headers = {r'Content-Type': r'application/mercurial-0.1'}
424 headers = {r'Content-Type': r'application/mercurial-0.1'}
425
425
426 try:
426 try:
427 r = self._call(cmd, data=fp, headers=headers, **args)
427 r = self._call(cmd, data=fp, headers=headers, **args)
428 vals = r.split('\n', 1)
428 vals = r.split('\n', 1)
429 if len(vals) < 2:
429 if len(vals) < 2:
430 raise error.ResponseError(_("unexpected response:"), r)
430 raise error.ResponseError(_("unexpected response:"), r)
431 return vals
431 return vals
432 except urlerr.httperror:
432 except urlerr.httperror:
433 # Catch and re-raise these so we don't try and treat them
433 # Catch and re-raise these so we don't try and treat them
434 # like generic socket errors. They lack any values in
434 # like generic socket errors. They lack any values in
435 # .args on Python 3 which breaks our socket.error block.
435 # .args on Python 3 which breaks our socket.error block.
436 raise
436 raise
437 except socket.error as err:
437 except socket.error as err:
438 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
438 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
439 raise error.Abort(_('push failed: %s') % err.args[1])
439 raise error.Abort(_('push failed: %s') % err.args[1])
440 raise error.Abort(err.args[1])
440 raise error.Abort(err.args[1])
441 finally:
441 finally:
442 fp.close()
442 fp.close()
443 os.unlink(tempname)
443 os.unlink(tempname)
444
444
445 def _calltwowaystream(self, cmd, fp, **args):
445 def _calltwowaystream(self, cmd, fp, **args):
446 fh = None
446 fh = None
447 fp_ = None
447 fp_ = None
448 filename = None
448 filename = None
449 try:
449 try:
450 # dump bundle to disk
450 # dump bundle to disk
451 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
451 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
452 fh = os.fdopen(fd, r"wb")
452 fh = os.fdopen(fd, r"wb")
453 d = fp.read(4096)
453 d = fp.read(4096)
454 while d:
454 while d:
455 fh.write(d)
455 fh.write(d)
456 d = fp.read(4096)
456 d = fp.read(4096)
457 fh.close()
457 fh.close()
458 # start http push
458 # start http push
459 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
459 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
460 headers = {r'Content-Type': r'application/mercurial-0.1'}
460 headers = {r'Content-Type': r'application/mercurial-0.1'}
461 return self._callstream(cmd, data=fp_, headers=headers, **args)
461 return self._callstream(cmd, data=fp_, headers=headers, **args)
462 finally:
462 finally:
463 if fp_ is not None:
463 if fp_ is not None:
464 fp_.close()
464 fp_.close()
465 if fh is not None:
465 if fh is not None:
466 fh.close()
466 fh.close()
467 os.unlink(filename)
467 os.unlink(filename)
468
468
469 def _callcompressable(self, cmd, **args):
469 def _callcompressable(self, cmd, **args):
470 return self._callstream(cmd, _compressible=True, **args)
470 return self._callstream(cmd, _compressible=True, **args)
471
471
472 def _abort(self, exception):
472 def _abort(self, exception):
473 raise exception
473 raise exception
474
474
475 # TODO implement interface for version 2 peers
475 # TODO implement interface for version 2 peers
476 class httpv2peer(object):
476 class httpv2peer(object):
477 def __init__(self, ui, repourl, opener):
477 def __init__(self, ui, repourl, opener):
478 self.ui = ui
478 self.ui = ui
479
479
480 if repourl.endswith('/'):
480 if repourl.endswith('/'):
481 repourl = repourl[:-1]
481 repourl = repourl[:-1]
482
482
483 self.url = repourl
483 self.url = repourl
484 self._opener = opener
484 self._opener = opener
485 # This is an its own attribute to facilitate extensions overriding
485 # This is an its own attribute to facilitate extensions overriding
486 # the default type.
486 # the default type.
487 self._requestbuilder = urlreq.request
487 self._requestbuilder = urlreq.request
488
488
489 def close(self):
489 def close(self):
490 pass
490 pass
491
491
492 # TODO require to be part of a batched primitive, use futures.
492 # TODO require to be part of a batched primitive, use futures.
493 def _call(self, name, **args):
493 def _call(self, name, **args):
494 """Call a wire protocol command with arguments."""
494 """Call a wire protocol command with arguments."""
495
495
496 # TODO permissions should come from capabilities results.
496 # TODO permissions should come from capabilities results.
497 permission = wireproto.commandsv2[name].permission
497 permission = wireproto.commandsv2[name].permission
498 if permission not in ('push', 'pull'):
498 if permission not in ('push', 'pull'):
499 raise error.ProgrammingError('unknown permission type: %s' %
499 raise error.ProgrammingError('unknown permission type: %s' %
500 permission)
500 permission)
501
501
502 permission = {
502 permission = {
503 'push': 'rw',
503 'push': 'rw',
504 'pull': 'ro',
504 'pull': 'ro',
505 }[permission]
505 }[permission]
506
506
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
507 url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
508 name)
508 name)
509
509
510 # TODO modify user-agent to reflect v2.
510 # TODO modify user-agent to reflect v2.
511 headers = {
511 headers = {
512 r'Accept': wireprotoserver.FRAMINGTYPE,
512 r'Accept': wireprotoserver.FRAMINGTYPE,
513 r'Content-Type': wireprotoserver.FRAMINGTYPE,
513 r'Content-Type': wireprotoserver.FRAMINGTYPE,
514 }
514 }
515
515
516 # TODO this should be part of a generic peer for the frame-based
516 # TODO this should be part of a generic peer for the frame-based
517 # protocol.
517 # protocol.
518 stream = wireprotoframing.stream(1)
518 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
519 frames = wireprotoframing.createcommandframes(stream, 1,
519 buffersends=True)
520 name, args)
521
520
522 body = b''.join(map(bytes, frames))
521 request, action, meta = reactor.callcommand(name, args)
522 assert action == 'noop'
523
524 action, meta = reactor.flushcommands()
525 assert action == 'sendframes'
526
527 body = b''.join(map(bytes, meta['framegen']))
523 req = self._requestbuilder(pycompat.strurl(url), body, headers)
528 req = self._requestbuilder(pycompat.strurl(url), body, headers)
524 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
529 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
525
530
526 # TODO unify this code with httppeer.
531 # TODO unify this code with httppeer.
527 try:
532 try:
528 res = self._opener.open(req)
533 res = self._opener.open(req)
529 except urlerr.httperror as e:
534 except urlerr.httperror as e:
530 if e.code == 401:
535 if e.code == 401:
531 raise error.Abort(_('authorization failed'))
536 raise error.Abort(_('authorization failed'))
532
537
533 raise
538 raise
534 except httplib.HTTPException as e:
539 except httplib.HTTPException as e:
535 self.ui.traceback()
540 self.ui.traceback()
536 raise IOError(None, e)
541 raise IOError(None, e)
537
542
538 # TODO validate response type, wrap response to handle I/O errors.
543 # TODO validate response type, wrap response to handle I/O errors.
539 # TODO more robust frame receiver.
544 # TODO more robust frame receiver.
540 results = []
545 results = []
541
546
542 while True:
547 while True:
543 frame = wireprotoframing.readframe(res)
548 frame = wireprotoframing.readframe(res)
544 if frame is None:
549 if frame is None:
545 break
550 break
546
551
547 self.ui.note(_('received %r\n') % frame)
552 self.ui.note(_('received %r\n') % frame)
548
553
549 if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
554 if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
550 if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
555 if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
551 payload = util.bytesio(frame.payload)
556 payload = util.bytesio(frame.payload)
552
557
553 decoder = cbor.CBORDecoder(payload)
558 decoder = cbor.CBORDecoder(payload)
554 while payload.tell() + 1 < len(frame.payload):
559 while payload.tell() + 1 < len(frame.payload):
555 results.append(decoder.decode())
560 results.append(decoder.decode())
556 else:
561 else:
557 results.append(frame.payload)
562 results.append(frame.payload)
558 else:
563 else:
559 error.ProgrammingError('unhandled frame type: %d' %
564 error.ProgrammingError('unhandled frame type: %d' %
560 frame.typeid)
565 frame.typeid)
561
566
562 return results
567 return results
563
568
564 def makepeer(ui, path):
569 def makepeer(ui, path):
565 u = util.url(path)
570 u = util.url(path)
566 if u.query or u.fragment:
571 if u.query or u.fragment:
567 raise error.Abort(_('unsupported URL component: "%s"') %
572 raise error.Abort(_('unsupported URL component: "%s"') %
568 (u.query or u.fragment))
573 (u.query or u.fragment))
569
574
570 # urllib cannot handle URLs with embedded user or passwd.
575 # urllib cannot handle URLs with embedded user or passwd.
571 url, authinfo = u.authinfo()
576 url, authinfo = u.authinfo()
572 ui.debug('using %s\n' % url)
577 ui.debug('using %s\n' % url)
573
578
574 opener = urlmod.opener(ui, authinfo)
579 opener = urlmod.opener(ui, authinfo)
575
580
576 return httppeer(ui, path, url, opener)
581 return httppeer(ui, path, url, opener)
577
582
578 def instance(ui, path, create):
583 def instance(ui, path, create):
579 if create:
584 if create:
580 raise error.Abort(_('cannot create new http repository'))
585 raise error.Abort(_('cannot create new http repository'))
581 try:
586 try:
582 if path.startswith('https:') and not urlmod.has_https:
587 if path.startswith('https:') and not urlmod.has_https:
583 raise error.Abort(_('Python support for SSL and HTTPS '
588 raise error.Abort(_('Python support for SSL and HTTPS '
584 'is not installed'))
589 'is not installed'))
585
590
586 inst = makepeer(ui, path)
591 inst = makepeer(ui, path)
587 inst._fetchcaps()
592 inst._fetchcaps()
588
593
589 return inst
594 return inst
590 except error.RepoError as httpexception:
595 except error.RepoError as httpexception:
591 try:
596 try:
592 r = statichttprepo.instance(ui, "static-" + path, create)
597 r = statichttprepo.instance(ui, "static-" + path, create)
593 ui.note(_('(falling back to static-http)\n'))
598 ui.note(_('(falling back to static-http)\n'))
594 return r
599 return r
595 except error.RepoError:
600 except error.RepoError:
596 raise httpexception # use the original http RepoError instead
601 raise httpexception # use the original http RepoError instead
@@ -1,878 +1,1009 b''
1 # wireprotoframing.py - unified framing protocol for wire protocol
1 # wireprotoframing.py - unified framing protocol for wire protocol
2 #
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 # This file contains functionality to support the unified frame-based wire
8 # This file contains functionality to support the unified frame-based wire
9 # protocol. For details about the protocol, see
9 # protocol. For details about the protocol, see
10 # `hg help internals.wireprotocol`.
10 # `hg help internals.wireprotocol`.
11
11
12 from __future__ import absolute_import
12 from __future__ import absolute_import
13
13
14 import collections
14 import struct
15 import struct
15
16
16 from .i18n import _
17 from .i18n import _
17 from .thirdparty import (
18 from .thirdparty import (
18 attr,
19 attr,
19 cbor,
20 cbor,
20 )
21 )
21 from . import (
22 from . import (
22 encoding,
23 encoding,
23 error,
24 error,
24 util,
25 util,
25 )
26 )
26 from .utils import (
27 from .utils import (
27 stringutil,
28 stringutil,
28 )
29 )
29
30
30 FRAME_HEADER_SIZE = 8
31 FRAME_HEADER_SIZE = 8
31 DEFAULT_MAX_FRAME_SIZE = 32768
32 DEFAULT_MAX_FRAME_SIZE = 32768
32
33
33 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_END_STREAM = 0x02
35 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36
37
37 STREAM_FLAGS = {
38 STREAM_FLAGS = {
38 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'stream-end': STREAM_FLAG_END_STREAM,
40 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 }
42 }
42
43
43 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 FRAME_TYPE_COMMAND_DATA = 0x03
45 FRAME_TYPE_COMMAND_DATA = 0x03
45 FRAME_TYPE_BYTES_RESPONSE = 0x04
46 FRAME_TYPE_BYTES_RESPONSE = 0x04
46 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_PROGRESS = 0x07
49 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50
51
51 FRAME_TYPES = {
52 FRAME_TYPES = {
52 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
55 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
55 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 b'progress': FRAME_TYPE_PROGRESS,
58 b'progress': FRAME_TYPE_PROGRESS,
58 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 }
60 }
60
61
61 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65
66
66 FLAGS_COMMAND_REQUEST = {
67 FLAGS_COMMAND_REQUEST = {
67 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 }
72 }
72
73
73 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 FLAG_COMMAND_DATA_EOS = 0x02
75 FLAG_COMMAND_DATA_EOS = 0x02
75
76
76 FLAGS_COMMAND_DATA = {
77 FLAGS_COMMAND_DATA = {
77 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 b'eos': FLAG_COMMAND_DATA_EOS,
79 b'eos': FLAG_COMMAND_DATA_EOS,
79 }
80 }
80
81
81 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
82 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
82 FLAG_BYTES_RESPONSE_EOS = 0x02
83 FLAG_BYTES_RESPONSE_EOS = 0x02
83 FLAG_BYTES_RESPONSE_CBOR = 0x04
84 FLAG_BYTES_RESPONSE_CBOR = 0x04
84
85
85 FLAGS_BYTES_RESPONSE = {
86 FLAGS_BYTES_RESPONSE = {
86 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
87 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
87 b'eos': FLAG_BYTES_RESPONSE_EOS,
88 b'eos': FLAG_BYTES_RESPONSE_EOS,
88 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
89 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
89 }
90 }
90
91
91 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
92 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
92 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
93 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
93
94
94 FLAGS_ERROR_RESPONSE = {
95 FLAGS_ERROR_RESPONSE = {
95 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
96 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
96 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
97 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
97 }
98 }
98
99
99 # Maps frame types to their available flags.
100 # Maps frame types to their available flags.
100 FRAME_TYPE_FLAGS = {
101 FRAME_TYPE_FLAGS = {
101 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
102 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
102 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
103 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
103 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
104 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
104 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
105 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
105 FRAME_TYPE_TEXT_OUTPUT: {},
106 FRAME_TYPE_TEXT_OUTPUT: {},
106 FRAME_TYPE_PROGRESS: {},
107 FRAME_TYPE_PROGRESS: {},
107 FRAME_TYPE_STREAM_SETTINGS: {},
108 FRAME_TYPE_STREAM_SETTINGS: {},
108 }
109 }
109
110
110 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
111 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
111
112
112 def humanflags(mapping, value):
113 def humanflags(mapping, value):
113 """Convert a numeric flags value to a human value, using a mapping table."""
114 """Convert a numeric flags value to a human value, using a mapping table."""
114 namemap = {v: k for k, v in mapping.iteritems()}
115 namemap = {v: k for k, v in mapping.iteritems()}
115 flags = []
116 flags = []
116 val = 1
117 val = 1
117 while value >= val:
118 while value >= val:
118 if value & val:
119 if value & val:
119 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
120 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
120 val <<= 1
121 val <<= 1
121
122
122 return b'|'.join(flags)
123 return b'|'.join(flags)
123
124
124 @attr.s(slots=True)
125 @attr.s(slots=True)
125 class frameheader(object):
126 class frameheader(object):
126 """Represents the data in a frame header."""
127 """Represents the data in a frame header."""
127
128
128 length = attr.ib()
129 length = attr.ib()
129 requestid = attr.ib()
130 requestid = attr.ib()
130 streamid = attr.ib()
131 streamid = attr.ib()
131 streamflags = attr.ib()
132 streamflags = attr.ib()
132 typeid = attr.ib()
133 typeid = attr.ib()
133 flags = attr.ib()
134 flags = attr.ib()
134
135
135 @attr.s(slots=True, repr=False)
136 @attr.s(slots=True, repr=False)
136 class frame(object):
137 class frame(object):
137 """Represents a parsed frame."""
138 """Represents a parsed frame."""
138
139
139 requestid = attr.ib()
140 requestid = attr.ib()
140 streamid = attr.ib()
141 streamid = attr.ib()
141 streamflags = attr.ib()
142 streamflags = attr.ib()
142 typeid = attr.ib()
143 typeid = attr.ib()
143 flags = attr.ib()
144 flags = attr.ib()
144 payload = attr.ib()
145 payload = attr.ib()
145
146
146 @encoding.strmethod
147 @encoding.strmethod
147 def __repr__(self):
148 def __repr__(self):
148 typename = '<unknown 0x%02x>' % self.typeid
149 typename = '<unknown 0x%02x>' % self.typeid
149 for name, value in FRAME_TYPES.iteritems():
150 for name, value in FRAME_TYPES.iteritems():
150 if value == self.typeid:
151 if value == self.typeid:
151 typename = name
152 typename = name
152 break
153 break
153
154
154 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
155 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
155 'type=%s; flags=%s)' % (
156 'type=%s; flags=%s)' % (
156 len(self.payload), self.requestid, self.streamid,
157 len(self.payload), self.requestid, self.streamid,
157 humanflags(STREAM_FLAGS, self.streamflags), typename,
158 humanflags(STREAM_FLAGS, self.streamflags), typename,
158 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
159 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
159
160
160 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
161 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
161 """Assemble a frame into a byte array."""
162 """Assemble a frame into a byte array."""
162 # TODO assert size of payload.
163 # TODO assert size of payload.
163 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
164 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
164
165
165 # 24 bits length
166 # 24 bits length
166 # 16 bits request id
167 # 16 bits request id
167 # 8 bits stream id
168 # 8 bits stream id
168 # 8 bits stream flags
169 # 8 bits stream flags
169 # 4 bits type
170 # 4 bits type
170 # 4 bits flags
171 # 4 bits flags
171
172
172 l = struct.pack(r'<I', len(payload))
173 l = struct.pack(r'<I', len(payload))
173 frame[0:3] = l[0:3]
174 frame[0:3] = l[0:3]
174 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
175 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
175 frame[7] = (typeid << 4) | flags
176 frame[7] = (typeid << 4) | flags
176 frame[8:] = payload
177 frame[8:] = payload
177
178
178 return frame
179 return frame
179
180
180 def makeframefromhumanstring(s):
181 def makeframefromhumanstring(s):
181 """Create a frame from a human readable string
182 """Create a frame from a human readable string
182
183
183 Strings have the form:
184 Strings have the form:
184
185
185 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
186
187
187 This can be used by user-facing applications and tests for creating
188 This can be used by user-facing applications and tests for creating
188 frames easily without having to type out a bunch of constants.
189 frames easily without having to type out a bunch of constants.
189
190
190 Request ID and stream IDs are integers.
191 Request ID and stream IDs are integers.
191
192
192 Stream flags, frame type, and flags can be specified by integer or
193 Stream flags, frame type, and flags can be specified by integer or
193 named constant.
194 named constant.
194
195
195 Flags can be delimited by `|` to bitwise OR them together.
196 Flags can be delimited by `|` to bitwise OR them together.
196
197
197 If the payload begins with ``cbor:``, the following string will be
198 If the payload begins with ``cbor:``, the following string will be
198 evaluated as Python literal and the resulting object will be fed into
199 evaluated as Python literal and the resulting object will be fed into
199 a CBOR encoder. Otherwise, the payload is interpreted as a Python
200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
200 byte string literal.
201 byte string literal.
201 """
202 """
202 fields = s.split(b' ', 5)
203 fields = s.split(b' ', 5)
203 requestid, streamid, streamflags, frametype, frameflags, payload = fields
204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
204
205
205 requestid = int(requestid)
206 requestid = int(requestid)
206 streamid = int(streamid)
207 streamid = int(streamid)
207
208
208 finalstreamflags = 0
209 finalstreamflags = 0
209 for flag in streamflags.split(b'|'):
210 for flag in streamflags.split(b'|'):
210 if flag in STREAM_FLAGS:
211 if flag in STREAM_FLAGS:
211 finalstreamflags |= STREAM_FLAGS[flag]
212 finalstreamflags |= STREAM_FLAGS[flag]
212 else:
213 else:
213 finalstreamflags |= int(flag)
214 finalstreamflags |= int(flag)
214
215
215 if frametype in FRAME_TYPES:
216 if frametype in FRAME_TYPES:
216 frametype = FRAME_TYPES[frametype]
217 frametype = FRAME_TYPES[frametype]
217 else:
218 else:
218 frametype = int(frametype)
219 frametype = int(frametype)
219
220
220 finalflags = 0
221 finalflags = 0
221 validflags = FRAME_TYPE_FLAGS[frametype]
222 validflags = FRAME_TYPE_FLAGS[frametype]
222 for flag in frameflags.split(b'|'):
223 for flag in frameflags.split(b'|'):
223 if flag in validflags:
224 if flag in validflags:
224 finalflags |= validflags[flag]
225 finalflags |= validflags[flag]
225 else:
226 else:
226 finalflags |= int(flag)
227 finalflags |= int(flag)
227
228
228 if payload.startswith(b'cbor:'):
229 if payload.startswith(b'cbor:'):
229 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
230 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
230 canonical=True)
231 canonical=True)
231
232
232 else:
233 else:
233 payload = stringutil.unescapestr(payload)
234 payload = stringutil.unescapestr(payload)
234
235
235 return makeframe(requestid=requestid, streamid=streamid,
236 return makeframe(requestid=requestid, streamid=streamid,
236 streamflags=finalstreamflags, typeid=frametype,
237 streamflags=finalstreamflags, typeid=frametype,
237 flags=finalflags, payload=payload)
238 flags=finalflags, payload=payload)
238
239
239 def parseheader(data):
240 def parseheader(data):
240 """Parse a unified framing protocol frame header from a buffer.
241 """Parse a unified framing protocol frame header from a buffer.
241
242
242 The header is expected to be in the buffer at offset 0 and the
243 The header is expected to be in the buffer at offset 0 and the
243 buffer is expected to be large enough to hold a full header.
244 buffer is expected to be large enough to hold a full header.
244 """
245 """
245 # 24 bits payload length (little endian)
246 # 24 bits payload length (little endian)
246 # 16 bits request ID
247 # 16 bits request ID
247 # 8 bits stream ID
248 # 8 bits stream ID
248 # 8 bits stream flags
249 # 8 bits stream flags
249 # 4 bits frame type
250 # 4 bits frame type
250 # 4 bits frame flags
251 # 4 bits frame flags
251 # ... payload
252 # ... payload
252 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 typeflags = data[7]
255 typeflags = data[7]
255
256
256 frametype = (typeflags & 0xf0) >> 4
257 frametype = (typeflags & 0xf0) >> 4
257 frameflags = typeflags & 0x0f
258 frameflags = typeflags & 0x0f
258
259
259 return frameheader(framelength, requestid, streamid, streamflags,
260 return frameheader(framelength, requestid, streamid, streamflags,
260 frametype, frameflags)
261 frametype, frameflags)
261
262
262 def readframe(fh):
263 def readframe(fh):
263 """Read a unified framing protocol frame from a file object.
264 """Read a unified framing protocol frame from a file object.
264
265
265 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 None if no frame is available. May raise if a malformed frame is
267 None if no frame is available. May raise if a malformed frame is
267 seen.
268 seen.
268 """
269 """
269 header = bytearray(FRAME_HEADER_SIZE)
270 header = bytearray(FRAME_HEADER_SIZE)
270
271
271 readcount = fh.readinto(header)
272 readcount = fh.readinto(header)
272
273
273 if readcount == 0:
274 if readcount == 0:
274 return None
275 return None
275
276
276 if readcount != FRAME_HEADER_SIZE:
277 if readcount != FRAME_HEADER_SIZE:
277 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 (readcount, header))
279 (readcount, header))
279
280
280 h = parseheader(header)
281 h = parseheader(header)
281
282
282 payload = fh.read(h.length)
283 payload = fh.read(h.length)
283 if len(payload) != h.length:
284 if len(payload) != h.length:
284 raise error.Abort(_('frame length error: expected %d; got %d') %
285 raise error.Abort(_('frame length error: expected %d; got %d') %
285 (h.length, len(payload)))
286 (h.length, len(payload)))
286
287
287 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 payload)
289 payload)
289
290
290 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 """Create frames necessary to transmit a request to run a command.
293 """Create frames necessary to transmit a request to run a command.
293
294
294 This is a generator of bytearrays. Each item represents a frame
295 This is a generator of bytearrays. Each item represents a frame
295 ready to be sent over the wire to a peer.
296 ready to be sent over the wire to a peer.
296 """
297 """
297 data = {b'name': cmd}
298 data = {b'name': cmd}
298 if args:
299 if args:
299 data[b'args'] = args
300 data[b'args'] = args
300
301
301 data = cbor.dumps(data, canonical=True)
302 data = cbor.dumps(data, canonical=True)
302
303
303 offset = 0
304 offset = 0
304
305
305 while True:
306 while True:
306 flags = 0
307 flags = 0
307
308
308 # Must set new or continuation flag.
309 # Must set new or continuation flag.
309 if not offset:
310 if not offset:
310 flags |= FLAG_COMMAND_REQUEST_NEW
311 flags |= FLAG_COMMAND_REQUEST_NEW
311 else:
312 else:
312 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313
314
314 # Data frames is set on all frames.
315 # Data frames is set on all frames.
315 if datafh:
316 if datafh:
316 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317
318
318 payload = data[offset:offset + maxframesize]
319 payload = data[offset:offset + maxframesize]
319 offset += len(payload)
320 offset += len(payload)
320
321
321 if len(payload) == maxframesize and offset < len(data):
322 if len(payload) == maxframesize and offset < len(data):
322 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323
324
324 yield stream.makeframe(requestid=requestid,
325 yield stream.makeframe(requestid=requestid,
325 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 flags=flags,
327 flags=flags,
327 payload=payload)
328 payload=payload)
328
329
329 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 break
331 break
331
332
332 if datafh:
333 if datafh:
333 while True:
334 while True:
334 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335
336
336 done = False
337 done = False
337 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 flags = FLAG_COMMAND_DATA_CONTINUATION
339 flags = FLAG_COMMAND_DATA_CONTINUATION
339 else:
340 else:
340 flags = FLAG_COMMAND_DATA_EOS
341 flags = FLAG_COMMAND_DATA_EOS
341 assert datafh.read(1) == b''
342 assert datafh.read(1) == b''
342 done = True
343 done = True
343
344
344 yield stream.makeframe(requestid=requestid,
345 yield stream.makeframe(requestid=requestid,
345 typeid=FRAME_TYPE_COMMAND_DATA,
346 typeid=FRAME_TYPE_COMMAND_DATA,
346 flags=flags,
347 flags=flags,
347 payload=data)
348 payload=data)
348
349
349 if done:
350 if done:
350 break
351 break
351
352
352 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
353 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
353 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 """Create a raw frame to send a bytes response from static bytes input.
355 """Create a raw frame to send a bytes response from static bytes input.
355
356
356 Returns a generator of bytearrays.
357 Returns a generator of bytearrays.
357 """
358 """
358
359
359 # Simple case of a single frame.
360 # Simple case of a single frame.
360 if len(data) <= maxframesize:
361 if len(data) <= maxframesize:
361 flags = FLAG_BYTES_RESPONSE_EOS
362 flags = FLAG_BYTES_RESPONSE_EOS
362 if iscbor:
363 if iscbor:
363 flags |= FLAG_BYTES_RESPONSE_CBOR
364 flags |= FLAG_BYTES_RESPONSE_CBOR
364
365
365 yield stream.makeframe(requestid=requestid,
366 yield stream.makeframe(requestid=requestid,
366 typeid=FRAME_TYPE_BYTES_RESPONSE,
367 typeid=FRAME_TYPE_BYTES_RESPONSE,
367 flags=flags,
368 flags=flags,
368 payload=data)
369 payload=data)
369 return
370 return
370
371
371 offset = 0
372 offset = 0
372 while True:
373 while True:
373 chunk = data[offset:offset + maxframesize]
374 chunk = data[offset:offset + maxframesize]
374 offset += len(chunk)
375 offset += len(chunk)
375 done = offset == len(data)
376 done = offset == len(data)
376
377
377 if done:
378 if done:
378 flags = FLAG_BYTES_RESPONSE_EOS
379 flags = FLAG_BYTES_RESPONSE_EOS
379 else:
380 else:
380 flags = FLAG_BYTES_RESPONSE_CONTINUATION
381 flags = FLAG_BYTES_RESPONSE_CONTINUATION
381
382
382 if iscbor:
383 if iscbor:
383 flags |= FLAG_BYTES_RESPONSE_CBOR
384 flags |= FLAG_BYTES_RESPONSE_CBOR
384
385
385 yield stream.makeframe(requestid=requestid,
386 yield stream.makeframe(requestid=requestid,
386 typeid=FRAME_TYPE_BYTES_RESPONSE,
387 typeid=FRAME_TYPE_BYTES_RESPONSE,
387 flags=flags,
388 flags=flags,
388 payload=chunk)
389 payload=chunk)
389
390
390 if done:
391 if done:
391 break
392 break
392
393
393 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
394 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
394 # TODO properly handle frame size limits.
395 # TODO properly handle frame size limits.
395 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
396 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
396
397
397 flags = 0
398 flags = 0
398 if protocol:
399 if protocol:
399 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
400 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
400 if application:
401 if application:
401 flags |= FLAG_ERROR_RESPONSE_APPLICATION
402 flags |= FLAG_ERROR_RESPONSE_APPLICATION
402
403
403 yield stream.makeframe(requestid=requestid,
404 yield stream.makeframe(requestid=requestid,
404 typeid=FRAME_TYPE_ERROR_RESPONSE,
405 typeid=FRAME_TYPE_ERROR_RESPONSE,
405 flags=flags,
406 flags=flags,
406 payload=msg)
407 payload=msg)
407
408
408 def createtextoutputframe(stream, requestid, atoms,
409 def createtextoutputframe(stream, requestid, atoms,
409 maxframesize=DEFAULT_MAX_FRAME_SIZE):
410 maxframesize=DEFAULT_MAX_FRAME_SIZE):
410 """Create a text output frame to render text to people.
411 """Create a text output frame to render text to people.
411
412
412 ``atoms`` is a 3-tuple of (formatting string, args, labels).
413 ``atoms`` is a 3-tuple of (formatting string, args, labels).
413
414
414 The formatting string contains ``%s`` tokens to be replaced by the
415 The formatting string contains ``%s`` tokens to be replaced by the
415 corresponding indexed entry in ``args``. ``labels`` is an iterable of
416 corresponding indexed entry in ``args``. ``labels`` is an iterable of
416 formatters to be applied at rendering time. In terms of the ``ui``
417 formatters to be applied at rendering time. In terms of the ``ui``
417 class, each atom corresponds to a ``ui.write()``.
418 class, each atom corresponds to a ``ui.write()``.
418 """
419 """
419 atomdicts = []
420 atomdicts = []
420
421
421 for (formatting, args, labels) in atoms:
422 for (formatting, args, labels) in atoms:
422 # TODO look for localstr, other types here?
423 # TODO look for localstr, other types here?
423
424
424 if not isinstance(formatting, bytes):
425 if not isinstance(formatting, bytes):
425 raise ValueError('must use bytes formatting strings')
426 raise ValueError('must use bytes formatting strings')
426 for arg in args:
427 for arg in args:
427 if not isinstance(arg, bytes):
428 if not isinstance(arg, bytes):
428 raise ValueError('must use bytes for arguments')
429 raise ValueError('must use bytes for arguments')
429 for label in labels:
430 for label in labels:
430 if not isinstance(label, bytes):
431 if not isinstance(label, bytes):
431 raise ValueError('must use bytes for labels')
432 raise ValueError('must use bytes for labels')
432
433
433 # Formatting string must be ASCII.
434 # Formatting string must be ASCII.
434 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
435 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
435
436
436 # Arguments must be UTF-8.
437 # Arguments must be UTF-8.
437 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
438 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
438
439
439 # Labels must be ASCII.
440 # Labels must be ASCII.
440 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
441 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
441 for l in labels]
442 for l in labels]
442
443
443 atom = {b'msg': formatting}
444 atom = {b'msg': formatting}
444 if args:
445 if args:
445 atom[b'args'] = args
446 atom[b'args'] = args
446 if labels:
447 if labels:
447 atom[b'labels'] = labels
448 atom[b'labels'] = labels
448
449
449 atomdicts.append(atom)
450 atomdicts.append(atom)
450
451
451 payload = cbor.dumps(atomdicts, canonical=True)
452 payload = cbor.dumps(atomdicts, canonical=True)
452
453
453 if len(payload) > maxframesize:
454 if len(payload) > maxframesize:
454 raise ValueError('cannot encode data in a single frame')
455 raise ValueError('cannot encode data in a single frame')
455
456
456 yield stream.makeframe(requestid=requestid,
457 yield stream.makeframe(requestid=requestid,
457 typeid=FRAME_TYPE_TEXT_OUTPUT,
458 typeid=FRAME_TYPE_TEXT_OUTPUT,
458 flags=0,
459 flags=0,
459 payload=payload)
460 payload=payload)
460
461
461 class stream(object):
462 class stream(object):
462 """Represents a logical unidirectional series of frames."""
463 """Represents a logical unidirectional series of frames."""
463
464
464 def __init__(self, streamid, active=False):
465 def __init__(self, streamid, active=False):
465 self.streamid = streamid
466 self.streamid = streamid
466 self._active = False
467 self._active = False
467
468
468 def makeframe(self, requestid, typeid, flags, payload):
469 def makeframe(self, requestid, typeid, flags, payload):
469 """Create a frame to be sent out over this stream.
470 """Create a frame to be sent out over this stream.
470
471
471 Only returns the frame instance. Does not actually send it.
472 Only returns the frame instance. Does not actually send it.
472 """
473 """
473 streamflags = 0
474 streamflags = 0
474 if not self._active:
475 if not self._active:
475 streamflags |= STREAM_FLAG_BEGIN_STREAM
476 streamflags |= STREAM_FLAG_BEGIN_STREAM
476 self._active = True
477 self._active = True
477
478
478 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
479 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
479 payload)
480 payload)
480
481
481 def ensureserverstream(stream):
482 def ensureserverstream(stream):
482 if stream.streamid % 2:
483 if stream.streamid % 2:
483 raise error.ProgrammingError('server should only write to even '
484 raise error.ProgrammingError('server should only write to even '
484 'numbered streams; %d is not even' %
485 'numbered streams; %d is not even' %
485 stream.streamid)
486 stream.streamid)
486
487
487 class serverreactor(object):
488 class serverreactor(object):
488 """Holds state of a server handling frame-based protocol requests.
489 """Holds state of a server handling frame-based protocol requests.
489
490
490 This class is the "brain" of the unified frame-based protocol server
491 This class is the "brain" of the unified frame-based protocol server
491 component. While the protocol is stateless from the perspective of
492 component. While the protocol is stateless from the perspective of
492 requests/commands, something needs to track which frames have been
493 requests/commands, something needs to track which frames have been
493 received, what frames to expect, etc. This class is that thing.
494 received, what frames to expect, etc. This class is that thing.
494
495
495 Instances are modeled as a state machine of sorts. Instances are also
496 Instances are modeled as a state machine of sorts. Instances are also
496 reactionary to external events. The point of this class is to encapsulate
497 reactionary to external events. The point of this class is to encapsulate
497 the state of the connection and the exchange of frames, not to perform
498 the state of the connection and the exchange of frames, not to perform
498 work. Instead, callers tell this class when something occurs, like a
499 work. Instead, callers tell this class when something occurs, like a
499 frame arriving. If that activity is worthy of a follow-up action (say
500 frame arriving. If that activity is worthy of a follow-up action (say
500 *run a command*), the return value of that handler will say so.
501 *run a command*), the return value of that handler will say so.
501
502
502 I/O and CPU intensive operations are purposefully delegated outside of
503 I/O and CPU intensive operations are purposefully delegated outside of
503 this class.
504 this class.
504
505
505 Consumers are expected to tell instances when events occur. They do so by
506 Consumers are expected to tell instances when events occur. They do so by
506 calling the various ``on*`` methods. These methods return a 2-tuple
507 calling the various ``on*`` methods. These methods return a 2-tuple
507 describing any follow-up action(s) to take. The first element is the
508 describing any follow-up action(s) to take. The first element is the
508 name of an action to perform. The second is a data structure (usually
509 name of an action to perform. The second is a data structure (usually
509 a dict) specific to that action that contains more information. e.g.
510 a dict) specific to that action that contains more information. e.g.
510 if the server wants to send frames back to the client, the data structure
511 if the server wants to send frames back to the client, the data structure
511 will contain a reference to those frames.
512 will contain a reference to those frames.
512
513
513 Valid actions that consumers can be instructed to take are:
514 Valid actions that consumers can be instructed to take are:
514
515
515 sendframes
516 sendframes
516 Indicates that frames should be sent to the client. The ``framegen``
517 Indicates that frames should be sent to the client. The ``framegen``
517 key contains a generator of frames that should be sent. The server
518 key contains a generator of frames that should be sent. The server
518 assumes that all frames are sent to the client.
519 assumes that all frames are sent to the client.
519
520
520 error
521 error
521 Indicates that an error occurred. Consumer should probably abort.
522 Indicates that an error occurred. Consumer should probably abort.
522
523
523 runcommand
524 runcommand
524 Indicates that the consumer should run a wire protocol command. Details
525 Indicates that the consumer should run a wire protocol command. Details
525 of the command to run are given in the data structure.
526 of the command to run are given in the data structure.
526
527
527 wantframe
528 wantframe
528 Indicates that nothing of interest happened and the server is waiting on
529 Indicates that nothing of interest happened and the server is waiting on
529 more frames from the client before anything interesting can be done.
530 more frames from the client before anything interesting can be done.
530
531
531 noop
532 noop
532 Indicates no additional action is required.
533 Indicates no additional action is required.
533
534
534 Known Issues
535 Known Issues
535 ------------
536 ------------
536
537
537 There are no limits to the number of partially received commands or their
538 There are no limits to the number of partially received commands or their
538 size. A malicious client could stream command request data and exhaust the
539 size. A malicious client could stream command request data and exhaust the
539 server's memory.
540 server's memory.
540
541
541 Partially received commands are not acted upon when end of input is
542 Partially received commands are not acted upon when end of input is
542 reached. Should the server error if it receives a partial request?
543 reached. Should the server error if it receives a partial request?
543 Should the client send a message to abort a partially transmitted request
544 Should the client send a message to abort a partially transmitted request
544 to facilitate graceful shutdown?
545 to facilitate graceful shutdown?
545
546
546 Active requests that haven't been responded to aren't tracked. This means
547 Active requests that haven't been responded to aren't tracked. This means
547 that if we receive a command and instruct its dispatch, another command
548 that if we receive a command and instruct its dispatch, another command
548 with its request ID can come in over the wire and there will be a race
549 with its request ID can come in over the wire and there will be a race
549 between who responds to what.
550 between who responds to what.
550 """
551 """
551
552
552 def __init__(self, deferoutput=False):
553 def __init__(self, deferoutput=False):
553 """Construct a new server reactor.
554 """Construct a new server reactor.
554
555
555 ``deferoutput`` can be used to indicate that no output frames should be
556 ``deferoutput`` can be used to indicate that no output frames should be
556 instructed to be sent until input has been exhausted. In this mode,
557 instructed to be sent until input has been exhausted. In this mode,
557 events that would normally generate output frames (such as a command
558 events that would normally generate output frames (such as a command
558 response being ready) will instead defer instructing the consumer to
559 response being ready) will instead defer instructing the consumer to
559 send those frames. This is useful for half-duplex transports where the
560 send those frames. This is useful for half-duplex transports where the
560 sender cannot receive until all data has been transmitted.
561 sender cannot receive until all data has been transmitted.
561 """
562 """
562 self._deferoutput = deferoutput
563 self._deferoutput = deferoutput
563 self._state = 'idle'
564 self._state = 'idle'
564 self._nextoutgoingstreamid = 2
565 self._nextoutgoingstreamid = 2
565 self._bufferedframegens = []
566 self._bufferedframegens = []
566 # stream id -> stream instance for all active streams from the client.
567 # stream id -> stream instance for all active streams from the client.
567 self._incomingstreams = {}
568 self._incomingstreams = {}
568 self._outgoingstreams = {}
569 self._outgoingstreams = {}
569 # request id -> dict of commands that are actively being received.
570 # request id -> dict of commands that are actively being received.
570 self._receivingcommands = {}
571 self._receivingcommands = {}
571 # Request IDs that have been received and are actively being processed.
572 # Request IDs that have been received and are actively being processed.
572 # Once all output for a request has been sent, it is removed from this
573 # Once all output for a request has been sent, it is removed from this
573 # set.
574 # set.
574 self._activecommands = set()
575 self._activecommands = set()
575
576
576 def onframerecv(self, frame):
577 def onframerecv(self, frame):
577 """Process a frame that has been received off the wire.
578 """Process a frame that has been received off the wire.
578
579
579 Returns a dict with an ``action`` key that details what action,
580 Returns a dict with an ``action`` key that details what action,
580 if any, the consumer should take next.
581 if any, the consumer should take next.
581 """
582 """
582 if not frame.streamid % 2:
583 if not frame.streamid % 2:
583 self._state = 'errored'
584 self._state = 'errored'
584 return self._makeerrorresult(
585 return self._makeerrorresult(
585 _('received frame with even numbered stream ID: %d') %
586 _('received frame with even numbered stream ID: %d') %
586 frame.streamid)
587 frame.streamid)
587
588
588 if frame.streamid not in self._incomingstreams:
589 if frame.streamid not in self._incomingstreams:
589 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
590 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
590 self._state = 'errored'
591 self._state = 'errored'
591 return self._makeerrorresult(
592 return self._makeerrorresult(
592 _('received frame on unknown inactive stream without '
593 _('received frame on unknown inactive stream without '
593 'beginning of stream flag set'))
594 'beginning of stream flag set'))
594
595
595 self._incomingstreams[frame.streamid] = stream(frame.streamid)
596 self._incomingstreams[frame.streamid] = stream(frame.streamid)
596
597
597 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
598 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
598 # TODO handle decoding frames
599 # TODO handle decoding frames
599 self._state = 'errored'
600 self._state = 'errored'
600 raise error.ProgrammingError('support for decoding stream payloads '
601 raise error.ProgrammingError('support for decoding stream payloads '
601 'not yet implemented')
602 'not yet implemented')
602
603
603 if frame.streamflags & STREAM_FLAG_END_STREAM:
604 if frame.streamflags & STREAM_FLAG_END_STREAM:
604 del self._incomingstreams[frame.streamid]
605 del self._incomingstreams[frame.streamid]
605
606
606 handlers = {
607 handlers = {
607 'idle': self._onframeidle,
608 'idle': self._onframeidle,
608 'command-receiving': self._onframecommandreceiving,
609 'command-receiving': self._onframecommandreceiving,
609 'errored': self._onframeerrored,
610 'errored': self._onframeerrored,
610 }
611 }
611
612
612 meth = handlers.get(self._state)
613 meth = handlers.get(self._state)
613 if not meth:
614 if not meth:
614 raise error.ProgrammingError('unhandled state: %s' % self._state)
615 raise error.ProgrammingError('unhandled state: %s' % self._state)
615
616
616 return meth(frame)
617 return meth(frame)
617
618
618 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
619 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
619 """Signal that a bytes response is ready to be sent to the client.
620 """Signal that a bytes response is ready to be sent to the client.
620
621
621 The raw bytes response is passed as an argument.
622 The raw bytes response is passed as an argument.
622 """
623 """
623 ensureserverstream(stream)
624 ensureserverstream(stream)
624
625
625 def sendframes():
626 def sendframes():
626 for frame in createbytesresponseframesfrombytes(stream, requestid,
627 for frame in createbytesresponseframesfrombytes(stream, requestid,
627 data,
628 data,
628 iscbor=iscbor):
629 iscbor=iscbor):
629 yield frame
630 yield frame
630
631
631 self._activecommands.remove(requestid)
632 self._activecommands.remove(requestid)
632
633
633 result = sendframes()
634 result = sendframes()
634
635
635 if self._deferoutput:
636 if self._deferoutput:
636 self._bufferedframegens.append(result)
637 self._bufferedframegens.append(result)
637 return 'noop', {}
638 return 'noop', {}
638 else:
639 else:
639 return 'sendframes', {
640 return 'sendframes', {
640 'framegen': result,
641 'framegen': result,
641 }
642 }
642
643
643 def oninputeof(self):
644 def oninputeof(self):
644 """Signals that end of input has been received.
645 """Signals that end of input has been received.
645
646
646 No more frames will be received. All pending activity should be
647 No more frames will be received. All pending activity should be
647 completed.
648 completed.
648 """
649 """
649 # TODO should we do anything about in-flight commands?
650 # TODO should we do anything about in-flight commands?
650
651
651 if not self._deferoutput or not self._bufferedframegens:
652 if not self._deferoutput or not self._bufferedframegens:
652 return 'noop', {}
653 return 'noop', {}
653
654
654 # If we buffered all our responses, emit those.
655 # If we buffered all our responses, emit those.
655 def makegen():
656 def makegen():
656 for gen in self._bufferedframegens:
657 for gen in self._bufferedframegens:
657 for frame in gen:
658 for frame in gen:
658 yield frame
659 yield frame
659
660
660 return 'sendframes', {
661 return 'sendframes', {
661 'framegen': makegen(),
662 'framegen': makegen(),
662 }
663 }
663
664
664 def onapplicationerror(self, stream, requestid, msg):
665 def onapplicationerror(self, stream, requestid, msg):
665 ensureserverstream(stream)
666 ensureserverstream(stream)
666
667
667 return 'sendframes', {
668 return 'sendframes', {
668 'framegen': createerrorframe(stream, requestid, msg,
669 'framegen': createerrorframe(stream, requestid, msg,
669 application=True),
670 application=True),
670 }
671 }
671
672
672 def makeoutputstream(self):
673 def makeoutputstream(self):
673 """Create a stream to be used for sending data to the client."""
674 """Create a stream to be used for sending data to the client."""
674 streamid = self._nextoutgoingstreamid
675 streamid = self._nextoutgoingstreamid
675 self._nextoutgoingstreamid += 2
676 self._nextoutgoingstreamid += 2
676
677
677 s = stream(streamid)
678 s = stream(streamid)
678 self._outgoingstreams[streamid] = s
679 self._outgoingstreams[streamid] = s
679
680
680 return s
681 return s
681
682
682 def _makeerrorresult(self, msg):
683 def _makeerrorresult(self, msg):
683 return 'error', {
684 return 'error', {
684 'message': msg,
685 'message': msg,
685 }
686 }
686
687
687 def _makeruncommandresult(self, requestid):
688 def _makeruncommandresult(self, requestid):
688 entry = self._receivingcommands[requestid]
689 entry = self._receivingcommands[requestid]
689
690
690 if not entry['requestdone']:
691 if not entry['requestdone']:
691 self._state = 'errored'
692 self._state = 'errored'
692 raise error.ProgrammingError('should not be called without '
693 raise error.ProgrammingError('should not be called without '
693 'requestdone set')
694 'requestdone set')
694
695
695 del self._receivingcommands[requestid]
696 del self._receivingcommands[requestid]
696
697
697 if self._receivingcommands:
698 if self._receivingcommands:
698 self._state = 'command-receiving'
699 self._state = 'command-receiving'
699 else:
700 else:
700 self._state = 'idle'
701 self._state = 'idle'
701
702
702 # Decode the payloads as CBOR.
703 # Decode the payloads as CBOR.
703 entry['payload'].seek(0)
704 entry['payload'].seek(0)
704 request = cbor.load(entry['payload'])
705 request = cbor.load(entry['payload'])
705
706
706 if b'name' not in request:
707 if b'name' not in request:
707 self._state = 'errored'
708 self._state = 'errored'
708 return self._makeerrorresult(
709 return self._makeerrorresult(
709 _('command request missing "name" field'))
710 _('command request missing "name" field'))
710
711
711 if b'args' not in request:
712 if b'args' not in request:
712 request[b'args'] = {}
713 request[b'args'] = {}
713
714
714 assert requestid not in self._activecommands
715 assert requestid not in self._activecommands
715 self._activecommands.add(requestid)
716 self._activecommands.add(requestid)
716
717
717 return 'runcommand', {
718 return 'runcommand', {
718 'requestid': requestid,
719 'requestid': requestid,
719 'command': request[b'name'],
720 'command': request[b'name'],
720 'args': request[b'args'],
721 'args': request[b'args'],
721 'data': entry['data'].getvalue() if entry['data'] else None,
722 'data': entry['data'].getvalue() if entry['data'] else None,
722 }
723 }
723
724
724 def _makewantframeresult(self):
725 def _makewantframeresult(self):
725 return 'wantframe', {
726 return 'wantframe', {
726 'state': self._state,
727 'state': self._state,
727 }
728 }
728
729
729 def _validatecommandrequestframe(self, frame):
730 def _validatecommandrequestframe(self, frame):
730 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
731 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
731 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
732 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
732
733
733 if new and continuation:
734 if new and continuation:
734 self._state = 'errored'
735 self._state = 'errored'
735 return self._makeerrorresult(
736 return self._makeerrorresult(
736 _('received command request frame with both new and '
737 _('received command request frame with both new and '
737 'continuation flags set'))
738 'continuation flags set'))
738
739
739 if not new and not continuation:
740 if not new and not continuation:
740 self._state = 'errored'
741 self._state = 'errored'
741 return self._makeerrorresult(
742 return self._makeerrorresult(
742 _('received command request frame with neither new nor '
743 _('received command request frame with neither new nor '
743 'continuation flags set'))
744 'continuation flags set'))
744
745
745 def _onframeidle(self, frame):
746 def _onframeidle(self, frame):
746 # The only frame type that should be received in this state is a
747 # The only frame type that should be received in this state is a
747 # command request.
748 # command request.
748 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
749 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
749 self._state = 'errored'
750 self._state = 'errored'
750 return self._makeerrorresult(
751 return self._makeerrorresult(
751 _('expected command request frame; got %d') % frame.typeid)
752 _('expected command request frame; got %d') % frame.typeid)
752
753
753 res = self._validatecommandrequestframe(frame)
754 res = self._validatecommandrequestframe(frame)
754 if res:
755 if res:
755 return res
756 return res
756
757
757 if frame.requestid in self._receivingcommands:
758 if frame.requestid in self._receivingcommands:
758 self._state = 'errored'
759 self._state = 'errored'
759 return self._makeerrorresult(
760 return self._makeerrorresult(
760 _('request with ID %d already received') % frame.requestid)
761 _('request with ID %d already received') % frame.requestid)
761
762
762 if frame.requestid in self._activecommands:
763 if frame.requestid in self._activecommands:
763 self._state = 'errored'
764 self._state = 'errored'
764 return self._makeerrorresult(
765 return self._makeerrorresult(
765 _('request with ID %d is already active') % frame.requestid)
766 _('request with ID %d is already active') % frame.requestid)
766
767
767 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
768 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
768 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
769 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
769 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
770 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
770
771
771 if not new:
772 if not new:
772 self._state = 'errored'
773 self._state = 'errored'
773 return self._makeerrorresult(
774 return self._makeerrorresult(
774 _('received command request frame without new flag set'))
775 _('received command request frame without new flag set'))
775
776
776 payload = util.bytesio()
777 payload = util.bytesio()
777 payload.write(frame.payload)
778 payload.write(frame.payload)
778
779
779 self._receivingcommands[frame.requestid] = {
780 self._receivingcommands[frame.requestid] = {
780 'payload': payload,
781 'payload': payload,
781 'data': None,
782 'data': None,
782 'requestdone': not moreframes,
783 'requestdone': not moreframes,
783 'expectingdata': bool(expectingdata),
784 'expectingdata': bool(expectingdata),
784 }
785 }
785
786
786 # This is the final frame for this request. Dispatch it.
787 # This is the final frame for this request. Dispatch it.
787 if not moreframes and not expectingdata:
788 if not moreframes and not expectingdata:
788 return self._makeruncommandresult(frame.requestid)
789 return self._makeruncommandresult(frame.requestid)
789
790
790 assert moreframes or expectingdata
791 assert moreframes or expectingdata
791 self._state = 'command-receiving'
792 self._state = 'command-receiving'
792 return self._makewantframeresult()
793 return self._makewantframeresult()
793
794
794 def _onframecommandreceiving(self, frame):
795 def _onframecommandreceiving(self, frame):
795 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
796 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
796 # Process new command requests as such.
797 # Process new command requests as such.
797 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
798 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
798 return self._onframeidle(frame)
799 return self._onframeidle(frame)
799
800
800 res = self._validatecommandrequestframe(frame)
801 res = self._validatecommandrequestframe(frame)
801 if res:
802 if res:
802 return res
803 return res
803
804
804 # All other frames should be related to a command that is currently
805 # All other frames should be related to a command that is currently
805 # receiving but is not active.
806 # receiving but is not active.
806 if frame.requestid in self._activecommands:
807 if frame.requestid in self._activecommands:
807 self._state = 'errored'
808 self._state = 'errored'
808 return self._makeerrorresult(
809 return self._makeerrorresult(
809 _('received frame for request that is still active: %d') %
810 _('received frame for request that is still active: %d') %
810 frame.requestid)
811 frame.requestid)
811
812
812 if frame.requestid not in self._receivingcommands:
813 if frame.requestid not in self._receivingcommands:
813 self._state = 'errored'
814 self._state = 'errored'
814 return self._makeerrorresult(
815 return self._makeerrorresult(
815 _('received frame for request that is not receiving: %d') %
816 _('received frame for request that is not receiving: %d') %
816 frame.requestid)
817 frame.requestid)
817
818
818 entry = self._receivingcommands[frame.requestid]
819 entry = self._receivingcommands[frame.requestid]
819
820
820 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
821 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
821 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
822 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
822 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
823 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
823
824
824 if entry['requestdone']:
825 if entry['requestdone']:
825 self._state = 'errored'
826 self._state = 'errored'
826 return self._makeerrorresult(
827 return self._makeerrorresult(
827 _('received command request frame when request frames '
828 _('received command request frame when request frames '
828 'were supposedly done'))
829 'were supposedly done'))
829
830
830 if expectingdata != entry['expectingdata']:
831 if expectingdata != entry['expectingdata']:
831 self._state = 'errored'
832 self._state = 'errored'
832 return self._makeerrorresult(
833 return self._makeerrorresult(
833 _('mismatch between expect data flag and previous frame'))
834 _('mismatch between expect data flag and previous frame'))
834
835
835 entry['payload'].write(frame.payload)
836 entry['payload'].write(frame.payload)
836
837
837 if not moreframes:
838 if not moreframes:
838 entry['requestdone'] = True
839 entry['requestdone'] = True
839
840
840 if not moreframes and not expectingdata:
841 if not moreframes and not expectingdata:
841 return self._makeruncommandresult(frame.requestid)
842 return self._makeruncommandresult(frame.requestid)
842
843
843 return self._makewantframeresult()
844 return self._makewantframeresult()
844
845
845 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
846 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
846 if not entry['expectingdata']:
847 if not entry['expectingdata']:
847 self._state = 'errored'
848 self._state = 'errored'
848 return self._makeerrorresult(_(
849 return self._makeerrorresult(_(
849 'received command data frame for request that is not '
850 'received command data frame for request that is not '
850 'expecting data: %d') % frame.requestid)
851 'expecting data: %d') % frame.requestid)
851
852
852 if entry['data'] is None:
853 if entry['data'] is None:
853 entry['data'] = util.bytesio()
854 entry['data'] = util.bytesio()
854
855
855 return self._handlecommanddataframe(frame, entry)
856 return self._handlecommanddataframe(frame, entry)
856 else:
857 else:
857 self._state = 'errored'
858 self._state = 'errored'
858 return self._makeerrorresult(_(
859 return self._makeerrorresult(_(
859 'received unexpected frame type: %d') % frame.typeid)
860 'received unexpected frame type: %d') % frame.typeid)
860
861
861 def _handlecommanddataframe(self, frame, entry):
862 def _handlecommanddataframe(self, frame, entry):
862 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
863 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
863
864
864 # TODO support streaming data instead of buffering it.
865 # TODO support streaming data instead of buffering it.
865 entry['data'].write(frame.payload)
866 entry['data'].write(frame.payload)
866
867
867 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
868 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
868 return self._makewantframeresult()
869 return self._makewantframeresult()
869 elif frame.flags & FLAG_COMMAND_DATA_EOS:
870 elif frame.flags & FLAG_COMMAND_DATA_EOS:
870 entry['data'].seek(0)
871 entry['data'].seek(0)
871 return self._makeruncommandresult(frame.requestid)
872 return self._makeruncommandresult(frame.requestid)
872 else:
873 else:
873 self._state = 'errored'
874 self._state = 'errored'
874 return self._makeerrorresult(_('command data frame without '
875 return self._makeerrorresult(_('command data frame without '
875 'flags'))
876 'flags'))
876
877
877 def _onframeerrored(self, frame):
878 def _onframeerrored(self, frame):
878 return self._makeerrorresult(_('server already errored'))
879 return self._makeerrorresult(_('server already errored'))
880
881 class commandrequest(object):
882 """Represents a request to run a command."""
883
884 def __init__(self, requestid, name, args, datafh=None):
885 self.requestid = requestid
886 self.name = name
887 self.args = args
888 self.datafh = datafh
889 self.state = 'pending'
890
891 class clientreactor(object):
892 """Holds state of a client issuing frame-based protocol requests.
893
894 This is like ``serverreactor`` but for client-side state.
895
896 Each instance is bound to the lifetime of a connection. For persistent
897 connection transports using e.g. TCP sockets and speaking the raw
898 framing protocol, there will be a single instance for the lifetime of
899 the TCP socket. For transports where there are multiple discrete
900 interactions (say tunneled within in HTTP request), there will be a
901 separate instance for each distinct interaction.
902 """
903 def __init__(self, hasmultiplesend=False, buffersends=True):
904 """Create a new instance.
905
906 ``hasmultiplesend`` indicates whether multiple sends are supported
907 by the transport. When True, it is possible to send commands immediately
908 instead of buffering until the caller signals an intent to finish a
909 send operation.
910
911 ``buffercommands`` indicates whether sends should be buffered until the
912 last request has been issued.
913 """
914 self._hasmultiplesend = hasmultiplesend
915 self._buffersends = buffersends
916
917 self._canissuecommands = True
918 self._cansend = True
919
920 self._nextrequestid = 1
921 # We only support a single outgoing stream for now.
922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque()
924 self._activerequests = {}
925
926 def callcommand(self, name, args, datafh=None):
927 """Request that a command be executed.
928
929 Receives the command name, a dict of arguments to pass to the command,
930 and an optional file object containing the raw data for the command.
931
932 Returns a 3-tuple of (request, action, action data).
933 """
934 if not self._canissuecommands:
935 raise error.ProgrammingError('cannot issue new commands')
936
937 requestid = self._nextrequestid
938 self._nextrequestid += 2
939
940 request = commandrequest(requestid, name, args, datafh=datafh)
941
942 if self._buffersends:
943 self._pendingrequests.append(request)
944 return request, 'noop', {}
945 else:
946 if not self._cansend:
947 raise error.ProgrammingError('sends cannot be performed on '
948 'this instance')
949
950 if not self._hasmultiplesend:
951 self._cansend = False
952 self._canissuecommands = False
953
954 return request, 'sendframes', {
955 'framegen': self._makecommandframes(request),
956 }
957
958 def flushcommands(self):
959 """Request that all queued commands be sent.
960
961 If any commands are buffered, this will instruct the caller to send
962 them over the wire. If no commands are buffered it instructs the client
963 to no-op.
964
965 If instances aren't configured for multiple sends, no new command
966 requests are allowed after this is called.
967 """
968 if not self._pendingrequests:
969 return 'noop', {}
970
971 if not self._cansend:
972 raise error.ProgrammingError('sends cannot be performed on this '
973 'instance')
974
975 # If the instance only allows sending once, mark that we have fired
976 # our one shot.
977 if not self._hasmultiplesend:
978 self._canissuecommands = False
979 self._cansend = False
980
981 def makeframes():
982 while self._pendingrequests:
983 request = self._pendingrequests.popleft()
984 for frame in self._makecommandframes(request):
985 yield frame
986
987 return 'sendframes', {
988 'framegen': makeframes(),
989 }
990
991 def _makecommandframes(self, request):
992 """Emit frames to issue a command request.
993
994 As a side-effect, update request accounting to reflect its changed
995 state.
996 """
997 self._activerequests[request.requestid] = request
998 request.state = 'sending'
999
1000 res = createcommandframes(self._outgoingstream,
1001 request.requestid,
1002 request.name,
1003 request.args,
1004 request.datafh)
1005
1006 for frame in res:
1007 yield frame
1008
1009 request.state = 'sent'
General Comments 0
You need to be logged in to leave comments. Login now