##// END OF EJS Templates
wireproto: introduce a reactor for client-side state...
Gregory Szorc -
r37561:01361be9 default
parent child Browse files
Show More
@@ -0,0 +1,66 b''
1 from __future__ import absolute_import
2
3 import unittest
4
5 from mercurial import (
6 error,
7 wireprotoframing as framing,
8 )
9
10 class SingleSendTests(unittest.TestCase):
11 """A reactor that can only send once rejects subsequent sends."""
12 def testbasic(self):
13 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
14
15 request, action, meta = reactor.callcommand(b'foo', {})
16 self.assertEqual(request.state, 'pending')
17 self.assertEqual(action, 'noop')
18
19 action, meta = reactor.flushcommands()
20 self.assertEqual(action, 'sendframes')
21
22 for frame in meta['framegen']:
23 self.assertEqual(request.state, 'sending')
24
25 self.assertEqual(request.state, 'sent')
26
27 with self.assertRaisesRegexp(error.ProgrammingError,
28 'cannot issue new commands'):
29 reactor.callcommand(b'foo', {})
30
31 with self.assertRaisesRegexp(error.ProgrammingError,
32 'cannot issue new commands'):
33 reactor.callcommand(b'foo', {})
34
35 class NoBufferTests(unittest.TestCase):
36 """A reactor without send buffering sends requests immediately."""
37 def testbasic(self):
38 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
39
40 request, action, meta = reactor.callcommand(b'command1', {})
41 self.assertEqual(request.requestid, 1)
42 self.assertEqual(action, 'sendframes')
43
44 self.assertEqual(request.state, 'pending')
45
46 for frame in meta['framegen']:
47 self.assertEqual(request.state, 'sending')
48
49 self.assertEqual(request.state, 'sent')
50
51 action, meta = reactor.flushcommands()
52 self.assertEqual(action, 'noop')
53
54 # And we can send another command.
55 request, action, meta = reactor.callcommand(b'command2', {})
56 self.assertEqual(request.requestid, 3)
57 self.assertEqual(action, 'sendframes')
58
59 for frame in meta['framegen']:
60 self.assertEqual(request.state, 'sending')
61
62 self.assertEqual(request.state, 'sent')
63
64 if __name__ == '__main__':
65 import silenttestrunner
66 silenttestrunner.main(__name__)
@@ -1,596 +1,601 b''
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import tempfile
17 17
18 18 from .i18n import _
19 19 from .thirdparty import (
20 20 cbor,
21 21 )
22 22 from . import (
23 23 bundle2,
24 24 error,
25 25 httpconnection,
26 26 pycompat,
27 27 statichttprepo,
28 28 url as urlmod,
29 29 util,
30 30 wireproto,
31 31 wireprotoframing,
32 32 wireprotoserver,
33 33 )
34 34
35 35 httplib = util.httplib
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 def encodevalueinheaders(value, header, limit):
40 40 """Encode a string value into multiple HTTP headers.
41 41
42 42 ``value`` will be encoded into 1 or more HTTP headers with the names
43 43 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
44 44 name + value will be at most ``limit`` bytes long.
45 45
46 46 Returns an iterable of 2-tuples consisting of header names and
47 47 values as native strings.
48 48 """
49 49 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
50 50 # not bytes. This function always takes bytes in as arguments.
51 51 fmt = pycompat.strurl(header) + r'-%s'
52 52 # Note: it is *NOT* a bug that the last bit here is a bytestring
53 53 # and not a unicode: we're just getting the encoded length anyway,
54 54 # and using an r-string to make it portable between Python 2 and 3
55 55 # doesn't work because then the \r is a literal backslash-r
56 56 # instead of a carriage return.
57 57 valuelen = limit - len(fmt % r'000') - len(': \r\n')
58 58 result = []
59 59
60 60 n = 0
61 61 for i in xrange(0, len(value), valuelen):
62 62 n += 1
63 63 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
64 64
65 65 return result
66 66
67 67 def _wraphttpresponse(resp):
68 68 """Wrap an HTTPResponse with common error handlers.
69 69
70 70 This ensures that any I/O from any consumer raises the appropriate
71 71 error and messaging.
72 72 """
73 73 origread = resp.read
74 74
75 75 class readerproxy(resp.__class__):
76 76 def read(self, size=None):
77 77 try:
78 78 return origread(size)
79 79 except httplib.IncompleteRead as e:
80 80 # e.expected is an integer if length known or None otherwise.
81 81 if e.expected:
82 82 msg = _('HTTP request error (incomplete response; '
83 83 'expected %d bytes got %d)') % (e.expected,
84 84 len(e.partial))
85 85 else:
86 86 msg = _('HTTP request error (incomplete response)')
87 87
88 88 raise error.PeerTransportError(
89 89 msg,
90 90 hint=_('this may be an intermittent network failure; '
91 91 'if the error persists, consider contacting the '
92 92 'network or server operator'))
93 93 except httplib.HTTPException as e:
94 94 raise error.PeerTransportError(
95 95 _('HTTP request error (%s)') % e,
96 96 hint=_('this may be an intermittent network failure; '
97 97 'if the error persists, consider contacting the '
98 98 'network or server operator'))
99 99
100 100 resp.__class__ = readerproxy
101 101
102 102 class _multifile(object):
103 103 def __init__(self, *fileobjs):
104 104 for f in fileobjs:
105 105 if not util.safehasattr(f, 'length'):
106 106 raise ValueError(
107 107 '_multifile only supports file objects that '
108 108 'have a length but this one does not:', type(f), f)
109 109 self._fileobjs = fileobjs
110 110 self._index = 0
111 111
112 112 @property
113 113 def length(self):
114 114 return sum(f.length for f in self._fileobjs)
115 115
116 116 def read(self, amt=None):
117 117 if amt <= 0:
118 118 return ''.join(f.read() for f in self._fileobjs)
119 119 parts = []
120 120 while amt and self._index < len(self._fileobjs):
121 121 parts.append(self._fileobjs[self._index].read(amt))
122 122 got = len(parts[-1])
123 123 if got < amt:
124 124 self._index += 1
125 125 amt -= got
126 126 return ''.join(parts)
127 127
128 128 def seek(self, offset, whence=os.SEEK_SET):
129 129 if whence != os.SEEK_SET:
130 130 raise NotImplementedError(
131 131 '_multifile does not support anything other'
132 132 ' than os.SEEK_SET for whence on seek()')
133 133 if offset != 0:
134 134 raise NotImplementedError(
135 135 '_multifile only supports seeking to start, but that '
136 136 'could be fixed if you need it')
137 137 for f in self._fileobjs:
138 138 f.seek(0)
139 139 self._index = 0
140 140
141 141 class httppeer(wireproto.wirepeer):
142 142 def __init__(self, ui, path, url, opener):
143 143 self.ui = ui
144 144 self._path = path
145 145 self._url = url
146 146 self._caps = None
147 147 self._urlopener = opener
148 148 # This is an its own attribute to facilitate extensions overriding
149 149 # the default type.
150 150 self._requestbuilder = urlreq.request
151 151
152 152 def __del__(self):
153 153 for h in self._urlopener.handlers:
154 154 h.close()
155 155 getattr(h, "close_all", lambda: None)()
156 156
157 157 def _openurl(self, req):
158 158 if (self.ui.debugflag
159 159 and self.ui.configbool('devel', 'debug.peer-request')):
160 160 dbg = self.ui.debug
161 161 line = 'devel-peer-request: %s\n'
162 162 dbg(line % '%s %s' % (req.get_method(), req.get_full_url()))
163 163 hgargssize = None
164 164
165 165 for header, value in sorted(req.header_items()):
166 166 if header.startswith('X-hgarg-'):
167 167 if hgargssize is None:
168 168 hgargssize = 0
169 169 hgargssize += len(value)
170 170 else:
171 171 dbg(line % ' %s %s' % (header, value))
172 172
173 173 if hgargssize is not None:
174 174 dbg(line % ' %d bytes of commands arguments in headers'
175 175 % hgargssize)
176 176
177 177 if req.has_data():
178 178 data = req.get_data()
179 179 length = getattr(data, 'length', None)
180 180 if length is None:
181 181 length = len(data)
182 182 dbg(line % ' %d bytes of data' % length)
183 183
184 184 start = util.timer()
185 185
186 186 ret = self._urlopener.open(req)
187 187 if self.ui.configbool('devel', 'debug.peer-request'):
188 188 dbg(line % ' finished in %.4f seconds (%s)'
189 189 % (util.timer() - start, ret.code))
190 190 return ret
191 191
192 192 # Begin of ipeerconnection interface.
193 193
194 194 def url(self):
195 195 return self._path
196 196
197 197 def local(self):
198 198 return None
199 199
200 200 def peer(self):
201 201 return self
202 202
203 203 def canpush(self):
204 204 return True
205 205
206 206 def close(self):
207 207 pass
208 208
209 209 # End of ipeerconnection interface.
210 210
211 211 # Begin of ipeercommands interface.
212 212
213 213 def capabilities(self):
214 214 # self._fetchcaps() should have been called as part of peer
215 215 # handshake. So self._caps should always be set.
216 216 assert self._caps is not None
217 217 return self._caps
218 218
219 219 # End of ipeercommands interface.
220 220
221 221 # look up capabilities only when needed
222 222
223 223 def _fetchcaps(self):
224 224 self._caps = set(self._call('capabilities').split())
225 225
226 226 def _callstream(self, cmd, _compressible=False, **args):
227 227 args = pycompat.byteskwargs(args)
228 228 if cmd == 'pushkey':
229 229 args['data'] = ''
230 230 data = args.pop('data', None)
231 231 headers = args.pop('headers', {})
232 232
233 233 self.ui.debug("sending %s command\n" % cmd)
234 234 q = [('cmd', cmd)]
235 235 headersize = 0
236 236 varyheaders = []
237 237 # Important: don't use self.capable() here or else you end up
238 238 # with infinite recursion when trying to look up capabilities
239 239 # for the first time.
240 240 postargsok = self._caps is not None and 'httppostargs' in self._caps
241 241
242 242 # Send arguments via POST.
243 243 if postargsok and args:
244 244 strargs = urlreq.urlencode(sorted(args.items()))
245 245 if not data:
246 246 data = strargs
247 247 else:
248 248 if isinstance(data, bytes):
249 249 i = io.BytesIO(data)
250 250 i.length = len(data)
251 251 data = i
252 252 argsio = io.BytesIO(strargs)
253 253 argsio.length = len(strargs)
254 254 data = _multifile(argsio, data)
255 255 headers[r'X-HgArgs-Post'] = len(strargs)
256 256 elif args:
257 257 # Calling self.capable() can infinite loop if we are calling
258 258 # "capabilities". But that command should never accept wire
259 259 # protocol arguments. So this should never happen.
260 260 assert cmd != 'capabilities'
261 261 httpheader = self.capable('httpheader')
262 262 if httpheader:
263 263 headersize = int(httpheader.split(',', 1)[0])
264 264
265 265 # Send arguments via HTTP headers.
266 266 if headersize > 0:
267 267 # The headers can typically carry more data than the URL.
268 268 encargs = urlreq.urlencode(sorted(args.items()))
269 269 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
270 270 headersize):
271 271 headers[header] = value
272 272 varyheaders.append(header)
273 273 # Send arguments via query string (Mercurial <1.9).
274 274 else:
275 275 q += sorted(args.items())
276 276
277 277 qs = '?%s' % urlreq.urlencode(q)
278 278 cu = "%s%s" % (self._url, qs)
279 279 size = 0
280 280 if util.safehasattr(data, 'length'):
281 281 size = data.length
282 282 elif data is not None:
283 283 size = len(data)
284 284 if data is not None and r'Content-Type' not in headers:
285 285 headers[r'Content-Type'] = r'application/mercurial-0.1'
286 286
287 287 # Tell the server we accept application/mercurial-0.2 and multiple
288 288 # compression formats if the server is capable of emitting those
289 289 # payloads.
290 290 protoparams = {'partial-pull'}
291 291
292 292 mediatypes = set()
293 293 if self._caps is not None:
294 294 mt = self.capable('httpmediatype')
295 295 if mt:
296 296 protoparams.add('0.1')
297 297 mediatypes = set(mt.split(','))
298 298
299 299 if '0.2tx' in mediatypes:
300 300 protoparams.add('0.2')
301 301
302 302 if '0.2tx' in mediatypes and self.capable('compression'):
303 303 # We /could/ compare supported compression formats and prune
304 304 # non-mutually supported or error if nothing is mutually supported.
305 305 # For now, send the full list to the server and have it error.
306 306 comps = [e.wireprotosupport().name for e in
307 307 util.compengines.supportedwireengines(util.CLIENTROLE)]
308 308 protoparams.add('comp=%s' % ','.join(comps))
309 309
310 310 if protoparams:
311 311 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
312 312 'X-HgProto',
313 313 headersize or 1024)
314 314 for header, value in protoheaders:
315 315 headers[header] = value
316 316 varyheaders.append(header)
317 317
318 318 if varyheaders:
319 319 headers[r'Vary'] = r','.join(varyheaders)
320 320
321 321 req = self._requestbuilder(pycompat.strurl(cu), data, headers)
322 322
323 323 if data is not None:
324 324 self.ui.debug("sending %d bytes\n" % size)
325 325 req.add_unredirected_header(r'Content-Length', r'%d' % size)
326 326 try:
327 327 resp = self._openurl(req)
328 328 except urlerr.httperror as inst:
329 329 if inst.code == 401:
330 330 raise error.Abort(_('authorization failed'))
331 331 raise
332 332 except httplib.HTTPException as inst:
333 333 self.ui.debug('http error while sending %s command\n' % cmd)
334 334 self.ui.traceback()
335 335 raise IOError(None, inst)
336 336
337 337 # Insert error handlers for common I/O failures.
338 338 _wraphttpresponse(resp)
339 339
340 340 # record the url we got redirected to
341 341 resp_url = pycompat.bytesurl(resp.geturl())
342 342 if resp_url.endswith(qs):
343 343 resp_url = resp_url[:-len(qs)]
344 344 if self._url.rstrip('/') != resp_url.rstrip('/'):
345 345 if not self.ui.quiet:
346 346 self.ui.warn(_('real URL is %s\n') % resp_url)
347 347 self._url = resp_url
348 348 try:
349 349 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
350 350 except AttributeError:
351 351 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
352 352
353 353 safeurl = util.hidepassword(self._url)
354 354 if proto.startswith('application/hg-error'):
355 355 raise error.OutOfBandError(resp.read())
356 356 # accept old "text/plain" and "application/hg-changegroup" for now
357 357 if not (proto.startswith('application/mercurial-') or
358 358 (proto.startswith('text/plain')
359 359 and not resp.headers.get('content-length')) or
360 360 proto.startswith('application/hg-changegroup')):
361 361 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
362 362 raise error.RepoError(
363 363 _("'%s' does not appear to be an hg repository:\n"
364 364 "---%%<--- (%s)\n%s\n---%%<---\n")
365 365 % (safeurl, proto or 'no content-type', resp.read(1024)))
366 366
367 367 if proto.startswith('application/mercurial-'):
368 368 try:
369 369 version = proto.split('-', 1)[1]
370 370 version_info = tuple([int(n) for n in version.split('.')])
371 371 except ValueError:
372 372 raise error.RepoError(_("'%s' sent a broken Content-Type "
373 373 "header (%s)") % (safeurl, proto))
374 374
375 375 # TODO consider switching to a decompression reader that uses
376 376 # generators.
377 377 if version_info == (0, 1):
378 378 if _compressible:
379 379 return util.compengines['zlib'].decompressorreader(resp)
380 380 return resp
381 381 elif version_info == (0, 2):
382 382 # application/mercurial-0.2 always identifies the compression
383 383 # engine in the payload header.
384 384 elen = struct.unpack('B', resp.read(1))[0]
385 385 ename = resp.read(elen)
386 386 engine = util.compengines.forwiretype(ename)
387 387 return engine.decompressorreader(resp)
388 388 else:
389 389 raise error.RepoError(_("'%s' uses newer protocol %s") %
390 390 (safeurl, version))
391 391
392 392 if _compressible:
393 393 return util.compengines['zlib'].decompressorreader(resp)
394 394
395 395 return resp
396 396
397 397 def _call(self, cmd, **args):
398 398 fp = self._callstream(cmd, **args)
399 399 try:
400 400 return fp.read()
401 401 finally:
402 402 # if using keepalive, allow connection to be reused
403 403 fp.close()
404 404
405 405 def _callpush(self, cmd, cg, **args):
406 406 # have to stream bundle to a temp file because we do not have
407 407 # http 1.1 chunked transfer.
408 408
409 409 types = self.capable('unbundle')
410 410 try:
411 411 types = types.split(',')
412 412 except AttributeError:
413 413 # servers older than d1b16a746db6 will send 'unbundle' as a
414 414 # boolean capability. They only support headerless/uncompressed
415 415 # bundles.
416 416 types = [""]
417 417 for x in types:
418 418 if x in bundle2.bundletypes:
419 419 type = x
420 420 break
421 421
422 422 tempname = bundle2.writebundle(self.ui, cg, None, type)
423 423 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
424 424 headers = {r'Content-Type': r'application/mercurial-0.1'}
425 425
426 426 try:
427 427 r = self._call(cmd, data=fp, headers=headers, **args)
428 428 vals = r.split('\n', 1)
429 429 if len(vals) < 2:
430 430 raise error.ResponseError(_("unexpected response:"), r)
431 431 return vals
432 432 except urlerr.httperror:
433 433 # Catch and re-raise these so we don't try and treat them
434 434 # like generic socket errors. They lack any values in
435 435 # .args on Python 3 which breaks our socket.error block.
436 436 raise
437 437 except socket.error as err:
438 438 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
439 439 raise error.Abort(_('push failed: %s') % err.args[1])
440 440 raise error.Abort(err.args[1])
441 441 finally:
442 442 fp.close()
443 443 os.unlink(tempname)
444 444
445 445 def _calltwowaystream(self, cmd, fp, **args):
446 446 fh = None
447 447 fp_ = None
448 448 filename = None
449 449 try:
450 450 # dump bundle to disk
451 451 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
452 452 fh = os.fdopen(fd, r"wb")
453 453 d = fp.read(4096)
454 454 while d:
455 455 fh.write(d)
456 456 d = fp.read(4096)
457 457 fh.close()
458 458 # start http push
459 459 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
460 460 headers = {r'Content-Type': r'application/mercurial-0.1'}
461 461 return self._callstream(cmd, data=fp_, headers=headers, **args)
462 462 finally:
463 463 if fp_ is not None:
464 464 fp_.close()
465 465 if fh is not None:
466 466 fh.close()
467 467 os.unlink(filename)
468 468
469 469 def _callcompressable(self, cmd, **args):
470 470 return self._callstream(cmd, _compressible=True, **args)
471 471
472 472 def _abort(self, exception):
473 473 raise exception
474 474
475 475 # TODO implement interface for version 2 peers
476 476 class httpv2peer(object):
477 477 def __init__(self, ui, repourl, opener):
478 478 self.ui = ui
479 479
480 480 if repourl.endswith('/'):
481 481 repourl = repourl[:-1]
482 482
483 483 self.url = repourl
484 484 self._opener = opener
485 485 # This is an its own attribute to facilitate extensions overriding
486 486 # the default type.
487 487 self._requestbuilder = urlreq.request
488 488
489 489 def close(self):
490 490 pass
491 491
492 492 # TODO require to be part of a batched primitive, use futures.
493 493 def _call(self, name, **args):
494 494 """Call a wire protocol command with arguments."""
495 495
496 496 # TODO permissions should come from capabilities results.
497 497 permission = wireproto.commandsv2[name].permission
498 498 if permission not in ('push', 'pull'):
499 499 raise error.ProgrammingError('unknown permission type: %s' %
500 500 permission)
501 501
502 502 permission = {
503 503 'push': 'rw',
504 504 'pull': 'ro',
505 505 }[permission]
506 506
507 507 url = '%s/api/%s/%s/%s' % (self.url, wireprotoserver.HTTPV2, permission,
508 508 name)
509 509
510 510 # TODO modify user-agent to reflect v2.
511 511 headers = {
512 512 r'Accept': wireprotoserver.FRAMINGTYPE,
513 513 r'Content-Type': wireprotoserver.FRAMINGTYPE,
514 514 }
515 515
516 516 # TODO this should be part of a generic peer for the frame-based
517 517 # protocol.
518 stream = wireprotoframing.stream(1)
519 frames = wireprotoframing.createcommandframes(stream, 1,
520 name, args)
518 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
519 buffersends=True)
521 520
522 body = b''.join(map(bytes, frames))
521 request, action, meta = reactor.callcommand(name, args)
522 assert action == 'noop'
523
524 action, meta = reactor.flushcommands()
525 assert action == 'sendframes'
526
527 body = b''.join(map(bytes, meta['framegen']))
523 528 req = self._requestbuilder(pycompat.strurl(url), body, headers)
524 529 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
525 530
526 531 # TODO unify this code with httppeer.
527 532 try:
528 533 res = self._opener.open(req)
529 534 except urlerr.httperror as e:
530 535 if e.code == 401:
531 536 raise error.Abort(_('authorization failed'))
532 537
533 538 raise
534 539 except httplib.HTTPException as e:
535 540 self.ui.traceback()
536 541 raise IOError(None, e)
537 542
538 543 # TODO validate response type, wrap response to handle I/O errors.
539 544 # TODO more robust frame receiver.
540 545 results = []
541 546
542 547 while True:
543 548 frame = wireprotoframing.readframe(res)
544 549 if frame is None:
545 550 break
546 551
547 552 self.ui.note(_('received %r\n') % frame)
548 553
549 554 if frame.typeid == wireprotoframing.FRAME_TYPE_BYTES_RESPONSE:
550 555 if frame.flags & wireprotoframing.FLAG_BYTES_RESPONSE_CBOR:
551 556 payload = util.bytesio(frame.payload)
552 557
553 558 decoder = cbor.CBORDecoder(payload)
554 559 while payload.tell() + 1 < len(frame.payload):
555 560 results.append(decoder.decode())
556 561 else:
557 562 results.append(frame.payload)
558 563 else:
559 564 error.ProgrammingError('unhandled frame type: %d' %
560 565 frame.typeid)
561 566
562 567 return results
563 568
564 569 def makepeer(ui, path):
565 570 u = util.url(path)
566 571 if u.query or u.fragment:
567 572 raise error.Abort(_('unsupported URL component: "%s"') %
568 573 (u.query or u.fragment))
569 574
570 575 # urllib cannot handle URLs with embedded user or passwd.
571 576 url, authinfo = u.authinfo()
572 577 ui.debug('using %s\n' % url)
573 578
574 579 opener = urlmod.opener(ui, authinfo)
575 580
576 581 return httppeer(ui, path, url, opener)
577 582
578 583 def instance(ui, path, create):
579 584 if create:
580 585 raise error.Abort(_('cannot create new http repository'))
581 586 try:
582 587 if path.startswith('https:') and not urlmod.has_https:
583 588 raise error.Abort(_('Python support for SSL and HTTPS '
584 589 'is not installed'))
585 590
586 591 inst = makepeer(ui, path)
587 592 inst._fetchcaps()
588 593
589 594 return inst
590 595 except error.RepoError as httpexception:
591 596 try:
592 597 r = statichttprepo.instance(ui, "static-" + path, create)
593 598 ui.note(_('(falling back to static-http)\n'))
594 599 return r
595 600 except error.RepoError:
596 601 raise httpexception # use the original http RepoError instead
@@ -1,878 +1,1009 b''
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 import collections
14 15 import struct
15 16
16 17 from .i18n import _
17 18 from .thirdparty import (
18 19 attr,
19 20 cbor,
20 21 )
21 22 from . import (
22 23 encoding,
23 24 error,
24 25 util,
25 26 )
26 27 from .utils import (
27 28 stringutil,
28 29 )
29 30
30 31 FRAME_HEADER_SIZE = 8
31 32 DEFAULT_MAX_FRAME_SIZE = 32768
32 33
33 34 STREAM_FLAG_BEGIN_STREAM = 0x01
34 35 STREAM_FLAG_END_STREAM = 0x02
35 36 STREAM_FLAG_ENCODING_APPLIED = 0x04
36 37
37 38 STREAM_FLAGS = {
38 39 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
39 40 b'stream-end': STREAM_FLAG_END_STREAM,
40 41 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
41 42 }
42 43
43 44 FRAME_TYPE_COMMAND_REQUEST = 0x01
44 45 FRAME_TYPE_COMMAND_DATA = 0x03
45 46 FRAME_TYPE_BYTES_RESPONSE = 0x04
46 47 FRAME_TYPE_ERROR_RESPONSE = 0x05
47 48 FRAME_TYPE_TEXT_OUTPUT = 0x06
48 49 FRAME_TYPE_PROGRESS = 0x07
49 50 FRAME_TYPE_STREAM_SETTINGS = 0x08
50 51
51 52 FRAME_TYPES = {
52 53 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
53 54 b'command-data': FRAME_TYPE_COMMAND_DATA,
54 55 b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
55 56 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
56 57 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
57 58 b'progress': FRAME_TYPE_PROGRESS,
58 59 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
59 60 }
60 61
61 62 FLAG_COMMAND_REQUEST_NEW = 0x01
62 63 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
63 64 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
64 65 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
65 66
66 67 FLAGS_COMMAND_REQUEST = {
67 68 b'new': FLAG_COMMAND_REQUEST_NEW,
68 69 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
69 70 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
70 71 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
71 72 }
72 73
73 74 FLAG_COMMAND_DATA_CONTINUATION = 0x01
74 75 FLAG_COMMAND_DATA_EOS = 0x02
75 76
76 77 FLAGS_COMMAND_DATA = {
77 78 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
78 79 b'eos': FLAG_COMMAND_DATA_EOS,
79 80 }
80 81
81 82 FLAG_BYTES_RESPONSE_CONTINUATION = 0x01
82 83 FLAG_BYTES_RESPONSE_EOS = 0x02
83 84 FLAG_BYTES_RESPONSE_CBOR = 0x04
84 85
85 86 FLAGS_BYTES_RESPONSE = {
86 87 b'continuation': FLAG_BYTES_RESPONSE_CONTINUATION,
87 88 b'eos': FLAG_BYTES_RESPONSE_EOS,
88 89 b'cbor': FLAG_BYTES_RESPONSE_CBOR,
89 90 }
90 91
91 92 FLAG_ERROR_RESPONSE_PROTOCOL = 0x01
92 93 FLAG_ERROR_RESPONSE_APPLICATION = 0x02
93 94
94 95 FLAGS_ERROR_RESPONSE = {
95 96 b'protocol': FLAG_ERROR_RESPONSE_PROTOCOL,
96 97 b'application': FLAG_ERROR_RESPONSE_APPLICATION,
97 98 }
98 99
99 100 # Maps frame types to their available flags.
100 101 FRAME_TYPE_FLAGS = {
101 102 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
102 103 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
103 104 FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
104 105 FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
105 106 FRAME_TYPE_TEXT_OUTPUT: {},
106 107 FRAME_TYPE_PROGRESS: {},
107 108 FRAME_TYPE_STREAM_SETTINGS: {},
108 109 }
109 110
110 111 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
111 112
112 113 def humanflags(mapping, value):
113 114 """Convert a numeric flags value to a human value, using a mapping table."""
114 115 namemap = {v: k for k, v in mapping.iteritems()}
115 116 flags = []
116 117 val = 1
117 118 while value >= val:
118 119 if value & val:
119 120 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
120 121 val <<= 1
121 122
122 123 return b'|'.join(flags)
123 124
124 125 @attr.s(slots=True)
125 126 class frameheader(object):
126 127 """Represents the data in a frame header."""
127 128
128 129 length = attr.ib()
129 130 requestid = attr.ib()
130 131 streamid = attr.ib()
131 132 streamflags = attr.ib()
132 133 typeid = attr.ib()
133 134 flags = attr.ib()
134 135
135 136 @attr.s(slots=True, repr=False)
136 137 class frame(object):
137 138 """Represents a parsed frame."""
138 139
139 140 requestid = attr.ib()
140 141 streamid = attr.ib()
141 142 streamflags = attr.ib()
142 143 typeid = attr.ib()
143 144 flags = attr.ib()
144 145 payload = attr.ib()
145 146
146 147 @encoding.strmethod
147 148 def __repr__(self):
148 149 typename = '<unknown 0x%02x>' % self.typeid
149 150 for name, value in FRAME_TYPES.iteritems():
150 151 if value == self.typeid:
151 152 typename = name
152 153 break
153 154
154 155 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
155 156 'type=%s; flags=%s)' % (
156 157 len(self.payload), self.requestid, self.streamid,
157 158 humanflags(STREAM_FLAGS, self.streamflags), typename,
158 159 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
159 160
160 161 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
161 162 """Assemble a frame into a byte array."""
162 163 # TODO assert size of payload.
163 164 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
164 165
165 166 # 24 bits length
166 167 # 16 bits request id
167 168 # 8 bits stream id
168 169 # 8 bits stream flags
169 170 # 4 bits type
170 171 # 4 bits flags
171 172
172 173 l = struct.pack(r'<I', len(payload))
173 174 frame[0:3] = l[0:3]
174 175 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
175 176 frame[7] = (typeid << 4) | flags
176 177 frame[8:] = payload
177 178
178 179 return frame
179 180
180 181 def makeframefromhumanstring(s):
181 182 """Create a frame from a human readable string
182 183
183 184 Strings have the form:
184 185
185 186 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
186 187
187 188 This can be used by user-facing applications and tests for creating
188 189 frames easily without having to type out a bunch of constants.
189 190
190 191 Request ID and stream IDs are integers.
191 192
192 193 Stream flags, frame type, and flags can be specified by integer or
193 194 named constant.
194 195
195 196 Flags can be delimited by `|` to bitwise OR them together.
196 197
197 198 If the payload begins with ``cbor:``, the following string will be
198 199 evaluated as Python literal and the resulting object will be fed into
199 200 a CBOR encoder. Otherwise, the payload is interpreted as a Python
200 201 byte string literal.
201 202 """
202 203 fields = s.split(b' ', 5)
203 204 requestid, streamid, streamflags, frametype, frameflags, payload = fields
204 205
205 206 requestid = int(requestid)
206 207 streamid = int(streamid)
207 208
208 209 finalstreamflags = 0
209 210 for flag in streamflags.split(b'|'):
210 211 if flag in STREAM_FLAGS:
211 212 finalstreamflags |= STREAM_FLAGS[flag]
212 213 else:
213 214 finalstreamflags |= int(flag)
214 215
215 216 if frametype in FRAME_TYPES:
216 217 frametype = FRAME_TYPES[frametype]
217 218 else:
218 219 frametype = int(frametype)
219 220
220 221 finalflags = 0
221 222 validflags = FRAME_TYPE_FLAGS[frametype]
222 223 for flag in frameflags.split(b'|'):
223 224 if flag in validflags:
224 225 finalflags |= validflags[flag]
225 226 else:
226 227 finalflags |= int(flag)
227 228
228 229 if payload.startswith(b'cbor:'):
229 230 payload = cbor.dumps(stringutil.evalpythonliteral(payload[5:]),
230 231 canonical=True)
231 232
232 233 else:
233 234 payload = stringutil.unescapestr(payload)
234 235
235 236 return makeframe(requestid=requestid, streamid=streamid,
236 237 streamflags=finalstreamflags, typeid=frametype,
237 238 flags=finalflags, payload=payload)
238 239
239 240 def parseheader(data):
240 241 """Parse a unified framing protocol frame header from a buffer.
241 242
242 243 The header is expected to be in the buffer at offset 0 and the
243 244 buffer is expected to be large enough to hold a full header.
244 245 """
245 246 # 24 bits payload length (little endian)
246 247 # 16 bits request ID
247 248 # 8 bits stream ID
248 249 # 8 bits stream flags
249 250 # 4 bits frame type
250 251 # 4 bits frame flags
251 252 # ... payload
252 253 framelength = data[0] + 256 * data[1] + 16384 * data[2]
253 254 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
254 255 typeflags = data[7]
255 256
256 257 frametype = (typeflags & 0xf0) >> 4
257 258 frameflags = typeflags & 0x0f
258 259
259 260 return frameheader(framelength, requestid, streamid, streamflags,
260 261 frametype, frameflags)
261 262
262 263 def readframe(fh):
263 264 """Read a unified framing protocol frame from a file object.
264 265
265 266 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
266 267 None if no frame is available. May raise if a malformed frame is
267 268 seen.
268 269 """
269 270 header = bytearray(FRAME_HEADER_SIZE)
270 271
271 272 readcount = fh.readinto(header)
272 273
273 274 if readcount == 0:
274 275 return None
275 276
276 277 if readcount != FRAME_HEADER_SIZE:
277 278 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
278 279 (readcount, header))
279 280
280 281 h = parseheader(header)
281 282
282 283 payload = fh.read(h.length)
283 284 if len(payload) != h.length:
284 285 raise error.Abort(_('frame length error: expected %d; got %d') %
285 286 (h.length, len(payload)))
286 287
287 288 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
288 289 payload)
289 290
290 291 def createcommandframes(stream, requestid, cmd, args, datafh=None,
291 292 maxframesize=DEFAULT_MAX_FRAME_SIZE):
292 293 """Create frames necessary to transmit a request to run a command.
293 294
294 295 This is a generator of bytearrays. Each item represents a frame
295 296 ready to be sent over the wire to a peer.
296 297 """
297 298 data = {b'name': cmd}
298 299 if args:
299 300 data[b'args'] = args
300 301
301 302 data = cbor.dumps(data, canonical=True)
302 303
303 304 offset = 0
304 305
305 306 while True:
306 307 flags = 0
307 308
308 309 # Must set new or continuation flag.
309 310 if not offset:
310 311 flags |= FLAG_COMMAND_REQUEST_NEW
311 312 else:
312 313 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
313 314
314 315 # Data frames is set on all frames.
315 316 if datafh:
316 317 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
317 318
318 319 payload = data[offset:offset + maxframesize]
319 320 offset += len(payload)
320 321
321 322 if len(payload) == maxframesize and offset < len(data):
322 323 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
323 324
324 325 yield stream.makeframe(requestid=requestid,
325 326 typeid=FRAME_TYPE_COMMAND_REQUEST,
326 327 flags=flags,
327 328 payload=payload)
328 329
329 330 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
330 331 break
331 332
332 333 if datafh:
333 334 while True:
334 335 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
335 336
336 337 done = False
337 338 if len(data) == DEFAULT_MAX_FRAME_SIZE:
338 339 flags = FLAG_COMMAND_DATA_CONTINUATION
339 340 else:
340 341 flags = FLAG_COMMAND_DATA_EOS
341 342 assert datafh.read(1) == b''
342 343 done = True
343 344
344 345 yield stream.makeframe(requestid=requestid,
345 346 typeid=FRAME_TYPE_COMMAND_DATA,
346 347 flags=flags,
347 348 payload=data)
348 349
349 350 if done:
350 351 break
351 352
352 353 def createbytesresponseframesfrombytes(stream, requestid, data, iscbor=False,
353 354 maxframesize=DEFAULT_MAX_FRAME_SIZE):
354 355 """Create a raw frame to send a bytes response from static bytes input.
355 356
356 357 Returns a generator of bytearrays.
357 358 """
358 359
359 360 # Simple case of a single frame.
360 361 if len(data) <= maxframesize:
361 362 flags = FLAG_BYTES_RESPONSE_EOS
362 363 if iscbor:
363 364 flags |= FLAG_BYTES_RESPONSE_CBOR
364 365
365 366 yield stream.makeframe(requestid=requestid,
366 367 typeid=FRAME_TYPE_BYTES_RESPONSE,
367 368 flags=flags,
368 369 payload=data)
369 370 return
370 371
371 372 offset = 0
372 373 while True:
373 374 chunk = data[offset:offset + maxframesize]
374 375 offset += len(chunk)
375 376 done = offset == len(data)
376 377
377 378 if done:
378 379 flags = FLAG_BYTES_RESPONSE_EOS
379 380 else:
380 381 flags = FLAG_BYTES_RESPONSE_CONTINUATION
381 382
382 383 if iscbor:
383 384 flags |= FLAG_BYTES_RESPONSE_CBOR
384 385
385 386 yield stream.makeframe(requestid=requestid,
386 387 typeid=FRAME_TYPE_BYTES_RESPONSE,
387 388 flags=flags,
388 389 payload=chunk)
389 390
390 391 if done:
391 392 break
392 393
393 394 def createerrorframe(stream, requestid, msg, protocol=False, application=False):
394 395 # TODO properly handle frame size limits.
395 396 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
396 397
397 398 flags = 0
398 399 if protocol:
399 400 flags |= FLAG_ERROR_RESPONSE_PROTOCOL
400 401 if application:
401 402 flags |= FLAG_ERROR_RESPONSE_APPLICATION
402 403
403 404 yield stream.makeframe(requestid=requestid,
404 405 typeid=FRAME_TYPE_ERROR_RESPONSE,
405 406 flags=flags,
406 407 payload=msg)
407 408
408 409 def createtextoutputframe(stream, requestid, atoms,
409 410 maxframesize=DEFAULT_MAX_FRAME_SIZE):
410 411 """Create a text output frame to render text to people.
411 412
412 413 ``atoms`` is a 3-tuple of (formatting string, args, labels).
413 414
414 415 The formatting string contains ``%s`` tokens to be replaced by the
415 416 corresponding indexed entry in ``args``. ``labels`` is an iterable of
416 417 formatters to be applied at rendering time. In terms of the ``ui``
417 418 class, each atom corresponds to a ``ui.write()``.
418 419 """
419 420 atomdicts = []
420 421
421 422 for (formatting, args, labels) in atoms:
422 423 # TODO look for localstr, other types here?
423 424
424 425 if not isinstance(formatting, bytes):
425 426 raise ValueError('must use bytes formatting strings')
426 427 for arg in args:
427 428 if not isinstance(arg, bytes):
428 429 raise ValueError('must use bytes for arguments')
429 430 for label in labels:
430 431 if not isinstance(label, bytes):
431 432 raise ValueError('must use bytes for labels')
432 433
433 434 # Formatting string must be ASCII.
434 435 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
435 436
436 437 # Arguments must be UTF-8.
437 438 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
438 439
439 440 # Labels must be ASCII.
440 441 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
441 442 for l in labels]
442 443
443 444 atom = {b'msg': formatting}
444 445 if args:
445 446 atom[b'args'] = args
446 447 if labels:
447 448 atom[b'labels'] = labels
448 449
449 450 atomdicts.append(atom)
450 451
451 452 payload = cbor.dumps(atomdicts, canonical=True)
452 453
453 454 if len(payload) > maxframesize:
454 455 raise ValueError('cannot encode data in a single frame')
455 456
456 457 yield stream.makeframe(requestid=requestid,
457 458 typeid=FRAME_TYPE_TEXT_OUTPUT,
458 459 flags=0,
459 460 payload=payload)
460 461
461 462 class stream(object):
462 463 """Represents a logical unidirectional series of frames."""
463 464
464 465 def __init__(self, streamid, active=False):
465 466 self.streamid = streamid
466 467 self._active = False
467 468
468 469 def makeframe(self, requestid, typeid, flags, payload):
469 470 """Create a frame to be sent out over this stream.
470 471
471 472 Only returns the frame instance. Does not actually send it.
472 473 """
473 474 streamflags = 0
474 475 if not self._active:
475 476 streamflags |= STREAM_FLAG_BEGIN_STREAM
476 477 self._active = True
477 478
478 479 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
479 480 payload)
480 481
481 482 def ensureserverstream(stream):
482 483 if stream.streamid % 2:
483 484 raise error.ProgrammingError('server should only write to even '
484 485 'numbered streams; %d is not even' %
485 486 stream.streamid)
486 487
487 488 class serverreactor(object):
488 489 """Holds state of a server handling frame-based protocol requests.
489 490
490 491 This class is the "brain" of the unified frame-based protocol server
491 492 component. While the protocol is stateless from the perspective of
492 493 requests/commands, something needs to track which frames have been
493 494 received, what frames to expect, etc. This class is that thing.
494 495
495 496 Instances are modeled as a state machine of sorts. Instances are also
496 497 reactionary to external events. The point of this class is to encapsulate
497 498 the state of the connection and the exchange of frames, not to perform
498 499 work. Instead, callers tell this class when something occurs, like a
499 500 frame arriving. If that activity is worthy of a follow-up action (say
500 501 *run a command*), the return value of that handler will say so.
501 502
502 503 I/O and CPU intensive operations are purposefully delegated outside of
503 504 this class.
504 505
505 506 Consumers are expected to tell instances when events occur. They do so by
506 507 calling the various ``on*`` methods. These methods return a 2-tuple
507 508 describing any follow-up action(s) to take. The first element is the
508 509 name of an action to perform. The second is a data structure (usually
509 510 a dict) specific to that action that contains more information. e.g.
510 511 if the server wants to send frames back to the client, the data structure
511 512 will contain a reference to those frames.
512 513
513 514 Valid actions that consumers can be instructed to take are:
514 515
515 516 sendframes
516 517 Indicates that frames should be sent to the client. The ``framegen``
517 518 key contains a generator of frames that should be sent. The server
518 519 assumes that all frames are sent to the client.
519 520
520 521 error
521 522 Indicates that an error occurred. Consumer should probably abort.
522 523
523 524 runcommand
524 525 Indicates that the consumer should run a wire protocol command. Details
525 526 of the command to run are given in the data structure.
526 527
527 528 wantframe
528 529 Indicates that nothing of interest happened and the server is waiting on
529 530 more frames from the client before anything interesting can be done.
530 531
531 532 noop
532 533 Indicates no additional action is required.
533 534
534 535 Known Issues
535 536 ------------
536 537
537 538 There are no limits to the number of partially received commands or their
538 539 size. A malicious client could stream command request data and exhaust the
539 540 server's memory.
540 541
541 542 Partially received commands are not acted upon when end of input is
542 543 reached. Should the server error if it receives a partial request?
543 544 Should the client send a message to abort a partially transmitted request
544 545 to facilitate graceful shutdown?
545 546
546 547 Active requests that haven't been responded to aren't tracked. This means
547 548 that if we receive a command and instruct its dispatch, another command
548 549 with its request ID can come in over the wire and there will be a race
549 550 between who responds to what.
550 551 """
551 552
552 553 def __init__(self, deferoutput=False):
553 554 """Construct a new server reactor.
554 555
555 556 ``deferoutput`` can be used to indicate that no output frames should be
556 557 instructed to be sent until input has been exhausted. In this mode,
557 558 events that would normally generate output frames (such as a command
558 559 response being ready) will instead defer instructing the consumer to
559 560 send those frames. This is useful for half-duplex transports where the
560 561 sender cannot receive until all data has been transmitted.
561 562 """
562 563 self._deferoutput = deferoutput
563 564 self._state = 'idle'
564 565 self._nextoutgoingstreamid = 2
565 566 self._bufferedframegens = []
566 567 # stream id -> stream instance for all active streams from the client.
567 568 self._incomingstreams = {}
568 569 self._outgoingstreams = {}
569 570 # request id -> dict of commands that are actively being received.
570 571 self._receivingcommands = {}
571 572 # Request IDs that have been received and are actively being processed.
572 573 # Once all output for a request has been sent, it is removed from this
573 574 # set.
574 575 self._activecommands = set()
575 576
576 577 def onframerecv(self, frame):
577 578 """Process a frame that has been received off the wire.
578 579
579 580 Returns a dict with an ``action`` key that details what action,
580 581 if any, the consumer should take next.
581 582 """
582 583 if not frame.streamid % 2:
583 584 self._state = 'errored'
584 585 return self._makeerrorresult(
585 586 _('received frame with even numbered stream ID: %d') %
586 587 frame.streamid)
587 588
588 589 if frame.streamid not in self._incomingstreams:
589 590 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
590 591 self._state = 'errored'
591 592 return self._makeerrorresult(
592 593 _('received frame on unknown inactive stream without '
593 594 'beginning of stream flag set'))
594 595
595 596 self._incomingstreams[frame.streamid] = stream(frame.streamid)
596 597
597 598 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
598 599 # TODO handle decoding frames
599 600 self._state = 'errored'
600 601 raise error.ProgrammingError('support for decoding stream payloads '
601 602 'not yet implemented')
602 603
603 604 if frame.streamflags & STREAM_FLAG_END_STREAM:
604 605 del self._incomingstreams[frame.streamid]
605 606
606 607 handlers = {
607 608 'idle': self._onframeidle,
608 609 'command-receiving': self._onframecommandreceiving,
609 610 'errored': self._onframeerrored,
610 611 }
611 612
612 613 meth = handlers.get(self._state)
613 614 if not meth:
614 615 raise error.ProgrammingError('unhandled state: %s' % self._state)
615 616
616 617 return meth(frame)
617 618
618 619 def onbytesresponseready(self, stream, requestid, data, iscbor=False):
619 620 """Signal that a bytes response is ready to be sent to the client.
620 621
621 622 The raw bytes response is passed as an argument.
622 623 """
623 624 ensureserverstream(stream)
624 625
625 626 def sendframes():
626 627 for frame in createbytesresponseframesfrombytes(stream, requestid,
627 628 data,
628 629 iscbor=iscbor):
629 630 yield frame
630 631
631 632 self._activecommands.remove(requestid)
632 633
633 634 result = sendframes()
634 635
635 636 if self._deferoutput:
636 637 self._bufferedframegens.append(result)
637 638 return 'noop', {}
638 639 else:
639 640 return 'sendframes', {
640 641 'framegen': result,
641 642 }
642 643
643 644 def oninputeof(self):
644 645 """Signals that end of input has been received.
645 646
646 647 No more frames will be received. All pending activity should be
647 648 completed.
648 649 """
649 650 # TODO should we do anything about in-flight commands?
650 651
651 652 if not self._deferoutput or not self._bufferedframegens:
652 653 return 'noop', {}
653 654
654 655 # If we buffered all our responses, emit those.
655 656 def makegen():
656 657 for gen in self._bufferedframegens:
657 658 for frame in gen:
658 659 yield frame
659 660
660 661 return 'sendframes', {
661 662 'framegen': makegen(),
662 663 }
663 664
664 665 def onapplicationerror(self, stream, requestid, msg):
665 666 ensureserverstream(stream)
666 667
667 668 return 'sendframes', {
668 669 'framegen': createerrorframe(stream, requestid, msg,
669 670 application=True),
670 671 }
671 672
672 673 def makeoutputstream(self):
673 674 """Create a stream to be used for sending data to the client."""
674 675 streamid = self._nextoutgoingstreamid
675 676 self._nextoutgoingstreamid += 2
676 677
677 678 s = stream(streamid)
678 679 self._outgoingstreams[streamid] = s
679 680
680 681 return s
681 682
682 683 def _makeerrorresult(self, msg):
683 684 return 'error', {
684 685 'message': msg,
685 686 }
686 687
687 688 def _makeruncommandresult(self, requestid):
688 689 entry = self._receivingcommands[requestid]
689 690
690 691 if not entry['requestdone']:
691 692 self._state = 'errored'
692 693 raise error.ProgrammingError('should not be called without '
693 694 'requestdone set')
694 695
695 696 del self._receivingcommands[requestid]
696 697
697 698 if self._receivingcommands:
698 699 self._state = 'command-receiving'
699 700 else:
700 701 self._state = 'idle'
701 702
702 703 # Decode the payloads as CBOR.
703 704 entry['payload'].seek(0)
704 705 request = cbor.load(entry['payload'])
705 706
706 707 if b'name' not in request:
707 708 self._state = 'errored'
708 709 return self._makeerrorresult(
709 710 _('command request missing "name" field'))
710 711
711 712 if b'args' not in request:
712 713 request[b'args'] = {}
713 714
714 715 assert requestid not in self._activecommands
715 716 self._activecommands.add(requestid)
716 717
717 718 return 'runcommand', {
718 719 'requestid': requestid,
719 720 'command': request[b'name'],
720 721 'args': request[b'args'],
721 722 'data': entry['data'].getvalue() if entry['data'] else None,
722 723 }
723 724
724 725 def _makewantframeresult(self):
725 726 return 'wantframe', {
726 727 'state': self._state,
727 728 }
728 729
729 730 def _validatecommandrequestframe(self, frame):
730 731 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
731 732 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
732 733
733 734 if new and continuation:
734 735 self._state = 'errored'
735 736 return self._makeerrorresult(
736 737 _('received command request frame with both new and '
737 738 'continuation flags set'))
738 739
739 740 if not new and not continuation:
740 741 self._state = 'errored'
741 742 return self._makeerrorresult(
742 743 _('received command request frame with neither new nor '
743 744 'continuation flags set'))
744 745
745 746 def _onframeidle(self, frame):
746 747 # The only frame type that should be received in this state is a
747 748 # command request.
748 749 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
749 750 self._state = 'errored'
750 751 return self._makeerrorresult(
751 752 _('expected command request frame; got %d') % frame.typeid)
752 753
753 754 res = self._validatecommandrequestframe(frame)
754 755 if res:
755 756 return res
756 757
757 758 if frame.requestid in self._receivingcommands:
758 759 self._state = 'errored'
759 760 return self._makeerrorresult(
760 761 _('request with ID %d already received') % frame.requestid)
761 762
762 763 if frame.requestid in self._activecommands:
763 764 self._state = 'errored'
764 765 return self._makeerrorresult(
765 766 _('request with ID %d is already active') % frame.requestid)
766 767
767 768 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
768 769 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
769 770 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
770 771
771 772 if not new:
772 773 self._state = 'errored'
773 774 return self._makeerrorresult(
774 775 _('received command request frame without new flag set'))
775 776
776 777 payload = util.bytesio()
777 778 payload.write(frame.payload)
778 779
779 780 self._receivingcommands[frame.requestid] = {
780 781 'payload': payload,
781 782 'data': None,
782 783 'requestdone': not moreframes,
783 784 'expectingdata': bool(expectingdata),
784 785 }
785 786
786 787 # This is the final frame for this request. Dispatch it.
787 788 if not moreframes and not expectingdata:
788 789 return self._makeruncommandresult(frame.requestid)
789 790
790 791 assert moreframes or expectingdata
791 792 self._state = 'command-receiving'
792 793 return self._makewantframeresult()
793 794
794 795 def _onframecommandreceiving(self, frame):
795 796 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
796 797 # Process new command requests as such.
797 798 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
798 799 return self._onframeidle(frame)
799 800
800 801 res = self._validatecommandrequestframe(frame)
801 802 if res:
802 803 return res
803 804
804 805 # All other frames should be related to a command that is currently
805 806 # receiving but is not active.
806 807 if frame.requestid in self._activecommands:
807 808 self._state = 'errored'
808 809 return self._makeerrorresult(
809 810 _('received frame for request that is still active: %d') %
810 811 frame.requestid)
811 812
812 813 if frame.requestid not in self._receivingcommands:
813 814 self._state = 'errored'
814 815 return self._makeerrorresult(
815 816 _('received frame for request that is not receiving: %d') %
816 817 frame.requestid)
817 818
818 819 entry = self._receivingcommands[frame.requestid]
819 820
820 821 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
821 822 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
822 823 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
823 824
824 825 if entry['requestdone']:
825 826 self._state = 'errored'
826 827 return self._makeerrorresult(
827 828 _('received command request frame when request frames '
828 829 'were supposedly done'))
829 830
830 831 if expectingdata != entry['expectingdata']:
831 832 self._state = 'errored'
832 833 return self._makeerrorresult(
833 834 _('mismatch between expect data flag and previous frame'))
834 835
835 836 entry['payload'].write(frame.payload)
836 837
837 838 if not moreframes:
838 839 entry['requestdone'] = True
839 840
840 841 if not moreframes and not expectingdata:
841 842 return self._makeruncommandresult(frame.requestid)
842 843
843 844 return self._makewantframeresult()
844 845
845 846 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
846 847 if not entry['expectingdata']:
847 848 self._state = 'errored'
848 849 return self._makeerrorresult(_(
849 850 'received command data frame for request that is not '
850 851 'expecting data: %d') % frame.requestid)
851 852
852 853 if entry['data'] is None:
853 854 entry['data'] = util.bytesio()
854 855
855 856 return self._handlecommanddataframe(frame, entry)
856 857 else:
857 858 self._state = 'errored'
858 859 return self._makeerrorresult(_(
859 860 'received unexpected frame type: %d') % frame.typeid)
860 861
861 862 def _handlecommanddataframe(self, frame, entry):
862 863 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
863 864
864 865 # TODO support streaming data instead of buffering it.
865 866 entry['data'].write(frame.payload)
866 867
867 868 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
868 869 return self._makewantframeresult()
869 870 elif frame.flags & FLAG_COMMAND_DATA_EOS:
870 871 entry['data'].seek(0)
871 872 return self._makeruncommandresult(frame.requestid)
872 873 else:
873 874 self._state = 'errored'
874 875 return self._makeerrorresult(_('command data frame without '
875 876 'flags'))
876 877
877 878 def _onframeerrored(self, frame):
878 879 return self._makeerrorresult(_('server already errored'))
880
881 class commandrequest(object):
882 """Represents a request to run a command."""
883
884 def __init__(self, requestid, name, args, datafh=None):
885 self.requestid = requestid
886 self.name = name
887 self.args = args
888 self.datafh = datafh
889 self.state = 'pending'
890
891 class clientreactor(object):
892 """Holds state of a client issuing frame-based protocol requests.
893
894 This is like ``serverreactor`` but for client-side state.
895
896 Each instance is bound to the lifetime of a connection. For persistent
897 connection transports using e.g. TCP sockets and speaking the raw
898 framing protocol, there will be a single instance for the lifetime of
899 the TCP socket. For transports where there are multiple discrete
900 interactions (say tunneled within in HTTP request), there will be a
901 separate instance for each distinct interaction.
902 """
903 def __init__(self, hasmultiplesend=False, buffersends=True):
904 """Create a new instance.
905
906 ``hasmultiplesend`` indicates whether multiple sends are supported
907 by the transport. When True, it is possible to send commands immediately
908 instead of buffering until the caller signals an intent to finish a
909 send operation.
910
911 ``buffercommands`` indicates whether sends should be buffered until the
912 last request has been issued.
913 """
914 self._hasmultiplesend = hasmultiplesend
915 self._buffersends = buffersends
916
917 self._canissuecommands = True
918 self._cansend = True
919
920 self._nextrequestid = 1
921 # We only support a single outgoing stream for now.
922 self._outgoingstream = stream(1)
923 self._pendingrequests = collections.deque()
924 self._activerequests = {}
925
926 def callcommand(self, name, args, datafh=None):
927 """Request that a command be executed.
928
929 Receives the command name, a dict of arguments to pass to the command,
930 and an optional file object containing the raw data for the command.
931
932 Returns a 3-tuple of (request, action, action data).
933 """
934 if not self._canissuecommands:
935 raise error.ProgrammingError('cannot issue new commands')
936
937 requestid = self._nextrequestid
938 self._nextrequestid += 2
939
940 request = commandrequest(requestid, name, args, datafh=datafh)
941
942 if self._buffersends:
943 self._pendingrequests.append(request)
944 return request, 'noop', {}
945 else:
946 if not self._cansend:
947 raise error.ProgrammingError('sends cannot be performed on '
948 'this instance')
949
950 if not self._hasmultiplesend:
951 self._cansend = False
952 self._canissuecommands = False
953
954 return request, 'sendframes', {
955 'framegen': self._makecommandframes(request),
956 }
957
958 def flushcommands(self):
959 """Request that all queued commands be sent.
960
961 If any commands are buffered, this will instruct the caller to send
962 them over the wire. If no commands are buffered it instructs the client
963 to no-op.
964
965 If instances aren't configured for multiple sends, no new command
966 requests are allowed after this is called.
967 """
968 if not self._pendingrequests:
969 return 'noop', {}
970
971 if not self._cansend:
972 raise error.ProgrammingError('sends cannot be performed on this '
973 'instance')
974
975 # If the instance only allows sending once, mark that we have fired
976 # our one shot.
977 if not self._hasmultiplesend:
978 self._canissuecommands = False
979 self._cansend = False
980
981 def makeframes():
982 while self._pendingrequests:
983 request = self._pendingrequests.popleft()
984 for frame in self._makecommandframes(request):
985 yield frame
986
987 return 'sendframes', {
988 'framegen': makeframes(),
989 }
990
991 def _makecommandframes(self, request):
992 """Emit frames to issue a command request.
993
994 As a side-effect, update request accounting to reflect its changed
995 state.
996 """
997 self._activerequests[request.requestid] = request
998 request.state = 'sending'
999
1000 res = createcommandframes(self._outgoingstream,
1001 request.requestid,
1002 request.name,
1003 request.args,
1004 request.datafh)
1005
1006 for frame in res:
1007 yield frame
1008
1009 request.state = 'sent'
General Comments 0
You need to be logged in to leave comments. Login now