##// END OF EJS Templates
wireprotov2: pass ui into clientreactor and serverreactor...
Gregory Szorc -
r40165:293835e0 default
parent child Browse files
Show More
@@ -1,986 +1,987
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 repository,
25 25 statichttprepo,
26 26 url as urlmod,
27 27 util,
28 28 wireprotoframing,
29 29 wireprototypes,
30 30 wireprotov1peer,
31 31 wireprotov2peer,
32 32 wireprotov2server,
33 33 )
34 34 from .utils import (
35 35 cborutil,
36 36 interfaceutil,
37 37 stringutil,
38 38 )
39 39
40 40 httplib = util.httplib
41 41 urlerr = util.urlerr
42 42 urlreq = util.urlreq
43 43
44 44 def encodevalueinheaders(value, header, limit):
45 45 """Encode a string value into multiple HTTP headers.
46 46
47 47 ``value`` will be encoded into 1 or more HTTP headers with the names
48 48 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
49 49 name + value will be at most ``limit`` bytes long.
50 50
51 51 Returns an iterable of 2-tuples consisting of header names and
52 52 values as native strings.
53 53 """
54 54 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
55 55 # not bytes. This function always takes bytes in as arguments.
56 56 fmt = pycompat.strurl(header) + r'-%s'
57 57 # Note: it is *NOT* a bug that the last bit here is a bytestring
58 58 # and not a unicode: we're just getting the encoded length anyway,
59 59 # and using an r-string to make it portable between Python 2 and 3
60 60 # doesn't work because then the \r is a literal backslash-r
61 61 # instead of a carriage return.
62 62 valuelen = limit - len(fmt % r'000') - len(': \r\n')
63 63 result = []
64 64
65 65 n = 0
66 66 for i in pycompat.xrange(0, len(value), valuelen):
67 67 n += 1
68 68 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
69 69
70 70 return result
71 71
72 72 class _multifile(object):
73 73 def __init__(self, *fileobjs):
74 74 for f in fileobjs:
75 75 if not util.safehasattr(f, 'length'):
76 76 raise ValueError(
77 77 '_multifile only supports file objects that '
78 78 'have a length but this one does not:', type(f), f)
79 79 self._fileobjs = fileobjs
80 80 self._index = 0
81 81
82 82 @property
83 83 def length(self):
84 84 return sum(f.length for f in self._fileobjs)
85 85
86 86 def read(self, amt=None):
87 87 if amt <= 0:
88 88 return ''.join(f.read() for f in self._fileobjs)
89 89 parts = []
90 90 while amt and self._index < len(self._fileobjs):
91 91 parts.append(self._fileobjs[self._index].read(amt))
92 92 got = len(parts[-1])
93 93 if got < amt:
94 94 self._index += 1
95 95 amt -= got
96 96 return ''.join(parts)
97 97
98 98 def seek(self, offset, whence=os.SEEK_SET):
99 99 if whence != os.SEEK_SET:
100 100 raise NotImplementedError(
101 101 '_multifile does not support anything other'
102 102 ' than os.SEEK_SET for whence on seek()')
103 103 if offset != 0:
104 104 raise NotImplementedError(
105 105 '_multifile only supports seeking to start, but that '
106 106 'could be fixed if you need it')
107 107 for f in self._fileobjs:
108 108 f.seek(0)
109 109 self._index = 0
110 110
111 111 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
112 112 repobaseurl, cmd, args):
113 113 """Make an HTTP request to run a command for a version 1 client.
114 114
115 115 ``caps`` is a set of known server capabilities. The value may be
116 116 None if capabilities are not yet known.
117 117
118 118 ``capablefn`` is a function to evaluate a capability.
119 119
120 120 ``cmd``, ``args``, and ``data`` define the command, its arguments, and
121 121 raw data to pass to it.
122 122 """
123 123 if cmd == 'pushkey':
124 124 args['data'] = ''
125 125 data = args.pop('data', None)
126 126 headers = args.pop('headers', {})
127 127
128 128 ui.debug("sending %s command\n" % cmd)
129 129 q = [('cmd', cmd)]
130 130 headersize = 0
131 131 # Important: don't use self.capable() here or else you end up
132 132 # with infinite recursion when trying to look up capabilities
133 133 # for the first time.
134 134 postargsok = caps is not None and 'httppostargs' in caps
135 135
136 136 # Send arguments via POST.
137 137 if postargsok and args:
138 138 strargs = urlreq.urlencode(sorted(args.items()))
139 139 if not data:
140 140 data = strargs
141 141 else:
142 142 if isinstance(data, bytes):
143 143 i = io.BytesIO(data)
144 144 i.length = len(data)
145 145 data = i
146 146 argsio = io.BytesIO(strargs)
147 147 argsio.length = len(strargs)
148 148 data = _multifile(argsio, data)
149 149 headers[r'X-HgArgs-Post'] = len(strargs)
150 150 elif args:
151 151 # Calling self.capable() can infinite loop if we are calling
152 152 # "capabilities". But that command should never accept wire
153 153 # protocol arguments. So this should never happen.
154 154 assert cmd != 'capabilities'
155 155 httpheader = capablefn('httpheader')
156 156 if httpheader:
157 157 headersize = int(httpheader.split(',', 1)[0])
158 158
159 159 # Send arguments via HTTP headers.
160 160 if headersize > 0:
161 161 # The headers can typically carry more data than the URL.
162 162 encargs = urlreq.urlencode(sorted(args.items()))
163 163 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
164 164 headersize):
165 165 headers[header] = value
166 166 # Send arguments via query string (Mercurial <1.9).
167 167 else:
168 168 q += sorted(args.items())
169 169
170 170 qs = '?%s' % urlreq.urlencode(q)
171 171 cu = "%s%s" % (repobaseurl, qs)
172 172 size = 0
173 173 if util.safehasattr(data, 'length'):
174 174 size = data.length
175 175 elif data is not None:
176 176 size = len(data)
177 177 if data is not None and r'Content-Type' not in headers:
178 178 headers[r'Content-Type'] = r'application/mercurial-0.1'
179 179
180 180 # Tell the server we accept application/mercurial-0.2 and multiple
181 181 # compression formats if the server is capable of emitting those
182 182 # payloads.
183 183 # Note: Keep this set empty by default, as client advertisement of
184 184 # protocol parameters should only occur after the handshake.
185 185 protoparams = set()
186 186
187 187 mediatypes = set()
188 188 if caps is not None:
189 189 mt = capablefn('httpmediatype')
190 190 if mt:
191 191 protoparams.add('0.1')
192 192 mediatypes = set(mt.split(','))
193 193
194 194 protoparams.add('partial-pull')
195 195
196 196 if '0.2tx' in mediatypes:
197 197 protoparams.add('0.2')
198 198
199 199 if '0.2tx' in mediatypes and capablefn('compression'):
200 200 # We /could/ compare supported compression formats and prune
201 201 # non-mutually supported or error if nothing is mutually supported.
202 202 # For now, send the full list to the server and have it error.
203 203 comps = [e.wireprotosupport().name for e in
204 204 util.compengines.supportedwireengines(util.CLIENTROLE)]
205 205 protoparams.add('comp=%s' % ','.join(comps))
206 206
207 207 if protoparams:
208 208 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
209 209 'X-HgProto',
210 210 headersize or 1024)
211 211 for header, value in protoheaders:
212 212 headers[header] = value
213 213
214 214 varyheaders = []
215 215 for header in headers:
216 216 if header.lower().startswith(r'x-hg'):
217 217 varyheaders.append(header)
218 218
219 219 if varyheaders:
220 220 headers[r'Vary'] = r','.join(sorted(varyheaders))
221 221
222 222 req = requestbuilder(pycompat.strurl(cu), data, headers)
223 223
224 224 if data is not None:
225 225 ui.debug("sending %d bytes\n" % size)
226 226 req.add_unredirected_header(r'Content-Length', r'%d' % size)
227 227
228 228 return req, cu, qs
229 229
230 230 def _reqdata(req):
231 231 """Get request data, if any. If no data, returns None."""
232 232 if pycompat.ispy3:
233 233 return req.data
234 234 if not req.has_data():
235 235 return None
236 236 return req.get_data()
237 237
238 238 def sendrequest(ui, opener, req):
239 239 """Send a prepared HTTP request.
240 240
241 241 Returns the response object.
242 242 """
243 243 dbg = ui.debug
244 244 if (ui.debugflag
245 245 and ui.configbool('devel', 'debug.peer-request')):
246 246 line = 'devel-peer-request: %s\n'
247 247 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
248 248 pycompat.bytesurl(req.get_full_url())))
249 249 hgargssize = None
250 250
251 251 for header, value in sorted(req.header_items()):
252 252 header = pycompat.bytesurl(header)
253 253 value = pycompat.bytesurl(value)
254 254 if header.startswith('X-hgarg-'):
255 255 if hgargssize is None:
256 256 hgargssize = 0
257 257 hgargssize += len(value)
258 258 else:
259 259 dbg(line % ' %s %s' % (header, value))
260 260
261 261 if hgargssize is not None:
262 262 dbg(line % ' %d bytes of commands arguments in headers'
263 263 % hgargssize)
264 264 data = _reqdata(req)
265 265 if data is not None:
266 266 length = getattr(data, 'length', None)
267 267 if length is None:
268 268 length = len(data)
269 269 dbg(line % ' %d bytes of data' % length)
270 270
271 271 start = util.timer()
272 272
273 273 res = None
274 274 try:
275 275 res = opener.open(req)
276 276 except urlerr.httperror as inst:
277 277 if inst.code == 401:
278 278 raise error.Abort(_('authorization failed'))
279 279 raise
280 280 except httplib.HTTPException as inst:
281 281 ui.debug('http error requesting %s\n' %
282 282 util.hidepassword(req.get_full_url()))
283 283 ui.traceback()
284 284 raise IOError(None, inst)
285 285 finally:
286 286 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
287 287 code = res.code if res else -1
288 288 dbg(line % ' finished in %.4f seconds (%d)'
289 289 % (util.timer() - start, code))
290 290
291 291 # Insert error handlers for common I/O failures.
292 292 urlmod.wrapresponse(res)
293 293
294 294 return res
295 295
296 296 class RedirectedRepoError(error.RepoError):
297 297 def __init__(self, msg, respurl):
298 298 super(RedirectedRepoError, self).__init__(msg)
299 299 self.respurl = respurl
300 300
301 301 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
302 302 allowcbor=False):
303 303 # record the url we got redirected to
304 304 redirected = False
305 305 respurl = pycompat.bytesurl(resp.geturl())
306 306 if respurl.endswith(qs):
307 307 respurl = respurl[:-len(qs)]
308 308 qsdropped = False
309 309 else:
310 310 qsdropped = True
311 311
312 312 if baseurl.rstrip('/') != respurl.rstrip('/'):
313 313 redirected = True
314 314 if not ui.quiet:
315 315 ui.warn(_('real URL is %s\n') % respurl)
316 316
317 317 try:
318 318 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
319 319 except AttributeError:
320 320 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
321 321
322 322 safeurl = util.hidepassword(baseurl)
323 323 if proto.startswith('application/hg-error'):
324 324 raise error.OutOfBandError(resp.read())
325 325
326 326 # Pre 1.0 versions of Mercurial used text/plain and
327 327 # application/hg-changegroup. We don't support such old servers.
328 328 if not proto.startswith('application/mercurial-'):
329 329 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
330 330 msg = _("'%s' does not appear to be an hg repository:\n"
331 331 "---%%<--- (%s)\n%s\n---%%<---\n") % (
332 332 safeurl, proto or 'no content-type', resp.read(1024))
333 333
334 334 # Some servers may strip the query string from the redirect. We
335 335 # raise a special error type so callers can react to this specially.
336 336 if redirected and qsdropped:
337 337 raise RedirectedRepoError(msg, respurl)
338 338 else:
339 339 raise error.RepoError(msg)
340 340
341 341 try:
342 342 subtype = proto.split('-', 1)[1]
343 343
344 344 # Unless we end up supporting CBOR in the legacy wire protocol,
345 345 # this should ONLY be encountered for the initial capabilities
346 346 # request during handshake.
347 347 if subtype == 'cbor':
348 348 if allowcbor:
349 349 return respurl, proto, resp
350 350 else:
351 351 raise error.RepoError(_('unexpected CBOR response from '
352 352 'server'))
353 353
354 354 version_info = tuple([int(n) for n in subtype.split('.')])
355 355 except ValueError:
356 356 raise error.RepoError(_("'%s' sent a broken Content-Type "
357 357 "header (%s)") % (safeurl, proto))
358 358
359 359 # TODO consider switching to a decompression reader that uses
360 360 # generators.
361 361 if version_info == (0, 1):
362 362 if compressible:
363 363 resp = util.compengines['zlib'].decompressorreader(resp)
364 364
365 365 elif version_info == (0, 2):
366 366 # application/mercurial-0.2 always identifies the compression
367 367 # engine in the payload header.
368 368 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
369 369 ename = util.readexactly(resp, elen)
370 370 engine = util.compengines.forwiretype(ename)
371 371
372 372 resp = engine.decompressorreader(resp)
373 373 else:
374 374 raise error.RepoError(_("'%s' uses newer protocol %s") %
375 375 (safeurl, subtype))
376 376
377 377 return respurl, proto, resp
378 378
379 379 class httppeer(wireprotov1peer.wirepeer):
380 380 def __init__(self, ui, path, url, opener, requestbuilder, caps):
381 381 self.ui = ui
382 382 self._path = path
383 383 self._url = url
384 384 self._caps = caps
385 385 self._urlopener = opener
386 386 self._requestbuilder = requestbuilder
387 387
388 388 def __del__(self):
389 389 for h in self._urlopener.handlers:
390 390 h.close()
391 391 getattr(h, "close_all", lambda: None)()
392 392
393 393 # Begin of ipeerconnection interface.
394 394
395 395 def url(self):
396 396 return self._path
397 397
398 398 def local(self):
399 399 return None
400 400
401 401 def peer(self):
402 402 return self
403 403
404 404 def canpush(self):
405 405 return True
406 406
407 407 def close(self):
408 408 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
409 409 'received %d bytes in responses)\n') %
410 410 (self._urlopener.requestscount,
411 411 self._urlopener.sentbytescount,
412 412 self._urlopener.receivedbytescount))
413 413
414 414 # End of ipeerconnection interface.
415 415
416 416 # Begin of ipeercommands interface.
417 417
418 418 def capabilities(self):
419 419 return self._caps
420 420
421 421 # End of ipeercommands interface.
422 422
423 423 def _callstream(self, cmd, _compressible=False, **args):
424 424 args = pycompat.byteskwargs(args)
425 425
426 426 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
427 427 self._caps, self.capable,
428 428 self._url, cmd, args)
429 429
430 430 resp = sendrequest(self.ui, self._urlopener, req)
431 431
432 432 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
433 433 resp, _compressible)
434 434
435 435 return resp
436 436
437 437 def _call(self, cmd, **args):
438 438 fp = self._callstream(cmd, **args)
439 439 try:
440 440 return fp.read()
441 441 finally:
442 442 # if using keepalive, allow connection to be reused
443 443 fp.close()
444 444
445 445 def _callpush(self, cmd, cg, **args):
446 446 # have to stream bundle to a temp file because we do not have
447 447 # http 1.1 chunked transfer.
448 448
449 449 types = self.capable('unbundle')
450 450 try:
451 451 types = types.split(',')
452 452 except AttributeError:
453 453 # servers older than d1b16a746db6 will send 'unbundle' as a
454 454 # boolean capability. They only support headerless/uncompressed
455 455 # bundles.
456 456 types = [""]
457 457 for x in types:
458 458 if x in bundle2.bundletypes:
459 459 type = x
460 460 break
461 461
462 462 tempname = bundle2.writebundle(self.ui, cg, None, type)
463 463 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
464 464 headers = {r'Content-Type': r'application/mercurial-0.1'}
465 465
466 466 try:
467 467 r = self._call(cmd, data=fp, headers=headers, **args)
468 468 vals = r.split('\n', 1)
469 469 if len(vals) < 2:
470 470 raise error.ResponseError(_("unexpected response:"), r)
471 471 return vals
472 472 except urlerr.httperror:
473 473 # Catch and re-raise these so we don't try and treat them
474 474 # like generic socket errors. They lack any values in
475 475 # .args on Python 3 which breaks our socket.error block.
476 476 raise
477 477 except socket.error as err:
478 478 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
479 479 raise error.Abort(_('push failed: %s') % err.args[1])
480 480 raise error.Abort(err.args[1])
481 481 finally:
482 482 fp.close()
483 483 os.unlink(tempname)
484 484
485 485 def _calltwowaystream(self, cmd, fp, **args):
486 486 fh = None
487 487 fp_ = None
488 488 filename = None
489 489 try:
490 490 # dump bundle to disk
491 491 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
492 492 fh = os.fdopen(fd, r"wb")
493 493 d = fp.read(4096)
494 494 while d:
495 495 fh.write(d)
496 496 d = fp.read(4096)
497 497 fh.close()
498 498 # start http push
499 499 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
500 500 headers = {r'Content-Type': r'application/mercurial-0.1'}
501 501 return self._callstream(cmd, data=fp_, headers=headers, **args)
502 502 finally:
503 503 if fp_ is not None:
504 504 fp_.close()
505 505 if fh is not None:
506 506 fh.close()
507 507 os.unlink(filename)
508 508
509 509 def _callcompressable(self, cmd, **args):
510 510 return self._callstream(cmd, _compressible=True, **args)
511 511
512 512 def _abort(self, exception):
513 513 raise exception
514 514
515 515 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests,
516 516 redirect):
517 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
517 reactor = wireprotoframing.clientreactor(ui,
518 hasmultiplesend=False,
518 519 buffersends=True)
519 520
520 521 handler = wireprotov2peer.clienthandler(ui, reactor,
521 522 opener=opener,
522 523 requestbuilder=requestbuilder)
523 524
524 525 url = '%s/%s' % (apiurl, permission)
525 526
526 527 if len(requests) > 1:
527 528 url += '/multirequest'
528 529 else:
529 530 url += '/%s' % requests[0][0]
530 531
531 532 ui.debug('sending %d commands\n' % len(requests))
532 533 for command, args, f in requests:
533 534 ui.debug('sending command %s: %s\n' % (
534 535 command, stringutil.pprint(args, indent=2)))
535 536 assert not list(handler.callcommand(command, args, f,
536 537 redirect=redirect))
537 538
538 539 # TODO stream this.
539 540 body = b''.join(map(bytes, handler.flushcommands()))
540 541
541 542 # TODO modify user-agent to reflect v2
542 543 headers = {
543 544 r'Accept': wireprotov2server.FRAMINGTYPE,
544 545 r'Content-Type': wireprotov2server.FRAMINGTYPE,
545 546 }
546 547
547 548 req = requestbuilder(pycompat.strurl(url), body, headers)
548 549 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
549 550
550 551 try:
551 552 res = opener.open(req)
552 553 except urlerr.httperror as e:
553 554 if e.code == 401:
554 555 raise error.Abort(_('authorization failed'))
555 556
556 557 raise
557 558 except httplib.HTTPException as e:
558 559 ui.traceback()
559 560 raise IOError(None, e)
560 561
561 562 return handler, res
562 563
563 564 class queuedcommandfuture(pycompat.futures.Future):
564 565 """Wraps result() on command futures to trigger submission on call."""
565 566
566 567 def result(self, timeout=None):
567 568 if self.done():
568 569 return pycompat.futures.Future.result(self, timeout)
569 570
570 571 self._peerexecutor.sendcommands()
571 572
572 573 # sendcommands() will restore the original __class__ and self.result
573 574 # will resolve to Future.result.
574 575 return self.result(timeout)
575 576
576 577 @interfaceutil.implementer(repository.ipeercommandexecutor)
577 578 class httpv2executor(object):
578 579 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor,
579 580 redirect):
580 581 self._ui = ui
581 582 self._opener = opener
582 583 self._requestbuilder = requestbuilder
583 584 self._apiurl = apiurl
584 585 self._descriptor = descriptor
585 586 self._redirect = redirect
586 587 self._sent = False
587 588 self._closed = False
588 589 self._neededpermissions = set()
589 590 self._calls = []
590 591 self._futures = weakref.WeakSet()
591 592 self._responseexecutor = None
592 593 self._responsef = None
593 594
594 595 def __enter__(self):
595 596 return self
596 597
597 598 def __exit__(self, exctype, excvalue, exctb):
598 599 self.close()
599 600
600 601 def callcommand(self, command, args):
601 602 if self._sent:
602 603 raise error.ProgrammingError('callcommand() cannot be used after '
603 604 'commands are sent')
604 605
605 606 if self._closed:
606 607 raise error.ProgrammingError('callcommand() cannot be used after '
607 608 'close()')
608 609
609 610 # The service advertises which commands are available. So if we attempt
610 611 # to call an unknown command or pass an unknown argument, we can screen
611 612 # for this.
612 613 if command not in self._descriptor['commands']:
613 614 raise error.ProgrammingError(
614 615 'wire protocol command %s is not available' % command)
615 616
616 617 cmdinfo = self._descriptor['commands'][command]
617 618 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
618 619
619 620 if unknownargs:
620 621 raise error.ProgrammingError(
621 622 'wire protocol command %s does not accept argument: %s' % (
622 623 command, ', '.join(sorted(unknownargs))))
623 624
624 625 self._neededpermissions |= set(cmdinfo['permissions'])
625 626
626 627 # TODO we /could/ also validate types here, since the API descriptor
627 628 # includes types...
628 629
629 630 f = pycompat.futures.Future()
630 631
631 632 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
632 633 # could deadlock.
633 634 f.__class__ = queuedcommandfuture
634 635 f._peerexecutor = self
635 636
636 637 self._futures.add(f)
637 638 self._calls.append((command, args, f))
638 639
639 640 return f
640 641
641 642 def sendcommands(self):
642 643 if self._sent:
643 644 return
644 645
645 646 if not self._calls:
646 647 return
647 648
648 649 self._sent = True
649 650
650 651 # Unhack any future types so caller sees a clean type and so we
651 652 # break reference cycle.
652 653 for f in self._futures:
653 654 if isinstance(f, queuedcommandfuture):
654 655 f.__class__ = pycompat.futures.Future
655 656 f._peerexecutor = None
656 657
657 658 # Mark the future as running and filter out cancelled futures.
658 659 calls = [(command, args, f)
659 660 for command, args, f in self._calls
660 661 if f.set_running_or_notify_cancel()]
661 662
662 663 # Clear out references, prevent improper object usage.
663 664 self._calls = None
664 665
665 666 if not calls:
666 667 return
667 668
668 669 permissions = set(self._neededpermissions)
669 670
670 671 if 'push' in permissions and 'pull' in permissions:
671 672 permissions.remove('pull')
672 673
673 674 if len(permissions) > 1:
674 675 raise error.RepoError(_('cannot make request requiring multiple '
675 676 'permissions: %s') %
676 677 _(', ').join(sorted(permissions)))
677 678
678 679 permission = {
679 680 'push': 'rw',
680 681 'pull': 'ro',
681 682 }[permissions.pop()]
682 683
683 684 handler, resp = sendv2request(
684 685 self._ui, self._opener, self._requestbuilder, self._apiurl,
685 686 permission, calls, self._redirect)
686 687
687 688 # TODO we probably want to validate the HTTP code, media type, etc.
688 689
689 690 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
690 691 self._responsef = self._responseexecutor.submit(self._handleresponse,
691 692 handler, resp)
692 693
693 694 def close(self):
694 695 if self._closed:
695 696 return
696 697
697 698 self.sendcommands()
698 699
699 700 self._closed = True
700 701
701 702 if not self._responsef:
702 703 return
703 704
704 705 # TODO ^C here may not result in immediate program termination.
705 706
706 707 try:
707 708 self._responsef.result()
708 709 finally:
709 710 self._responseexecutor.shutdown(wait=True)
710 711 self._responsef = None
711 712 self._responseexecutor = None
712 713
713 714 # If any of our futures are still in progress, mark them as
714 715 # errored, otherwise a result() could wait indefinitely.
715 716 for f in self._futures:
716 717 if not f.done():
717 718 f.set_exception(error.ResponseError(
718 719 _('unfulfilled command response')))
719 720
720 721 self._futures = None
721 722
722 723 def _handleresponse(self, handler, resp):
723 724 # Called in a thread to read the response.
724 725
725 726 while handler.readdata(resp):
726 727 pass
727 728
728 729 # TODO implement interface for version 2 peers
729 730 @interfaceutil.implementer(repository.ipeerconnection,
730 731 repository.ipeercapabilities,
731 732 repository.ipeerrequests)
732 733 class httpv2peer(object):
733 734 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
734 735 apidescriptor):
735 736 self.ui = ui
736 737
737 738 if repourl.endswith('/'):
738 739 repourl = repourl[:-1]
739 740
740 741 self._url = repourl
741 742 self._apipath = apipath
742 743 self._apiurl = '%s/%s' % (repourl, apipath)
743 744 self._opener = opener
744 745 self._requestbuilder = requestbuilder
745 746 self._descriptor = apidescriptor
746 747
747 748 self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
748 749
749 750 # Start of ipeerconnection.
750 751
751 752 def url(self):
752 753 return self._url
753 754
754 755 def local(self):
755 756 return None
756 757
757 758 def peer(self):
758 759 return self
759 760
760 761 def canpush(self):
761 762 # TODO change once implemented.
762 763 return False
763 764
764 765 def close(self):
765 766 self.ui.note(_('(sent %d HTTP requests and %d bytes; '
766 767 'received %d bytes in responses)\n') %
767 768 (self._opener.requestscount,
768 769 self._opener.sentbytescount,
769 770 self._opener.receivedbytescount))
770 771
771 772 # End of ipeerconnection.
772 773
773 774 # Start of ipeercapabilities.
774 775
775 776 def capable(self, name):
776 777 # The capabilities used internally historically map to capabilities
777 778 # advertised from the "capabilities" wire protocol command. However,
778 779 # version 2 of that command works differently.
779 780
780 781 # Maps to commands that are available.
781 782 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
782 783 return True
783 784
784 785 # Other concepts.
785 786 if name in ('bundle2'):
786 787 return True
787 788
788 789 # Alias command-* to presence of command of that name.
789 790 if name.startswith('command-'):
790 791 return name[len('command-'):] in self._descriptor['commands']
791 792
792 793 return False
793 794
794 795 def requirecap(self, name, purpose):
795 796 if self.capable(name):
796 797 return
797 798
798 799 raise error.CapabilityError(
799 800 _('cannot %s; client or remote repository does not support the %r '
800 801 'capability') % (purpose, name))
801 802
802 803 # End of ipeercapabilities.
803 804
804 805 def _call(self, name, **args):
805 806 with self.commandexecutor() as e:
806 807 return e.callcommand(name, args).result()
807 808
808 809 def commandexecutor(self):
809 810 return httpv2executor(self.ui, self._opener, self._requestbuilder,
810 811 self._apiurl, self._descriptor, self._redirect)
811 812
812 813 # Registry of API service names to metadata about peers that handle it.
813 814 #
814 815 # The following keys are meaningful:
815 816 #
816 817 # init
817 818 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
818 819 # apidescriptor) to create a peer.
819 820 #
820 821 # priority
821 822 # Integer priority for the service. If we could choose from multiple
822 823 # services, we choose the one with the highest priority.
823 824 API_PEERS = {
824 825 wireprototypes.HTTP_WIREPROTO_V2: {
825 826 'init': httpv2peer,
826 827 'priority': 50,
827 828 },
828 829 }
829 830
830 831 def performhandshake(ui, url, opener, requestbuilder):
831 832 # The handshake is a request to the capabilities command.
832 833
833 834 caps = None
834 835 def capable(x):
835 836 raise error.ProgrammingError('should not be called')
836 837
837 838 args = {}
838 839
839 840 # The client advertises support for newer protocols by adding an
840 841 # X-HgUpgrade-* header with a list of supported APIs and an
841 842 # X-HgProto-* header advertising which serializing formats it supports.
842 843 # We only support the HTTP version 2 transport and CBOR responses for
843 844 # now.
844 845 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
845 846
846 847 if advertisev2:
847 848 args['headers'] = {
848 849 r'X-HgProto-1': r'cbor',
849 850 }
850 851
851 852 args['headers'].update(
852 853 encodevalueinheaders(' '.join(sorted(API_PEERS)),
853 854 'X-HgUpgrade',
854 855 # We don't know the header limit this early.
855 856 # So make it small.
856 857 1024))
857 858
858 859 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
859 860 capable, url, 'capabilities',
860 861 args)
861 862 resp = sendrequest(ui, opener, req)
862 863
863 864 # The server may redirect us to the repo root, stripping the
864 865 # ?cmd=capabilities query string from the URL. The server would likely
865 866 # return HTML in this case and ``parsev1commandresponse()`` would raise.
866 867 # We catch this special case and re-issue the capabilities request against
867 868 # the new URL.
868 869 #
869 870 # We should ideally not do this, as a redirect that drops the query
870 871 # string from the URL is arguably a server bug. (Garbage in, garbage out).
871 872 # However, Mercurial clients for several years appeared to handle this
872 873 # issue without behavior degradation. And according to issue 5860, it may
873 874 # be a longstanding bug in some server implementations. So we allow a
874 875 # redirect that drops the query string to "just work."
875 876 try:
876 877 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
877 878 compressible=False,
878 879 allowcbor=advertisev2)
879 880 except RedirectedRepoError as e:
880 881 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
881 882 capable, e.respurl,
882 883 'capabilities', args)
883 884 resp = sendrequest(ui, opener, req)
884 885 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
885 886 compressible=False,
886 887 allowcbor=advertisev2)
887 888
888 889 try:
889 890 rawdata = resp.read()
890 891 finally:
891 892 resp.close()
892 893
893 894 if not ct.startswith('application/mercurial-'):
894 895 raise error.ProgrammingError('unexpected content-type: %s' % ct)
895 896
896 897 if advertisev2:
897 898 if ct == 'application/mercurial-cbor':
898 899 try:
899 900 info = cborutil.decodeall(rawdata)[0]
900 901 except cborutil.CBORDecodeError:
901 902 raise error.Abort(_('error decoding CBOR from remote server'),
902 903 hint=_('try again and consider contacting '
903 904 'the server operator'))
904 905
905 906 # We got a legacy response. That's fine.
906 907 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
907 908 info = {
908 909 'v1capabilities': set(rawdata.split())
909 910 }
910 911
911 912 else:
912 913 raise error.RepoError(
913 914 _('unexpected response type from server: %s') % ct)
914 915 else:
915 916 info = {
916 917 'v1capabilities': set(rawdata.split())
917 918 }
918 919
919 920 return respurl, info
920 921
921 922 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
922 923 """Construct an appropriate HTTP peer instance.
923 924
924 925 ``opener`` is an ``url.opener`` that should be used to establish
925 926 connections, perform HTTP requests.
926 927
927 928 ``requestbuilder`` is the type used for constructing HTTP requests.
928 929 It exists as an argument so extensions can override the default.
929 930 """
930 931 u = util.url(path)
931 932 if u.query or u.fragment:
932 933 raise error.Abort(_('unsupported URL component: "%s"') %
933 934 (u.query or u.fragment))
934 935
935 936 # urllib cannot handle URLs with embedded user or passwd.
936 937 url, authinfo = u.authinfo()
937 938 ui.debug('using %s\n' % url)
938 939
939 940 opener = opener or urlmod.opener(ui, authinfo)
940 941
941 942 respurl, info = performhandshake(ui, url, opener, requestbuilder)
942 943
943 944 # Given the intersection of APIs that both we and the server support,
944 945 # sort by their advertised priority and pick the first one.
945 946 #
946 947 # TODO consider making this request-based and interface driven. For
947 948 # example, the caller could say "I want a peer that does X." It's quite
948 949 # possible that not all peers would do that. Since we know the service
949 950 # capabilities, we could filter out services not meeting the
950 951 # requirements. Possibly by consulting the interfaces defined by the
951 952 # peer type.
952 953 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
953 954
954 955 preferredchoices = sorted(apipeerchoices,
955 956 key=lambda x: API_PEERS[x]['priority'],
956 957 reverse=True)
957 958
958 959 for service in preferredchoices:
959 960 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
960 961
961 962 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
962 963 requestbuilder,
963 964 info['apis'][service])
964 965
965 966 # Failed to construct an API peer. Fall back to legacy.
966 967 return httppeer(ui, path, respurl, opener, requestbuilder,
967 968 info['v1capabilities'])
968 969
969 970 def instance(ui, path, create, intents=None, createopts=None):
970 971 if create:
971 972 raise error.Abort(_('cannot create new http repository'))
972 973 try:
973 974 if path.startswith('https:') and not urlmod.has_https:
974 975 raise error.Abort(_('Python support for SSL and HTTPS '
975 976 'is not installed'))
976 977
977 978 inst = makepeer(ui, path)
978 979
979 980 return inst
980 981 except error.RepoError as httpexception:
981 982 try:
982 983 r = statichttprepo.instance(ui, "static-" + path, create)
983 984 ui.note(_('(falling back to static-http)\n'))
984 985 return r
985 986 except error.RepoError:
986 987 raise httpexception # use the original http RepoError instead
@@ -1,1600 +1,1602
1 1 # wireprotoframing.py - unified framing protocol for wire protocol
2 2 #
3 3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 # This file contains functionality to support the unified frame-based wire
9 9 # protocol. For details about the protocol, see
10 10 # `hg help internals.wireprotocol`.
11 11
12 12 from __future__ import absolute_import
13 13
14 14 import collections
15 15 import struct
16 16
17 17 from .i18n import _
18 18 from .thirdparty import (
19 19 attr,
20 20 )
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 pycompat,
25 25 util,
26 26 wireprototypes,
27 27 )
28 28 from .utils import (
29 29 cborutil,
30 30 stringutil,
31 31 )
32 32
33 33 FRAME_HEADER_SIZE = 8
34 34 DEFAULT_MAX_FRAME_SIZE = 32768
35 35
36 36 STREAM_FLAG_BEGIN_STREAM = 0x01
37 37 STREAM_FLAG_END_STREAM = 0x02
38 38 STREAM_FLAG_ENCODING_APPLIED = 0x04
39 39
40 40 STREAM_FLAGS = {
41 41 b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
42 42 b'stream-end': STREAM_FLAG_END_STREAM,
43 43 b'encoded': STREAM_FLAG_ENCODING_APPLIED,
44 44 }
45 45
46 46 FRAME_TYPE_COMMAND_REQUEST = 0x01
47 47 FRAME_TYPE_COMMAND_DATA = 0x02
48 48 FRAME_TYPE_COMMAND_RESPONSE = 0x03
49 49 FRAME_TYPE_ERROR_RESPONSE = 0x05
50 50 FRAME_TYPE_TEXT_OUTPUT = 0x06
51 51 FRAME_TYPE_PROGRESS = 0x07
52 52 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
53 53 FRAME_TYPE_STREAM_SETTINGS = 0x09
54 54
55 55 FRAME_TYPES = {
56 56 b'command-request': FRAME_TYPE_COMMAND_REQUEST,
57 57 b'command-data': FRAME_TYPE_COMMAND_DATA,
58 58 b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
59 59 b'error-response': FRAME_TYPE_ERROR_RESPONSE,
60 60 b'text-output': FRAME_TYPE_TEXT_OUTPUT,
61 61 b'progress': FRAME_TYPE_PROGRESS,
62 62 b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
63 63 b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
64 64 }
65 65
66 66 FLAG_COMMAND_REQUEST_NEW = 0x01
67 67 FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
68 68 FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
69 69 FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
70 70
71 71 FLAGS_COMMAND_REQUEST = {
72 72 b'new': FLAG_COMMAND_REQUEST_NEW,
73 73 b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
74 74 b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
75 75 b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
76 76 }
77 77
78 78 FLAG_COMMAND_DATA_CONTINUATION = 0x01
79 79 FLAG_COMMAND_DATA_EOS = 0x02
80 80
81 81 FLAGS_COMMAND_DATA = {
82 82 b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
83 83 b'eos': FLAG_COMMAND_DATA_EOS,
84 84 }
85 85
86 86 FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
87 87 FLAG_COMMAND_RESPONSE_EOS = 0x02
88 88
89 89 FLAGS_COMMAND_RESPONSE = {
90 90 b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
91 91 b'eos': FLAG_COMMAND_RESPONSE_EOS,
92 92 }
93 93
94 94 FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
95 95 FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
96 96
97 97 FLAGS_SENDER_PROTOCOL_SETTINGS = {
98 98 b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
99 99 b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
100 100 }
101 101
102 102 FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
103 103 FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
104 104
105 105 FLAGS_STREAM_ENCODING_SETTINGS = {
106 106 b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
107 107 b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
108 108 }
109 109
110 110 # Maps frame types to their available flags.
111 111 FRAME_TYPE_FLAGS = {
112 112 FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
113 113 FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
114 114 FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
115 115 FRAME_TYPE_ERROR_RESPONSE: {},
116 116 FRAME_TYPE_TEXT_OUTPUT: {},
117 117 FRAME_TYPE_PROGRESS: {},
118 118 FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
119 119 FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
120 120 }
121 121
122 122 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
123 123
124 124 def humanflags(mapping, value):
125 125 """Convert a numeric flags value to a human value, using a mapping table."""
126 126 namemap = {v: k for k, v in mapping.iteritems()}
127 127 flags = []
128 128 val = 1
129 129 while value >= val:
130 130 if value & val:
131 131 flags.append(namemap.get(val, '<unknown 0x%02x>' % val))
132 132 val <<= 1
133 133
134 134 return b'|'.join(flags)
135 135
136 136 @attr.s(slots=True)
137 137 class frameheader(object):
138 138 """Represents the data in a frame header."""
139 139
140 140 length = attr.ib()
141 141 requestid = attr.ib()
142 142 streamid = attr.ib()
143 143 streamflags = attr.ib()
144 144 typeid = attr.ib()
145 145 flags = attr.ib()
146 146
147 147 @attr.s(slots=True, repr=False)
148 148 class frame(object):
149 149 """Represents a parsed frame."""
150 150
151 151 requestid = attr.ib()
152 152 streamid = attr.ib()
153 153 streamflags = attr.ib()
154 154 typeid = attr.ib()
155 155 flags = attr.ib()
156 156 payload = attr.ib()
157 157
158 158 @encoding.strmethod
159 159 def __repr__(self):
160 160 typename = '<unknown 0x%02x>' % self.typeid
161 161 for name, value in FRAME_TYPES.iteritems():
162 162 if value == self.typeid:
163 163 typename = name
164 164 break
165 165
166 166 return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
167 167 'type=%s; flags=%s)' % (
168 168 len(self.payload), self.requestid, self.streamid,
169 169 humanflags(STREAM_FLAGS, self.streamflags), typename,
170 170 humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
171 171
172 172 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
173 173 """Assemble a frame into a byte array."""
174 174 # TODO assert size of payload.
175 175 frame = bytearray(FRAME_HEADER_SIZE + len(payload))
176 176
177 177 # 24 bits length
178 178 # 16 bits request id
179 179 # 8 bits stream id
180 180 # 8 bits stream flags
181 181 # 4 bits type
182 182 # 4 bits flags
183 183
184 184 l = struct.pack(r'<I', len(payload))
185 185 frame[0:3] = l[0:3]
186 186 struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
187 187 frame[7] = (typeid << 4) | flags
188 188 frame[8:] = payload
189 189
190 190 return frame
191 191
192 192 def makeframefromhumanstring(s):
193 193 """Create a frame from a human readable string
194 194
195 195 Strings have the form:
196 196
197 197 <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
198 198
199 199 This can be used by user-facing applications and tests for creating
200 200 frames easily without having to type out a bunch of constants.
201 201
202 202 Request ID and stream IDs are integers.
203 203
204 204 Stream flags, frame type, and flags can be specified by integer or
205 205 named constant.
206 206
207 207 Flags can be delimited by `|` to bitwise OR them together.
208 208
209 209 If the payload begins with ``cbor:``, the following string will be
210 210 evaluated as Python literal and the resulting object will be fed into
211 211 a CBOR encoder. Otherwise, the payload is interpreted as a Python
212 212 byte string literal.
213 213 """
214 214 fields = s.split(b' ', 5)
215 215 requestid, streamid, streamflags, frametype, frameflags, payload = fields
216 216
217 217 requestid = int(requestid)
218 218 streamid = int(streamid)
219 219
220 220 finalstreamflags = 0
221 221 for flag in streamflags.split(b'|'):
222 222 if flag in STREAM_FLAGS:
223 223 finalstreamflags |= STREAM_FLAGS[flag]
224 224 else:
225 225 finalstreamflags |= int(flag)
226 226
227 227 if frametype in FRAME_TYPES:
228 228 frametype = FRAME_TYPES[frametype]
229 229 else:
230 230 frametype = int(frametype)
231 231
232 232 finalflags = 0
233 233 validflags = FRAME_TYPE_FLAGS[frametype]
234 234 for flag in frameflags.split(b'|'):
235 235 if flag in validflags:
236 236 finalflags |= validflags[flag]
237 237 else:
238 238 finalflags |= int(flag)
239 239
240 240 if payload.startswith(b'cbor:'):
241 241 payload = b''.join(cborutil.streamencode(
242 242 stringutil.evalpythonliteral(payload[5:])))
243 243
244 244 else:
245 245 payload = stringutil.unescapestr(payload)
246 246
247 247 return makeframe(requestid=requestid, streamid=streamid,
248 248 streamflags=finalstreamflags, typeid=frametype,
249 249 flags=finalflags, payload=payload)
250 250
251 251 def parseheader(data):
252 252 """Parse a unified framing protocol frame header from a buffer.
253 253
254 254 The header is expected to be in the buffer at offset 0 and the
255 255 buffer is expected to be large enough to hold a full header.
256 256 """
257 257 # 24 bits payload length (little endian)
258 258 # 16 bits request ID
259 259 # 8 bits stream ID
260 260 # 8 bits stream flags
261 261 # 4 bits frame type
262 262 # 4 bits frame flags
263 263 # ... payload
264 264 framelength = data[0] + 256 * data[1] + 16384 * data[2]
265 265 requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
266 266 typeflags = data[7]
267 267
268 268 frametype = (typeflags & 0xf0) >> 4
269 269 frameflags = typeflags & 0x0f
270 270
271 271 return frameheader(framelength, requestid, streamid, streamflags,
272 272 frametype, frameflags)
273 273
274 274 def readframe(fh):
275 275 """Read a unified framing protocol frame from a file object.
276 276
277 277 Returns a 3-tuple of (type, flags, payload) for the decoded frame or
278 278 None if no frame is available. May raise if a malformed frame is
279 279 seen.
280 280 """
281 281 header = bytearray(FRAME_HEADER_SIZE)
282 282
283 283 readcount = fh.readinto(header)
284 284
285 285 if readcount == 0:
286 286 return None
287 287
288 288 if readcount != FRAME_HEADER_SIZE:
289 289 raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
290 290 (readcount, header))
291 291
292 292 h = parseheader(header)
293 293
294 294 payload = fh.read(h.length)
295 295 if len(payload) != h.length:
296 296 raise error.Abort(_('frame length error: expected %d; got %d') %
297 297 (h.length, len(payload)))
298 298
299 299 return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
300 300 payload)
301 301
302 302 def createcommandframes(stream, requestid, cmd, args, datafh=None,
303 303 maxframesize=DEFAULT_MAX_FRAME_SIZE,
304 304 redirect=None):
305 305 """Create frames necessary to transmit a request to run a command.
306 306
307 307 This is a generator of bytearrays. Each item represents a frame
308 308 ready to be sent over the wire to a peer.
309 309 """
310 310 data = {b'name': cmd}
311 311 if args:
312 312 data[b'args'] = args
313 313
314 314 if redirect:
315 315 data[b'redirect'] = redirect
316 316
317 317 data = b''.join(cborutil.streamencode(data))
318 318
319 319 offset = 0
320 320
321 321 while True:
322 322 flags = 0
323 323
324 324 # Must set new or continuation flag.
325 325 if not offset:
326 326 flags |= FLAG_COMMAND_REQUEST_NEW
327 327 else:
328 328 flags |= FLAG_COMMAND_REQUEST_CONTINUATION
329 329
330 330 # Data frames is set on all frames.
331 331 if datafh:
332 332 flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
333 333
334 334 payload = data[offset:offset + maxframesize]
335 335 offset += len(payload)
336 336
337 337 if len(payload) == maxframesize and offset < len(data):
338 338 flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
339 339
340 340 yield stream.makeframe(requestid=requestid,
341 341 typeid=FRAME_TYPE_COMMAND_REQUEST,
342 342 flags=flags,
343 343 payload=payload)
344 344
345 345 if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
346 346 break
347 347
348 348 if datafh:
349 349 while True:
350 350 data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
351 351
352 352 done = False
353 353 if len(data) == DEFAULT_MAX_FRAME_SIZE:
354 354 flags = FLAG_COMMAND_DATA_CONTINUATION
355 355 else:
356 356 flags = FLAG_COMMAND_DATA_EOS
357 357 assert datafh.read(1) == b''
358 358 done = True
359 359
360 360 yield stream.makeframe(requestid=requestid,
361 361 typeid=FRAME_TYPE_COMMAND_DATA,
362 362 flags=flags,
363 363 payload=data)
364 364
365 365 if done:
366 366 break
367 367
368 368 def createcommandresponseframesfrombytes(stream, requestid, data,
369 369 maxframesize=DEFAULT_MAX_FRAME_SIZE):
370 370 """Create a raw frame to send a bytes response from static bytes input.
371 371
372 372 Returns a generator of bytearrays.
373 373 """
374 374 # Automatically send the overall CBOR response map.
375 375 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
376 376 if len(overall) > maxframesize:
377 377 raise error.ProgrammingError('not yet implemented')
378 378
379 379 # Simple case where we can fit the full response in a single frame.
380 380 if len(overall) + len(data) <= maxframesize:
381 381 flags = FLAG_COMMAND_RESPONSE_EOS
382 382 yield stream.makeframe(requestid=requestid,
383 383 typeid=FRAME_TYPE_COMMAND_RESPONSE,
384 384 flags=flags,
385 385 payload=overall + data)
386 386 return
387 387
388 388 # It's easier to send the overall CBOR map in its own frame than to track
389 389 # offsets.
390 390 yield stream.makeframe(requestid=requestid,
391 391 typeid=FRAME_TYPE_COMMAND_RESPONSE,
392 392 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
393 393 payload=overall)
394 394
395 395 offset = 0
396 396 while True:
397 397 chunk = data[offset:offset + maxframesize]
398 398 offset += len(chunk)
399 399 done = offset == len(data)
400 400
401 401 if done:
402 402 flags = FLAG_COMMAND_RESPONSE_EOS
403 403 else:
404 404 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
405 405
406 406 yield stream.makeframe(requestid=requestid,
407 407 typeid=FRAME_TYPE_COMMAND_RESPONSE,
408 408 flags=flags,
409 409 payload=chunk)
410 410
411 411 if done:
412 412 break
413 413
414 414 def createbytesresponseframesfromgen(stream, requestid, gen,
415 415 maxframesize=DEFAULT_MAX_FRAME_SIZE):
416 416 """Generator of frames from a generator of byte chunks.
417 417
418 418 This assumes that another frame will follow whatever this emits. i.e.
419 419 this always emits the continuation flag and never emits the end-of-stream
420 420 flag.
421 421 """
422 422 cb = util.chunkbuffer(gen)
423 423 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
424 424
425 425 while True:
426 426 chunk = cb.read(maxframesize)
427 427 if not chunk:
428 428 break
429 429
430 430 yield stream.makeframe(requestid=requestid,
431 431 typeid=FRAME_TYPE_COMMAND_RESPONSE,
432 432 flags=flags,
433 433 payload=chunk)
434 434
435 435 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
436 436
437 437 def createcommandresponseokframe(stream, requestid):
438 438 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
439 439
440 440 return stream.makeframe(requestid=requestid,
441 441 typeid=FRAME_TYPE_COMMAND_RESPONSE,
442 442 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
443 443 payload=overall)
444 444
445 445 def createcommandresponseeosframe(stream, requestid):
446 446 """Create an empty payload frame representing command end-of-stream."""
447 447 return stream.makeframe(requestid=requestid,
448 448 typeid=FRAME_TYPE_COMMAND_RESPONSE,
449 449 flags=FLAG_COMMAND_RESPONSE_EOS,
450 450 payload=b'')
451 451
452 452 def createalternatelocationresponseframe(stream, requestid, location):
453 453 data = {
454 454 b'status': b'redirect',
455 455 b'location': {
456 456 b'url': location.url,
457 457 b'mediatype': location.mediatype,
458 458 }
459 459 }
460 460
461 461 for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
462 462 r'servercadercerts'):
463 463 value = getattr(location, a)
464 464 if value is not None:
465 465 data[b'location'][pycompat.bytestr(a)] = value
466 466
467 467 return stream.makeframe(requestid=requestid,
468 468 typeid=FRAME_TYPE_COMMAND_RESPONSE,
469 469 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
470 470 payload=b''.join(cborutil.streamencode(data)))
471 471
472 472 def createcommanderrorresponse(stream, requestid, message, args=None):
473 473 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
474 474 # formatting works consistently?
475 475 m = {
476 476 b'status': b'error',
477 477 b'error': {
478 478 b'message': message,
479 479 }
480 480 }
481 481
482 482 if args:
483 483 m[b'error'][b'args'] = args
484 484
485 485 overall = b''.join(cborutil.streamencode(m))
486 486
487 487 yield stream.makeframe(requestid=requestid,
488 488 typeid=FRAME_TYPE_COMMAND_RESPONSE,
489 489 flags=FLAG_COMMAND_RESPONSE_EOS,
490 490 payload=overall)
491 491
492 492 def createerrorframe(stream, requestid, msg, errtype):
493 493 # TODO properly handle frame size limits.
494 494 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
495 495
496 496 payload = b''.join(cborutil.streamencode({
497 497 b'type': errtype,
498 498 b'message': [{b'msg': msg}],
499 499 }))
500 500
501 501 yield stream.makeframe(requestid=requestid,
502 502 typeid=FRAME_TYPE_ERROR_RESPONSE,
503 503 flags=0,
504 504 payload=payload)
505 505
506 506 def createtextoutputframe(stream, requestid, atoms,
507 507 maxframesize=DEFAULT_MAX_FRAME_SIZE):
508 508 """Create a text output frame to render text to people.
509 509
510 510 ``atoms`` is a 3-tuple of (formatting string, args, labels).
511 511
512 512 The formatting string contains ``%s`` tokens to be replaced by the
513 513 corresponding indexed entry in ``args``. ``labels`` is an iterable of
514 514 formatters to be applied at rendering time. In terms of the ``ui``
515 515 class, each atom corresponds to a ``ui.write()``.
516 516 """
517 517 atomdicts = []
518 518
519 519 for (formatting, args, labels) in atoms:
520 520 # TODO look for localstr, other types here?
521 521
522 522 if not isinstance(formatting, bytes):
523 523 raise ValueError('must use bytes formatting strings')
524 524 for arg in args:
525 525 if not isinstance(arg, bytes):
526 526 raise ValueError('must use bytes for arguments')
527 527 for label in labels:
528 528 if not isinstance(label, bytes):
529 529 raise ValueError('must use bytes for labels')
530 530
531 531 # Formatting string must be ASCII.
532 532 formatting = formatting.decode(r'ascii', r'replace').encode(r'ascii')
533 533
534 534 # Arguments must be UTF-8.
535 535 args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
536 536
537 537 # Labels must be ASCII.
538 538 labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
539 539 for l in labels]
540 540
541 541 atom = {b'msg': formatting}
542 542 if args:
543 543 atom[b'args'] = args
544 544 if labels:
545 545 atom[b'labels'] = labels
546 546
547 547 atomdicts.append(atom)
548 548
549 549 payload = b''.join(cborutil.streamencode(atomdicts))
550 550
551 551 if len(payload) > maxframesize:
552 552 raise ValueError('cannot encode data in a single frame')
553 553
554 554 yield stream.makeframe(requestid=requestid,
555 555 typeid=FRAME_TYPE_TEXT_OUTPUT,
556 556 flags=0,
557 557 payload=payload)
558 558
559 559 class bufferingcommandresponseemitter(object):
560 560 """Helper object to emit command response frames intelligently.
561 561
562 562 Raw command response data is likely emitted in chunks much smaller
563 563 than what can fit in a single frame. This class exists to buffer
564 564 chunks until enough data is available to fit in a single frame.
565 565
566 566 TODO we'll need something like this when compression is supported.
567 567 So it might make sense to implement this functionality at the stream
568 568 level.
569 569 """
570 570 def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
571 571 self._stream = stream
572 572 self._requestid = requestid
573 573 self._maxsize = maxframesize
574 574 self._chunks = []
575 575 self._chunkssize = 0
576 576
577 577 def send(self, data):
578 578 """Send new data for emission.
579 579
580 580 Is a generator of new frames that were derived from the new input.
581 581
582 582 If the special input ``None`` is received, flushes all buffered
583 583 data to frames.
584 584 """
585 585
586 586 if data is None:
587 587 for frame in self._flush():
588 588 yield frame
589 589 return
590 590
591 591 # There is a ton of potential to do more complicated things here.
592 592 # Our immediate goal is to coalesce small chunks into big frames,
593 593 # not achieve the fewest number of frames possible. So we go with
594 594 # a simple implementation:
595 595 #
596 596 # * If a chunk is too large for a frame, we flush and emit frames
597 597 # for the new chunk.
598 598 # * If a chunk can be buffered without total buffered size limits
599 599 # being exceeded, we do that.
600 600 # * If a chunk causes us to go over our buffering limit, we flush
601 601 # and then buffer the new chunk.
602 602
603 603 if len(data) > self._maxsize:
604 604 for frame in self._flush():
605 605 yield frame
606 606
607 607 # Now emit frames for the big chunk.
608 608 offset = 0
609 609 while True:
610 610 chunk = data[offset:offset + self._maxsize]
611 611 offset += len(chunk)
612 612
613 613 yield self._stream.makeframe(
614 614 self._requestid,
615 615 typeid=FRAME_TYPE_COMMAND_RESPONSE,
616 616 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
617 617 payload=chunk)
618 618
619 619 if offset == len(data):
620 620 return
621 621
622 622 # If we don't have enough to constitute a full frame, buffer and
623 623 # return.
624 624 if len(data) + self._chunkssize < self._maxsize:
625 625 self._chunks.append(data)
626 626 self._chunkssize += len(data)
627 627 return
628 628
629 629 # Else flush what we have and buffer the new chunk. We could do
630 630 # something more intelligent here, like break the chunk. Let's
631 631 # keep things simple for now.
632 632 for frame in self._flush():
633 633 yield frame
634 634
635 635 self._chunks.append(data)
636 636 self._chunkssize = len(data)
637 637
638 638 def _flush(self):
639 639 payload = b''.join(self._chunks)
640 640 assert len(payload) <= self._maxsize
641 641
642 642 self._chunks[:] = []
643 643 self._chunkssize = 0
644 644
645 645 yield self._stream.makeframe(
646 646 self._requestid,
647 647 typeid=FRAME_TYPE_COMMAND_RESPONSE,
648 648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
649 649 payload=payload)
650 650
651 651 class stream(object):
652 652 """Represents a logical unidirectional series of frames."""
653 653
654 654 def __init__(self, streamid, active=False):
655 655 self.streamid = streamid
656 656 self._active = active
657 657
658 658 def makeframe(self, requestid, typeid, flags, payload):
659 659 """Create a frame to be sent out over this stream.
660 660
661 661 Only returns the frame instance. Does not actually send it.
662 662 """
663 663 streamflags = 0
664 664 if not self._active:
665 665 streamflags |= STREAM_FLAG_BEGIN_STREAM
666 666 self._active = True
667 667
668 668 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
669 669 payload)
670 670
671 671 def setdecoder(self, name, extraobjs):
672 672 """Set the decoder for this stream.
673 673
674 674 Receives the stream profile name and any additional CBOR objects
675 675 decoded from the stream encoding settings frame payloads.
676 676 """
677 677
678 678 def ensureserverstream(stream):
679 679 if stream.streamid % 2:
680 680 raise error.ProgrammingError('server should only write to even '
681 681 'numbered streams; %d is not even' %
682 682 stream.streamid)
683 683
684 684 DEFAULT_PROTOCOL_SETTINGS = {
685 685 'contentencodings': [b'identity'],
686 686 }
687 687
688 688 class serverreactor(object):
689 689 """Holds state of a server handling frame-based protocol requests.
690 690
691 691 This class is the "brain" of the unified frame-based protocol server
692 692 component. While the protocol is stateless from the perspective of
693 693 requests/commands, something needs to track which frames have been
694 694 received, what frames to expect, etc. This class is that thing.
695 695
696 696 Instances are modeled as a state machine of sorts. Instances are also
697 697 reactionary to external events. The point of this class is to encapsulate
698 698 the state of the connection and the exchange of frames, not to perform
699 699 work. Instead, callers tell this class when something occurs, like a
700 700 frame arriving. If that activity is worthy of a follow-up action (say
701 701 *run a command*), the return value of that handler will say so.
702 702
703 703 I/O and CPU intensive operations are purposefully delegated outside of
704 704 this class.
705 705
706 706 Consumers are expected to tell instances when events occur. They do so by
707 707 calling the various ``on*`` methods. These methods return a 2-tuple
708 708 describing any follow-up action(s) to take. The first element is the
709 709 name of an action to perform. The second is a data structure (usually
710 710 a dict) specific to that action that contains more information. e.g.
711 711 if the server wants to send frames back to the client, the data structure
712 712 will contain a reference to those frames.
713 713
714 714 Valid actions that consumers can be instructed to take are:
715 715
716 716 sendframes
717 717 Indicates that frames should be sent to the client. The ``framegen``
718 718 key contains a generator of frames that should be sent. The server
719 719 assumes that all frames are sent to the client.
720 720
721 721 error
722 722 Indicates that an error occurred. Consumer should probably abort.
723 723
724 724 runcommand
725 725 Indicates that the consumer should run a wire protocol command. Details
726 726 of the command to run are given in the data structure.
727 727
728 728 wantframe
729 729 Indicates that nothing of interest happened and the server is waiting on
730 730 more frames from the client before anything interesting can be done.
731 731
732 732 noop
733 733 Indicates no additional action is required.
734 734
735 735 Known Issues
736 736 ------------
737 737
738 738 There are no limits to the number of partially received commands or their
739 739 size. A malicious client could stream command request data and exhaust the
740 740 server's memory.
741 741
742 742 Partially received commands are not acted upon when end of input is
743 743 reached. Should the server error if it receives a partial request?
744 744 Should the client send a message to abort a partially transmitted request
745 745 to facilitate graceful shutdown?
746 746
747 747 Active requests that haven't been responded to aren't tracked. This means
748 748 that if we receive a command and instruct its dispatch, another command
749 749 with its request ID can come in over the wire and there will be a race
750 750 between who responds to what.
751 751 """
752 752
753 def __init__(self, deferoutput=False):
753 def __init__(self, ui, deferoutput=False):
754 754 """Construct a new server reactor.
755 755
756 756 ``deferoutput`` can be used to indicate that no output frames should be
757 757 instructed to be sent until input has been exhausted. In this mode,
758 758 events that would normally generate output frames (such as a command
759 759 response being ready) will instead defer instructing the consumer to
760 760 send those frames. This is useful for half-duplex transports where the
761 761 sender cannot receive until all data has been transmitted.
762 762 """
763 self._ui = ui
763 764 self._deferoutput = deferoutput
764 765 self._state = 'initial'
765 766 self._nextoutgoingstreamid = 2
766 767 self._bufferedframegens = []
767 768 # stream id -> stream instance for all active streams from the client.
768 769 self._incomingstreams = {}
769 770 self._outgoingstreams = {}
770 771 # request id -> dict of commands that are actively being received.
771 772 self._receivingcommands = {}
772 773 # Request IDs that have been received and are actively being processed.
773 774 # Once all output for a request has been sent, it is removed from this
774 775 # set.
775 776 self._activecommands = set()
776 777
777 778 self._protocolsettingsdecoder = None
778 779
779 780 # Sender protocol settings are optional. Set implied default values.
780 781 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
781 782
782 783 def onframerecv(self, frame):
783 784 """Process a frame that has been received off the wire.
784 785
785 786 Returns a dict with an ``action`` key that details what action,
786 787 if any, the consumer should take next.
787 788 """
788 789 if not frame.streamid % 2:
789 790 self._state = 'errored'
790 791 return self._makeerrorresult(
791 792 _('received frame with even numbered stream ID: %d') %
792 793 frame.streamid)
793 794
794 795 if frame.streamid not in self._incomingstreams:
795 796 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
796 797 self._state = 'errored'
797 798 return self._makeerrorresult(
798 799 _('received frame on unknown inactive stream without '
799 800 'beginning of stream flag set'))
800 801
801 802 self._incomingstreams[frame.streamid] = stream(frame.streamid)
802 803
803 804 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
804 805 # TODO handle decoding frames
805 806 self._state = 'errored'
806 807 raise error.ProgrammingError('support for decoding stream payloads '
807 808 'not yet implemented')
808 809
809 810 if frame.streamflags & STREAM_FLAG_END_STREAM:
810 811 del self._incomingstreams[frame.streamid]
811 812
812 813 handlers = {
813 814 'initial': self._onframeinitial,
814 815 'protocol-settings-receiving': self._onframeprotocolsettings,
815 816 'idle': self._onframeidle,
816 817 'command-receiving': self._onframecommandreceiving,
817 818 'errored': self._onframeerrored,
818 819 }
819 820
820 821 meth = handlers.get(self._state)
821 822 if not meth:
822 823 raise error.ProgrammingError('unhandled state: %s' % self._state)
823 824
824 825 return meth(frame)
825 826
826 827 def oncommandresponseready(self, stream, requestid, data):
827 828 """Signal that a bytes response is ready to be sent to the client.
828 829
829 830 The raw bytes response is passed as an argument.
830 831 """
831 832 ensureserverstream(stream)
832 833
833 834 def sendframes():
834 835 for frame in createcommandresponseframesfrombytes(stream, requestid,
835 836 data):
836 837 yield frame
837 838
838 839 self._activecommands.remove(requestid)
839 840
840 841 result = sendframes()
841 842
842 843 if self._deferoutput:
843 844 self._bufferedframegens.append(result)
844 845 return 'noop', {}
845 846 else:
846 847 return 'sendframes', {
847 848 'framegen': result,
848 849 }
849 850
850 851 def oncommandresponsereadyobjects(self, stream, requestid, objs):
851 852 """Signal that objects are ready to be sent to the client.
852 853
853 854 ``objs`` is an iterable of objects (typically a generator) that will
854 855 be encoded via CBOR and added to frames, which will be sent to the
855 856 client.
856 857 """
857 858 ensureserverstream(stream)
858 859
859 860 # We need to take care over exception handling. Uncaught exceptions
860 861 # when generating frames could lead to premature end of the frame
861 862 # stream and the possibility of the server or client process getting
862 863 # in a bad state.
863 864 #
864 865 # Keep in mind that if ``objs`` is a generator, advancing it could
865 866 # raise exceptions that originated in e.g. wire protocol command
866 867 # functions. That is why we differentiate between exceptions raised
867 868 # when iterating versus other exceptions that occur.
868 869 #
869 870 # In all cases, when the function finishes, the request is fully
870 871 # handled and no new frames for it should be seen.
871 872
872 873 def sendframes():
873 874 emitted = False
874 875 alternatelocationsent = False
875 876 emitter = bufferingcommandresponseemitter(stream, requestid)
876 877 while True:
877 878 try:
878 879 o = next(objs)
879 880 except StopIteration:
880 881 for frame in emitter.send(None):
881 882 yield frame
882 883
883 884 if emitted:
884 885 yield createcommandresponseeosframe(stream, requestid)
885 886 break
886 887
887 888 except error.WireprotoCommandError as e:
888 889 for frame in createcommanderrorresponse(
889 890 stream, requestid, e.message, e.messageargs):
890 891 yield frame
891 892 break
892 893
893 894 except Exception as e:
894 895 for frame in createerrorframe(
895 896 stream, requestid, '%s' % stringutil.forcebytestr(e),
896 897 errtype='server'):
897 898
898 899 yield frame
899 900
900 901 break
901 902
902 903 try:
903 904 # Alternate location responses can only be the first and
904 905 # only object in the output stream.
905 906 if isinstance(o, wireprototypes.alternatelocationresponse):
906 907 if emitted:
907 908 raise error.ProgrammingError(
908 909 'alternatelocationresponse seen after initial '
909 910 'output object')
910 911
911 912 yield createalternatelocationresponseframe(
912 913 stream, requestid, o)
913 914
914 915 alternatelocationsent = True
915 916 emitted = True
916 917 continue
917 918
918 919 if alternatelocationsent:
919 920 raise error.ProgrammingError(
920 921 'object follows alternatelocationresponse')
921 922
922 923 if not emitted:
923 924 yield createcommandresponseokframe(stream, requestid)
924 925 emitted = True
925 926
926 927 # Objects emitted by command functions can be serializable
927 928 # data structures or special types.
928 929 # TODO consider extracting the content normalization to a
929 930 # standalone function, as it may be useful for e.g. cachers.
930 931
931 932 # A pre-encoded object is sent directly to the emitter.
932 933 if isinstance(o, wireprototypes.encodedresponse):
933 934 for frame in emitter.send(o.data):
934 935 yield frame
935 936
936 937 # A regular object is CBOR encoded.
937 938 else:
938 939 for chunk in cborutil.streamencode(o):
939 940 for frame in emitter.send(chunk):
940 941 yield frame
941 942
942 943 except Exception as e:
943 944 for frame in createerrorframe(stream, requestid,
944 945 '%s' % e,
945 946 errtype='server'):
946 947 yield frame
947 948
948 949 break
949 950
950 951 self._activecommands.remove(requestid)
951 952
952 953 return self._handlesendframes(sendframes())
953 954
954 955 def oninputeof(self):
955 956 """Signals that end of input has been received.
956 957
957 958 No more frames will be received. All pending activity should be
958 959 completed.
959 960 """
960 961 # TODO should we do anything about in-flight commands?
961 962
962 963 if not self._deferoutput or not self._bufferedframegens:
963 964 return 'noop', {}
964 965
965 966 # If we buffered all our responses, emit those.
966 967 def makegen():
967 968 for gen in self._bufferedframegens:
968 969 for frame in gen:
969 970 yield frame
970 971
971 972 return 'sendframes', {
972 973 'framegen': makegen(),
973 974 }
974 975
975 976 def _handlesendframes(self, framegen):
976 977 if self._deferoutput:
977 978 self._bufferedframegens.append(framegen)
978 979 return 'noop', {}
979 980 else:
980 981 return 'sendframes', {
981 982 'framegen': framegen,
982 983 }
983 984
984 985 def onservererror(self, stream, requestid, msg):
985 986 ensureserverstream(stream)
986 987
987 988 def sendframes():
988 989 for frame in createerrorframe(stream, requestid, msg,
989 990 errtype='server'):
990 991 yield frame
991 992
992 993 self._activecommands.remove(requestid)
993 994
994 995 return self._handlesendframes(sendframes())
995 996
996 997 def oncommanderror(self, stream, requestid, message, args=None):
997 998 """Called when a command encountered an error before sending output."""
998 999 ensureserverstream(stream)
999 1000
1000 1001 def sendframes():
1001 1002 for frame in createcommanderrorresponse(stream, requestid, message,
1002 1003 args):
1003 1004 yield frame
1004 1005
1005 1006 self._activecommands.remove(requestid)
1006 1007
1007 1008 return self._handlesendframes(sendframes())
1008 1009
1009 1010 def makeoutputstream(self):
1010 1011 """Create a stream to be used for sending data to the client."""
1011 1012 streamid = self._nextoutgoingstreamid
1012 1013 self._nextoutgoingstreamid += 2
1013 1014
1014 1015 s = stream(streamid)
1015 1016 self._outgoingstreams[streamid] = s
1016 1017
1017 1018 return s
1018 1019
1019 1020 def _makeerrorresult(self, msg):
1020 1021 return 'error', {
1021 1022 'message': msg,
1022 1023 }
1023 1024
1024 1025 def _makeruncommandresult(self, requestid):
1025 1026 entry = self._receivingcommands[requestid]
1026 1027
1027 1028 if not entry['requestdone']:
1028 1029 self._state = 'errored'
1029 1030 raise error.ProgrammingError('should not be called without '
1030 1031 'requestdone set')
1031 1032
1032 1033 del self._receivingcommands[requestid]
1033 1034
1034 1035 if self._receivingcommands:
1035 1036 self._state = 'command-receiving'
1036 1037 else:
1037 1038 self._state = 'idle'
1038 1039
1039 1040 # Decode the payloads as CBOR.
1040 1041 entry['payload'].seek(0)
1041 1042 request = cborutil.decodeall(entry['payload'].getvalue())[0]
1042 1043
1043 1044 if b'name' not in request:
1044 1045 self._state = 'errored'
1045 1046 return self._makeerrorresult(
1046 1047 _('command request missing "name" field'))
1047 1048
1048 1049 if b'args' not in request:
1049 1050 request[b'args'] = {}
1050 1051
1051 1052 assert requestid not in self._activecommands
1052 1053 self._activecommands.add(requestid)
1053 1054
1054 1055 return 'runcommand', {
1055 1056 'requestid': requestid,
1056 1057 'command': request[b'name'],
1057 1058 'args': request[b'args'],
1058 1059 'redirect': request.get(b'redirect'),
1059 1060 'data': entry['data'].getvalue() if entry['data'] else None,
1060 1061 }
1061 1062
1062 1063 def _makewantframeresult(self):
1063 1064 return 'wantframe', {
1064 1065 'state': self._state,
1065 1066 }
1066 1067
1067 1068 def _validatecommandrequestframe(self, frame):
1068 1069 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1069 1070 continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
1070 1071
1071 1072 if new and continuation:
1072 1073 self._state = 'errored'
1073 1074 return self._makeerrorresult(
1074 1075 _('received command request frame with both new and '
1075 1076 'continuation flags set'))
1076 1077
1077 1078 if not new and not continuation:
1078 1079 self._state = 'errored'
1079 1080 return self._makeerrorresult(
1080 1081 _('received command request frame with neither new nor '
1081 1082 'continuation flags set'))
1082 1083
1083 1084 def _onframeinitial(self, frame):
1084 1085 # Called when we receive a frame when in the "initial" state.
1085 1086 if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1086 1087 self._state = 'protocol-settings-receiving'
1087 1088 self._protocolsettingsdecoder = cborutil.bufferingdecoder()
1088 1089 return self._onframeprotocolsettings(frame)
1089 1090
1090 1091 elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1091 1092 self._state = 'idle'
1092 1093 return self._onframeidle(frame)
1093 1094
1094 1095 else:
1095 1096 self._state = 'errored'
1096 1097 return self._makeerrorresult(
1097 1098 _('expected sender protocol settings or command request '
1098 1099 'frame; got %d') % frame.typeid)
1099 1100
1100 1101 def _onframeprotocolsettings(self, frame):
1101 1102 assert self._state == 'protocol-settings-receiving'
1102 1103 assert self._protocolsettingsdecoder is not None
1103 1104
1104 1105 if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
1105 1106 self._state = 'errored'
1106 1107 return self._makeerrorresult(
1107 1108 _('expected sender protocol settings frame; got %d') %
1108 1109 frame.typeid)
1109 1110
1110 1111 more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
1111 1112 eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
1112 1113
1113 1114 if more and eos:
1114 1115 self._state = 'errored'
1115 1116 return self._makeerrorresult(
1116 1117 _('sender protocol settings frame cannot have both '
1117 1118 'continuation and end of stream flags set'))
1118 1119
1119 1120 if not more and not eos:
1120 1121 self._state = 'errored'
1121 1122 return self._makeerrorresult(
1122 1123 _('sender protocol settings frame must have continuation or '
1123 1124 'end of stream flag set'))
1124 1125
1125 1126 # TODO establish limits for maximum amount of data that can be
1126 1127 # buffered.
1127 1128 try:
1128 1129 self._protocolsettingsdecoder.decode(frame.payload)
1129 1130 except Exception as e:
1130 1131 self._state = 'errored'
1131 1132 return self._makeerrorresult(
1132 1133 _('error decoding CBOR from sender protocol settings frame: %s')
1133 1134 % stringutil.forcebytestr(e))
1134 1135
1135 1136 if more:
1136 1137 return self._makewantframeresult()
1137 1138
1138 1139 assert eos
1139 1140
1140 1141 decoded = self._protocolsettingsdecoder.getavailable()
1141 1142 self._protocolsettingsdecoder = None
1142 1143
1143 1144 if not decoded:
1144 1145 self._state = 'errored'
1145 1146 return self._makeerrorresult(
1146 1147 _('sender protocol settings frame did not contain CBOR data'))
1147 1148 elif len(decoded) > 1:
1148 1149 self._state = 'errored'
1149 1150 return self._makeerrorresult(
1150 1151 _('sender protocol settings frame contained multiple CBOR '
1151 1152 'values'))
1152 1153
1153 1154 d = decoded[0]
1154 1155
1155 1156 if b'contentencodings' in d:
1156 1157 self._sendersettings['contentencodings'] = d[b'contentencodings']
1157 1158
1158 1159 self._state = 'idle'
1159 1160
1160 1161 return self._makewantframeresult()
1161 1162
1162 1163 def _onframeidle(self, frame):
1163 1164 # The only frame type that should be received in this state is a
1164 1165 # command request.
1165 1166 if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
1166 1167 self._state = 'errored'
1167 1168 return self._makeerrorresult(
1168 1169 _('expected command request frame; got %d') % frame.typeid)
1169 1170
1170 1171 res = self._validatecommandrequestframe(frame)
1171 1172 if res:
1172 1173 return res
1173 1174
1174 1175 if frame.requestid in self._receivingcommands:
1175 1176 self._state = 'errored'
1176 1177 return self._makeerrorresult(
1177 1178 _('request with ID %d already received') % frame.requestid)
1178 1179
1179 1180 if frame.requestid in self._activecommands:
1180 1181 self._state = 'errored'
1181 1182 return self._makeerrorresult(
1182 1183 _('request with ID %d is already active') % frame.requestid)
1183 1184
1184 1185 new = frame.flags & FLAG_COMMAND_REQUEST_NEW
1185 1186 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1186 1187 expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
1187 1188
1188 1189 if not new:
1189 1190 self._state = 'errored'
1190 1191 return self._makeerrorresult(
1191 1192 _('received command request frame without new flag set'))
1192 1193
1193 1194 payload = util.bytesio()
1194 1195 payload.write(frame.payload)
1195 1196
1196 1197 self._receivingcommands[frame.requestid] = {
1197 1198 'payload': payload,
1198 1199 'data': None,
1199 1200 'requestdone': not moreframes,
1200 1201 'expectingdata': bool(expectingdata),
1201 1202 }
1202 1203
1203 1204 # This is the final frame for this request. Dispatch it.
1204 1205 if not moreframes and not expectingdata:
1205 1206 return self._makeruncommandresult(frame.requestid)
1206 1207
1207 1208 assert moreframes or expectingdata
1208 1209 self._state = 'command-receiving'
1209 1210 return self._makewantframeresult()
1210 1211
1211 1212 def _onframecommandreceiving(self, frame):
1212 1213 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1213 1214 # Process new command requests as such.
1214 1215 if frame.flags & FLAG_COMMAND_REQUEST_NEW:
1215 1216 return self._onframeidle(frame)
1216 1217
1217 1218 res = self._validatecommandrequestframe(frame)
1218 1219 if res:
1219 1220 return res
1220 1221
1221 1222 # All other frames should be related to a command that is currently
1222 1223 # receiving but is not active.
1223 1224 if frame.requestid in self._activecommands:
1224 1225 self._state = 'errored'
1225 1226 return self._makeerrorresult(
1226 1227 _('received frame for request that is still active: %d') %
1227 1228 frame.requestid)
1228 1229
1229 1230 if frame.requestid not in self._receivingcommands:
1230 1231 self._state = 'errored'
1231 1232 return self._makeerrorresult(
1232 1233 _('received frame for request that is not receiving: %d') %
1233 1234 frame.requestid)
1234 1235
1235 1236 entry = self._receivingcommands[frame.requestid]
1236 1237
1237 1238 if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
1238 1239 moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
1239 1240 expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
1240 1241
1241 1242 if entry['requestdone']:
1242 1243 self._state = 'errored'
1243 1244 return self._makeerrorresult(
1244 1245 _('received command request frame when request frames '
1245 1246 'were supposedly done'))
1246 1247
1247 1248 if expectingdata != entry['expectingdata']:
1248 1249 self._state = 'errored'
1249 1250 return self._makeerrorresult(
1250 1251 _('mismatch between expect data flag and previous frame'))
1251 1252
1252 1253 entry['payload'].write(frame.payload)
1253 1254
1254 1255 if not moreframes:
1255 1256 entry['requestdone'] = True
1256 1257
1257 1258 if not moreframes and not expectingdata:
1258 1259 return self._makeruncommandresult(frame.requestid)
1259 1260
1260 1261 return self._makewantframeresult()
1261 1262
1262 1263 elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
1263 1264 if not entry['expectingdata']:
1264 1265 self._state = 'errored'
1265 1266 return self._makeerrorresult(_(
1266 1267 'received command data frame for request that is not '
1267 1268 'expecting data: %d') % frame.requestid)
1268 1269
1269 1270 if entry['data'] is None:
1270 1271 entry['data'] = util.bytesio()
1271 1272
1272 1273 return self._handlecommanddataframe(frame, entry)
1273 1274 else:
1274 1275 self._state = 'errored'
1275 1276 return self._makeerrorresult(_(
1276 1277 'received unexpected frame type: %d') % frame.typeid)
1277 1278
1278 1279 def _handlecommanddataframe(self, frame, entry):
1279 1280 assert frame.typeid == FRAME_TYPE_COMMAND_DATA
1280 1281
1281 1282 # TODO support streaming data instead of buffering it.
1282 1283 entry['data'].write(frame.payload)
1283 1284
1284 1285 if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
1285 1286 return self._makewantframeresult()
1286 1287 elif frame.flags & FLAG_COMMAND_DATA_EOS:
1287 1288 entry['data'].seek(0)
1288 1289 return self._makeruncommandresult(frame.requestid)
1289 1290 else:
1290 1291 self._state = 'errored'
1291 1292 return self._makeerrorresult(_('command data frame without '
1292 1293 'flags'))
1293 1294
1294 1295 def _onframeerrored(self, frame):
1295 1296 return self._makeerrorresult(_('server already errored'))
1296 1297
1297 1298 class commandrequest(object):
1298 1299 """Represents a request to run a command."""
1299 1300
1300 1301 def __init__(self, requestid, name, args, datafh=None, redirect=None):
1301 1302 self.requestid = requestid
1302 1303 self.name = name
1303 1304 self.args = args
1304 1305 self.datafh = datafh
1305 1306 self.redirect = redirect
1306 1307 self.state = 'pending'
1307 1308
1308 1309 class clientreactor(object):
1309 1310 """Holds state of a client issuing frame-based protocol requests.
1310 1311
1311 1312 This is like ``serverreactor`` but for client-side state.
1312 1313
1313 1314 Each instance is bound to the lifetime of a connection. For persistent
1314 1315 connection transports using e.g. TCP sockets and speaking the raw
1315 1316 framing protocol, there will be a single instance for the lifetime of
1316 1317 the TCP socket. For transports where there are multiple discrete
1317 1318 interactions (say tunneled within in HTTP request), there will be a
1318 1319 separate instance for each distinct interaction.
1319 1320
1320 1321 Consumers are expected to tell instances when events occur by calling
1321 1322 various methods. These methods return a 2-tuple describing any follow-up
1322 1323 action(s) to take. The first element is the name of an action to
1323 1324 perform. The second is a data structure (usually a dict) specific to
1324 1325 that action that contains more information. e.g. if the reactor wants
1325 1326 to send frames to the server, the data structure will contain a reference
1326 1327 to those frames.
1327 1328
1328 1329 Valid actions that consumers can be instructed to take are:
1329 1330
1330 1331 noop
1331 1332 Indicates no additional action is required.
1332 1333
1333 1334 sendframes
1334 1335 Indicates that frames should be sent to the server. The ``framegen``
1335 1336 key contains a generator of frames that should be sent. The reactor
1336 1337 assumes that all frames in this generator are sent to the server.
1337 1338
1338 1339 error
1339 1340 Indicates that an error occurred. The ``message`` key contains an
1340 1341 error message describing the failure.
1341 1342
1342 1343 responsedata
1343 1344 Indicates a response to a previously-issued command was received.
1344 1345
1345 1346 The ``request`` key contains the ``commandrequest`` instance that
1346 1347 represents the request this data is for.
1347 1348
1348 1349 The ``data`` key contains the decoded data from the server.
1349 1350
1350 1351 ``expectmore`` and ``eos`` evaluate to True when more response data
1351 1352 is expected to follow or we're at the end of the response stream,
1352 1353 respectively.
1353 1354 """
1354 def __init__(self, hasmultiplesend=False, buffersends=True):
1355 def __init__(self, ui, hasmultiplesend=False, buffersends=True):
1355 1356 """Create a new instance.
1356 1357
1357 1358 ``hasmultiplesend`` indicates whether multiple sends are supported
1358 1359 by the transport. When True, it is possible to send commands immediately
1359 1360 instead of buffering until the caller signals an intent to finish a
1360 1361 send operation.
1361 1362
1362 1363 ``buffercommands`` indicates whether sends should be buffered until the
1363 1364 last request has been issued.
1364 1365 """
1366 self._ui = ui
1365 1367 self._hasmultiplesend = hasmultiplesend
1366 1368 self._buffersends = buffersends
1367 1369
1368 1370 self._canissuecommands = True
1369 1371 self._cansend = True
1370 1372
1371 1373 self._nextrequestid = 1
1372 1374 # We only support a single outgoing stream for now.
1373 1375 self._outgoingstream = stream(1)
1374 1376 self._pendingrequests = collections.deque()
1375 1377 self._activerequests = {}
1376 1378 self._incomingstreams = {}
1377 1379 self._streamsettingsdecoders = {}
1378 1380
1379 1381 def callcommand(self, name, args, datafh=None, redirect=None):
1380 1382 """Request that a command be executed.
1381 1383
1382 1384 Receives the command name, a dict of arguments to pass to the command,
1383 1385 and an optional file object containing the raw data for the command.
1384 1386
1385 1387 Returns a 3-tuple of (request, action, action data).
1386 1388 """
1387 1389 if not self._canissuecommands:
1388 1390 raise error.ProgrammingError('cannot issue new commands')
1389 1391
1390 1392 requestid = self._nextrequestid
1391 1393 self._nextrequestid += 2
1392 1394
1393 1395 request = commandrequest(requestid, name, args, datafh=datafh,
1394 1396 redirect=redirect)
1395 1397
1396 1398 if self._buffersends:
1397 1399 self._pendingrequests.append(request)
1398 1400 return request, 'noop', {}
1399 1401 else:
1400 1402 if not self._cansend:
1401 1403 raise error.ProgrammingError('sends cannot be performed on '
1402 1404 'this instance')
1403 1405
1404 1406 if not self._hasmultiplesend:
1405 1407 self._cansend = False
1406 1408 self._canissuecommands = False
1407 1409
1408 1410 return request, 'sendframes', {
1409 1411 'framegen': self._makecommandframes(request),
1410 1412 }
1411 1413
1412 1414 def flushcommands(self):
1413 1415 """Request that all queued commands be sent.
1414 1416
1415 1417 If any commands are buffered, this will instruct the caller to send
1416 1418 them over the wire. If no commands are buffered it instructs the client
1417 1419 to no-op.
1418 1420
1419 1421 If instances aren't configured for multiple sends, no new command
1420 1422 requests are allowed after this is called.
1421 1423 """
1422 1424 if not self._pendingrequests:
1423 1425 return 'noop', {}
1424 1426
1425 1427 if not self._cansend:
1426 1428 raise error.ProgrammingError('sends cannot be performed on this '
1427 1429 'instance')
1428 1430
1429 1431 # If the instance only allows sending once, mark that we have fired
1430 1432 # our one shot.
1431 1433 if not self._hasmultiplesend:
1432 1434 self._canissuecommands = False
1433 1435 self._cansend = False
1434 1436
1435 1437 def makeframes():
1436 1438 while self._pendingrequests:
1437 1439 request = self._pendingrequests.popleft()
1438 1440 for frame in self._makecommandframes(request):
1439 1441 yield frame
1440 1442
1441 1443 return 'sendframes', {
1442 1444 'framegen': makeframes(),
1443 1445 }
1444 1446
1445 1447 def _makecommandframes(self, request):
1446 1448 """Emit frames to issue a command request.
1447 1449
1448 1450 As a side-effect, update request accounting to reflect its changed
1449 1451 state.
1450 1452 """
1451 1453 self._activerequests[request.requestid] = request
1452 1454 request.state = 'sending'
1453 1455
1454 1456 res = createcommandframes(self._outgoingstream,
1455 1457 request.requestid,
1456 1458 request.name,
1457 1459 request.args,
1458 1460 datafh=request.datafh,
1459 1461 redirect=request.redirect)
1460 1462
1461 1463 for frame in res:
1462 1464 yield frame
1463 1465
1464 1466 request.state = 'sent'
1465 1467
1466 1468 def onframerecv(self, frame):
1467 1469 """Process a frame that has been received off the wire.
1468 1470
1469 1471 Returns a 2-tuple of (action, meta) describing further action the
1470 1472 caller needs to take as a result of receiving this frame.
1471 1473 """
1472 1474 if frame.streamid % 2:
1473 1475 return 'error', {
1474 1476 'message': (
1475 1477 _('received frame with odd numbered stream ID: %d') %
1476 1478 frame.streamid),
1477 1479 }
1478 1480
1479 1481 if frame.streamid not in self._incomingstreams:
1480 1482 if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
1481 1483 return 'error', {
1482 1484 'message': _('received frame on unknown stream '
1483 1485 'without beginning of stream flag set'),
1484 1486 }
1485 1487
1486 1488 self._incomingstreams[frame.streamid] = stream(frame.streamid)
1487 1489
1488 1490 if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
1489 1491 raise error.ProgrammingError('support for decoding stream '
1490 1492 'payloads not yet implemneted')
1491 1493
1492 1494 if frame.streamflags & STREAM_FLAG_END_STREAM:
1493 1495 del self._incomingstreams[frame.streamid]
1494 1496
1495 1497 if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
1496 1498 return self._onstreamsettingsframe(frame)
1497 1499
1498 1500 if frame.requestid not in self._activerequests:
1499 1501 return 'error', {
1500 1502 'message': (_('received frame for inactive request ID: %d') %
1501 1503 frame.requestid),
1502 1504 }
1503 1505
1504 1506 request = self._activerequests[frame.requestid]
1505 1507 request.state = 'receiving'
1506 1508
1507 1509 handlers = {
1508 1510 FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
1509 1511 FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
1510 1512 }
1511 1513
1512 1514 meth = handlers.get(frame.typeid)
1513 1515 if not meth:
1514 1516 raise error.ProgrammingError('unhandled frame type: %d' %
1515 1517 frame.typeid)
1516 1518
1517 1519 return meth(request, frame)
1518 1520
1519 1521 def _onstreamsettingsframe(self, frame):
1520 1522 assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
1521 1523
1522 1524 more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
1523 1525 eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
1524 1526
1525 1527 if more and eos:
1526 1528 return 'error', {
1527 1529 'message': (_('stream encoding settings frame cannot have both '
1528 1530 'continuation and end of stream flags set')),
1529 1531 }
1530 1532
1531 1533 if not more and not eos:
1532 1534 return 'error', {
1533 1535 'message': _('stream encoding settings frame must have '
1534 1536 'continuation or end of stream flag set'),
1535 1537 }
1536 1538
1537 1539 if frame.streamid not in self._streamsettingsdecoders:
1538 1540 decoder = cborutil.bufferingdecoder()
1539 1541 self._streamsettingsdecoders[frame.streamid] = decoder
1540 1542
1541 1543 decoder = self._streamsettingsdecoders[frame.streamid]
1542 1544
1543 1545 try:
1544 1546 decoder.decode(frame.payload)
1545 1547 except Exception as e:
1546 1548 return 'error', {
1547 1549 'message': (_('error decoding CBOR from stream encoding '
1548 1550 'settings frame: %s') %
1549 1551 stringutil.forcebytestr(e)),
1550 1552 }
1551 1553
1552 1554 if more:
1553 1555 return 'noop', {}
1554 1556
1555 1557 assert eos
1556 1558
1557 1559 decoded = decoder.getavailable()
1558 1560 del self._streamsettingsdecoders[frame.streamid]
1559 1561
1560 1562 if not decoded:
1561 1563 return 'error', {
1562 1564 'message': _('stream encoding settings frame did not contain '
1563 1565 'CBOR data'),
1564 1566 }
1565 1567
1566 1568 try:
1567 1569 self._incomingstreams[frame.streamid].setdecoder(decoded[0],
1568 1570 decoded[1:])
1569 1571 except Exception as e:
1570 1572 return 'error', {
1571 1573 'message': (_('error setting stream decoder: %s') %
1572 1574 stringutil.forcebytestr(e)),
1573 1575 }
1574 1576
1575 1577 return 'noop', {}
1576 1578
1577 1579 def _oncommandresponseframe(self, request, frame):
1578 1580 if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
1579 1581 request.state = 'received'
1580 1582 del self._activerequests[request.requestid]
1581 1583
1582 1584 return 'responsedata', {
1583 1585 'request': request,
1584 1586 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
1585 1587 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
1586 1588 'data': frame.payload,
1587 1589 }
1588 1590
1589 1591 def _onerrorresponseframe(self, request, frame):
1590 1592 request.state = 'errored'
1591 1593 del self._activerequests[request.requestid]
1592 1594
1593 1595 # The payload should be a CBOR map.
1594 1596 m = cborutil.decodeall(frame.payload)[0]
1595 1597
1596 1598 return 'error', {
1597 1599 'request': request,
1598 1600 'type': m['type'],
1599 1601 'message': m['message'],
1600 1602 }
@@ -1,1187 +1,1187
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import contextlib
10 10 import hashlib
11 11
12 12 from .i18n import _
13 13 from .node import (
14 14 hex,
15 15 nullid,
16 16 )
17 17 from . import (
18 18 discovery,
19 19 encoding,
20 20 error,
21 21 narrowspec,
22 22 pycompat,
23 23 wireprotoframing,
24 24 wireprototypes,
25 25 )
26 26 from .utils import (
27 27 cborutil,
28 28 interfaceutil,
29 29 stringutil,
30 30 )
31 31
32 32 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
33 33
34 34 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
35 35
36 36 COMMANDS = wireprototypes.commanddict()
37 37
38 38 # Value inserted into cache key computation function. Change the value to
39 39 # force new cache keys for every command request. This should be done when
40 40 # there is a change to how caching works, etc.
41 41 GLOBAL_CACHE_VERSION = 1
42 42
43 43 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
44 44 from .hgweb import common as hgwebcommon
45 45
46 46 # URL space looks like: <permissions>/<command>, where <permission> can
47 47 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
48 48
49 49 # Root URL does nothing meaningful... yet.
50 50 if not urlparts:
51 51 res.status = b'200 OK'
52 52 res.headers[b'Content-Type'] = b'text/plain'
53 53 res.setbodybytes(_('HTTP version 2 API handler'))
54 54 return
55 55
56 56 if len(urlparts) == 1:
57 57 res.status = b'404 Not Found'
58 58 res.headers[b'Content-Type'] = b'text/plain'
59 59 res.setbodybytes(_('do not know how to process %s\n') %
60 60 req.dispatchpath)
61 61 return
62 62
63 63 permission, command = urlparts[0:2]
64 64
65 65 if permission not in (b'ro', b'rw'):
66 66 res.status = b'404 Not Found'
67 67 res.headers[b'Content-Type'] = b'text/plain'
68 68 res.setbodybytes(_('unknown permission: %s') % permission)
69 69 return
70 70
71 71 if req.method != 'POST':
72 72 res.status = b'405 Method Not Allowed'
73 73 res.headers[b'Allow'] = b'POST'
74 74 res.setbodybytes(_('commands require POST requests'))
75 75 return
76 76
77 77 # At some point we'll want to use our own API instead of recycling the
78 78 # behavior of version 1 of the wire protocol...
79 79 # TODO return reasonable responses - not responses that overload the
80 80 # HTTP status line message for error reporting.
81 81 try:
82 82 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
83 83 except hgwebcommon.ErrorResponse as e:
84 84 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
85 85 for k, v in e.headers:
86 86 res.headers[k] = v
87 87 res.setbodybytes('permission denied')
88 88 return
89 89
90 90 # We have a special endpoint to reflect the request back at the client.
91 91 if command == b'debugreflect':
92 92 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
93 93 return
94 94
95 95 # Extra commands that we handle that aren't really wire protocol
96 96 # commands. Think extra hard before making this hackery available to
97 97 # extension.
98 98 extracommands = {'multirequest'}
99 99
100 100 if command not in COMMANDS and command not in extracommands:
101 101 res.status = b'404 Not Found'
102 102 res.headers[b'Content-Type'] = b'text/plain'
103 103 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
104 104 return
105 105
106 106 repo = rctx.repo
107 107 ui = repo.ui
108 108
109 109 proto = httpv2protocolhandler(req, ui)
110 110
111 111 if (not COMMANDS.commandavailable(command, proto)
112 112 and command not in extracommands):
113 113 res.status = b'404 Not Found'
114 114 res.headers[b'Content-Type'] = b'text/plain'
115 115 res.setbodybytes(_('invalid wire protocol command: %s') % command)
116 116 return
117 117
118 118 # TODO consider cases where proxies may add additional Accept headers.
119 119 if req.headers.get(b'Accept') != FRAMINGTYPE:
120 120 res.status = b'406 Not Acceptable'
121 121 res.headers[b'Content-Type'] = b'text/plain'
122 122 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
123 123 % FRAMINGTYPE)
124 124 return
125 125
126 126 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
127 127 res.status = b'415 Unsupported Media Type'
128 128 # TODO we should send a response with appropriate media type,
129 129 # since client does Accept it.
130 130 res.headers[b'Content-Type'] = b'text/plain'
131 131 res.setbodybytes(_('client MUST send Content-Type header with '
132 132 'value: %s\n') % FRAMINGTYPE)
133 133 return
134 134
135 135 _processhttpv2request(ui, repo, req, res, permission, command, proto)
136 136
137 137 def _processhttpv2reflectrequest(ui, repo, req, res):
138 138 """Reads unified frame protocol request and dumps out state to client.
139 139
140 140 This special endpoint can be used to help debug the wire protocol.
141 141
142 142 Instead of routing the request through the normal dispatch mechanism,
143 143 we instead read all frames, decode them, and feed them into our state
144 144 tracker. We then dump the log of all that activity back out to the
145 145 client.
146 146 """
147 147 import json
148 148
149 149 # Reflection APIs have a history of being abused, accidentally disclosing
150 150 # sensitive data, etc. So we have a config knob.
151 151 if not ui.configbool('experimental', 'web.api.debugreflect'):
152 152 res.status = b'404 Not Found'
153 153 res.headers[b'Content-Type'] = b'text/plain'
154 154 res.setbodybytes(_('debugreflect service not available'))
155 155 return
156 156
157 157 # We assume we have a unified framing protocol request body.
158 158
159 reactor = wireprotoframing.serverreactor()
159 reactor = wireprotoframing.serverreactor(ui)
160 160 states = []
161 161
162 162 while True:
163 163 frame = wireprotoframing.readframe(req.bodyfh)
164 164
165 165 if not frame:
166 166 states.append(b'received: <no frame>')
167 167 break
168 168
169 169 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
170 170 frame.requestid,
171 171 frame.payload))
172 172
173 173 action, meta = reactor.onframerecv(frame)
174 174 states.append(json.dumps((action, meta), sort_keys=True,
175 175 separators=(', ', ': ')))
176 176
177 177 action, meta = reactor.oninputeof()
178 178 meta['action'] = action
179 179 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
180 180
181 181 res.status = b'200 OK'
182 182 res.headers[b'Content-Type'] = b'text/plain'
183 183 res.setbodybytes(b'\n'.join(states))
184 184
185 185 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
186 186 """Post-validation handler for HTTPv2 requests.
187 187
188 188 Called when the HTTP request contains unified frame-based protocol
189 189 frames for evaluation.
190 190 """
191 191 # TODO Some HTTP clients are full duplex and can receive data before
192 192 # the entire request is transmitted. Figure out a way to indicate support
193 193 # for that so we can opt into full duplex mode.
194 reactor = wireprotoframing.serverreactor(deferoutput=True)
194 reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
195 195 seencommand = False
196 196
197 197 outstream = reactor.makeoutputstream()
198 198
199 199 while True:
200 200 frame = wireprotoframing.readframe(req.bodyfh)
201 201 if not frame:
202 202 break
203 203
204 204 action, meta = reactor.onframerecv(frame)
205 205
206 206 if action == 'wantframe':
207 207 # Need more data before we can do anything.
208 208 continue
209 209 elif action == 'runcommand':
210 210 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
211 211 reqcommand, reactor, outstream,
212 212 meta, issubsequent=seencommand)
213 213
214 214 if sentoutput:
215 215 return
216 216
217 217 seencommand = True
218 218
219 219 elif action == 'error':
220 220 # TODO define proper error mechanism.
221 221 res.status = b'200 OK'
222 222 res.headers[b'Content-Type'] = b'text/plain'
223 223 res.setbodybytes(meta['message'] + b'\n')
224 224 return
225 225 else:
226 226 raise error.ProgrammingError(
227 227 'unhandled action from frame processor: %s' % action)
228 228
229 229 action, meta = reactor.oninputeof()
230 230 if action == 'sendframes':
231 231 # We assume we haven't started sending the response yet. If we're
232 232 # wrong, the response type will raise an exception.
233 233 res.status = b'200 OK'
234 234 res.headers[b'Content-Type'] = FRAMINGTYPE
235 235 res.setbodygen(meta['framegen'])
236 236 elif action == 'noop':
237 237 pass
238 238 else:
239 239 raise error.ProgrammingError('unhandled action from frame processor: %s'
240 240 % action)
241 241
242 242 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
243 243 outstream, command, issubsequent):
244 244 """Dispatch a wire protocol command made from HTTPv2 requests.
245 245
246 246 The authenticated permission (``authedperm``) along with the original
247 247 command from the URL (``reqcommand``) are passed in.
248 248 """
249 249 # We already validated that the session has permissions to perform the
250 250 # actions in ``authedperm``. In the unified frame protocol, the canonical
251 251 # command to run is expressed in a frame. However, the URL also requested
252 252 # to run a specific command. We need to be careful that the command we
253 253 # run doesn't have permissions requirements greater than what was granted
254 254 # by ``authedperm``.
255 255 #
256 256 # Our rule for this is we only allow one command per HTTP request and
257 257 # that command must match the command in the URL. However, we make
258 258 # an exception for the ``multirequest`` URL. This URL is allowed to
259 259 # execute multiple commands. We double check permissions of each command
260 260 # as it is invoked to ensure there is no privilege escalation.
261 261 # TODO consider allowing multiple commands to regular command URLs
262 262 # iff each command is the same.
263 263
264 264 proto = httpv2protocolhandler(req, ui, args=command['args'])
265 265
266 266 if reqcommand == b'multirequest':
267 267 if not COMMANDS.commandavailable(command['command'], proto):
268 268 # TODO proper error mechanism
269 269 res.status = b'200 OK'
270 270 res.headers[b'Content-Type'] = b'text/plain'
271 271 res.setbodybytes(_('wire protocol command not available: %s') %
272 272 command['command'])
273 273 return True
274 274
275 275 # TODO don't use assert here, since it may be elided by -O.
276 276 assert authedperm in (b'ro', b'rw')
277 277 wirecommand = COMMANDS[command['command']]
278 278 assert wirecommand.permission in ('push', 'pull')
279 279
280 280 if authedperm == b'ro' and wirecommand.permission != 'pull':
281 281 # TODO proper error mechanism
282 282 res.status = b'403 Forbidden'
283 283 res.headers[b'Content-Type'] = b'text/plain'
284 284 res.setbodybytes(_('insufficient permissions to execute '
285 285 'command: %s') % command['command'])
286 286 return True
287 287
288 288 # TODO should we also call checkperm() here? Maybe not if we're going
289 289 # to overhaul that API. The granted scope from the URL check should
290 290 # be good enough.
291 291
292 292 else:
293 293 # Don't allow multiple commands outside of ``multirequest`` URL.
294 294 if issubsequent:
295 295 # TODO proper error mechanism
296 296 res.status = b'200 OK'
297 297 res.headers[b'Content-Type'] = b'text/plain'
298 298 res.setbodybytes(_('multiple commands cannot be issued to this '
299 299 'URL'))
300 300 return True
301 301
302 302 if reqcommand != command['command']:
303 303 # TODO define proper error mechanism
304 304 res.status = b'200 OK'
305 305 res.headers[b'Content-Type'] = b'text/plain'
306 306 res.setbodybytes(_('command in frame must match command in URL'))
307 307 return True
308 308
309 309 res.status = b'200 OK'
310 310 res.headers[b'Content-Type'] = FRAMINGTYPE
311 311
312 312 try:
313 313 objs = dispatch(repo, proto, command['command'], command['redirect'])
314 314
315 315 action, meta = reactor.oncommandresponsereadyobjects(
316 316 outstream, command['requestid'], objs)
317 317
318 318 except error.WireprotoCommandError as e:
319 319 action, meta = reactor.oncommanderror(
320 320 outstream, command['requestid'], e.message, e.messageargs)
321 321
322 322 except Exception as e:
323 323 action, meta = reactor.onservererror(
324 324 outstream, command['requestid'],
325 325 _('exception when invoking command: %s') %
326 326 stringutil.forcebytestr(e))
327 327
328 328 if action == 'sendframes':
329 329 res.setbodygen(meta['framegen'])
330 330 return True
331 331 elif action == 'noop':
332 332 return False
333 333 else:
334 334 raise error.ProgrammingError('unhandled event from reactor: %s' %
335 335 action)
336 336
337 337 def getdispatchrepo(repo, proto, command):
338 338 return repo.filtered('served')
339 339
340 340 def dispatch(repo, proto, command, redirect):
341 341 """Run a wire protocol command.
342 342
343 343 Returns an iterable of objects that will be sent to the client.
344 344 """
345 345 repo = getdispatchrepo(repo, proto, command)
346 346
347 347 entry = COMMANDS[command]
348 348 func = entry.func
349 349 spec = entry.args
350 350
351 351 args = proto.getargs(spec)
352 352
353 353 # There is some duplicate boilerplate code here for calling the command and
354 354 # emitting objects. It is either that or a lot of indented code that looks
355 355 # like a pyramid (since there are a lot of code paths that result in not
356 356 # using the cacher).
357 357 callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))
358 358
359 359 # Request is not cacheable. Don't bother instantiating a cacher.
360 360 if not entry.cachekeyfn:
361 361 for o in callcommand():
362 362 yield o
363 363 return
364 364
365 365 if redirect:
366 366 redirecttargets = redirect[b'targets']
367 367 redirecthashes = redirect[b'hashes']
368 368 else:
369 369 redirecttargets = []
370 370 redirecthashes = []
371 371
372 372 cacher = makeresponsecacher(repo, proto, command, args,
373 373 cborutil.streamencode,
374 374 redirecttargets=redirecttargets,
375 375 redirecthashes=redirecthashes)
376 376
377 377 # But we have no cacher. Do default handling.
378 378 if not cacher:
379 379 for o in callcommand():
380 380 yield o
381 381 return
382 382
383 383 with cacher:
384 384 cachekey = entry.cachekeyfn(repo, proto, cacher, **args)
385 385
386 386 # No cache key or the cacher doesn't like it. Do default handling.
387 387 if cachekey is None or not cacher.setcachekey(cachekey):
388 388 for o in callcommand():
389 389 yield o
390 390 return
391 391
392 392 # Serve it from the cache, if possible.
393 393 cached = cacher.lookup()
394 394
395 395 if cached:
396 396 for o in cached['objs']:
397 397 yield o
398 398 return
399 399
400 400 # Else call the command and feed its output into the cacher, allowing
401 401 # the cacher to buffer/mutate objects as it desires.
402 402 for o in callcommand():
403 403 for o in cacher.onobject(o):
404 404 yield o
405 405
406 406 for o in cacher.onfinished():
407 407 yield o
408 408
409 409 @interfaceutil.implementer(wireprototypes.baseprotocolhandler)
410 410 class httpv2protocolhandler(object):
411 411 def __init__(self, req, ui, args=None):
412 412 self._req = req
413 413 self._ui = ui
414 414 self._args = args
415 415
416 416 @property
417 417 def name(self):
418 418 return HTTP_WIREPROTO_V2
419 419
420 420 def getargs(self, args):
421 421 # First look for args that were passed but aren't registered on this
422 422 # command.
423 423 extra = set(self._args) - set(args)
424 424 if extra:
425 425 raise error.WireprotoCommandError(
426 426 'unsupported argument to command: %s' %
427 427 ', '.join(sorted(extra)))
428 428
429 429 # And look for required arguments that are missing.
430 430 missing = {a for a in args if args[a]['required']} - set(self._args)
431 431
432 432 if missing:
433 433 raise error.WireprotoCommandError(
434 434 'missing required arguments: %s' % ', '.join(sorted(missing)))
435 435
436 436 # Now derive the arguments to pass to the command, taking into
437 437 # account the arguments specified by the client.
438 438 data = {}
439 439 for k, meta in sorted(args.items()):
440 440 # This argument wasn't passed by the client.
441 441 if k not in self._args:
442 442 data[k] = meta['default']()
443 443 continue
444 444
445 445 v = self._args[k]
446 446
447 447 # Sets may be expressed as lists. Silently normalize.
448 448 if meta['type'] == 'set' and isinstance(v, list):
449 449 v = set(v)
450 450
451 451 # TODO consider more/stronger type validation.
452 452
453 453 data[k] = v
454 454
455 455 return data
456 456
457 457 def getprotocaps(self):
458 458 # Protocol capabilities are currently not implemented for HTTP V2.
459 459 return set()
460 460
461 461 def getpayload(self):
462 462 raise NotImplementedError
463 463
464 464 @contextlib.contextmanager
465 465 def mayberedirectstdio(self):
466 466 raise NotImplementedError
467 467
468 468 def client(self):
469 469 raise NotImplementedError
470 470
471 471 def addcapabilities(self, repo, caps):
472 472 return caps
473 473
474 474 def checkperm(self, perm):
475 475 raise NotImplementedError
476 476
477 477 def httpv2apidescriptor(req, repo):
478 478 proto = httpv2protocolhandler(req, repo.ui)
479 479
480 480 return _capabilitiesv2(repo, proto)
481 481
482 482 def _capabilitiesv2(repo, proto):
483 483 """Obtain the set of capabilities for version 2 transports.
484 484
485 485 These capabilities are distinct from the capabilities for version 1
486 486 transports.
487 487 """
488 488 caps = {
489 489 'commands': {},
490 490 'framingmediatypes': [FRAMINGTYPE],
491 491 'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
492 492 }
493 493
494 494 for command, entry in COMMANDS.items():
495 495 args = {}
496 496
497 497 for arg, meta in entry.args.items():
498 498 args[arg] = {
499 499 # TODO should this be a normalized type using CBOR's
500 500 # terminology?
501 501 b'type': meta['type'],
502 502 b'required': meta['required'],
503 503 }
504 504
505 505 if not meta['required']:
506 506 args[arg][b'default'] = meta['default']()
507 507
508 508 if meta['validvalues']:
509 509 args[arg][b'validvalues'] = meta['validvalues']
510 510
511 511 caps['commands'][command] = {
512 512 'args': args,
513 513 'permissions': [entry.permission],
514 514 }
515 515
516 516 caps['rawrepoformats'] = sorted(repo.requirements &
517 517 repo.supportedformats)
518 518
519 519 targets = getadvertisedredirecttargets(repo, proto)
520 520 if targets:
521 521 caps[b'redirect'] = {
522 522 b'targets': [],
523 523 b'hashes': [b'sha256', b'sha1'],
524 524 }
525 525
526 526 for target in targets:
527 527 entry = {
528 528 b'name': target['name'],
529 529 b'protocol': target['protocol'],
530 530 b'uris': target['uris'],
531 531 }
532 532
533 533 for key in ('snirequired', 'tlsversions'):
534 534 if key in target:
535 535 entry[key] = target[key]
536 536
537 537 caps[b'redirect'][b'targets'].append(entry)
538 538
539 539 return proto.addcapabilities(repo, caps)
540 540
541 541 def getadvertisedredirecttargets(repo, proto):
542 542 """Obtain a list of content redirect targets.
543 543
544 544 Returns a list containing potential redirect targets that will be
545 545 advertised in capabilities data. Each dict MUST have the following
546 546 keys:
547 547
548 548 name
549 549 The name of this redirect target. This is the identifier clients use
550 550 to refer to a target. It is transferred as part of every command
551 551 request.
552 552
553 553 protocol
554 554 Network protocol used by this target. Typically this is the string
555 555 in front of the ``://`` in a URL. e.g. ``https``.
556 556
557 557 uris
558 558 List of representative URIs for this target. Clients can use the
559 559 URIs to test parsing for compatibility or for ordering preference
560 560 for which target to use.
561 561
562 562 The following optional keys are recognized:
563 563
564 564 snirequired
565 565 Bool indicating if Server Name Indication (SNI) is required to
566 566 connect to this target.
567 567
568 568 tlsversions
569 569 List of bytes indicating which TLS versions are supported by this
570 570 target.
571 571
572 572 By default, clients reflect the target order advertised by servers
573 573 and servers will use the first client-advertised target when picking
574 574 a redirect target. So targets should be advertised in the order the
575 575 server prefers they be used.
576 576 """
577 577 return []
578 578
579 579 def wireprotocommand(name, args=None, permission='push', cachekeyfn=None):
580 580 """Decorator to declare a wire protocol command.
581 581
582 582 ``name`` is the name of the wire protocol command being provided.
583 583
584 584 ``args`` is a dict defining arguments accepted by the command. Keys are
585 585 the argument name. Values are dicts with the following keys:
586 586
587 587 ``type``
588 588 The argument data type. Must be one of the following string
589 589 literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
590 590 or ``bool``.
591 591
592 592 ``default``
593 593 A callable returning the default value for this argument. If not
594 594 specified, ``None`` will be the default value.
595 595
596 596 ``example``
597 597 An example value for this argument.
598 598
599 599 ``validvalues``
600 600 Set of recognized values for this argument.
601 601
602 602 ``permission`` defines the permission type needed to run this command.
603 603 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
604 604 respectively. Default is to assume command requires ``push`` permissions
605 605 because otherwise commands not declaring their permissions could modify
606 606 a repository that is supposed to be read-only.
607 607
608 608 ``cachekeyfn`` defines an optional callable that can derive the
609 609 cache key for this request.
610 610
611 611 Wire protocol commands are generators of objects to be serialized and
612 612 sent to the client.
613 613
614 614 If a command raises an uncaught exception, this will be translated into
615 615 a command error.
616 616
617 617 All commands can opt in to being cacheable by defining a function
618 618 (``cachekeyfn``) that is called to derive a cache key. This function
619 619 receives the same arguments as the command itself plus a ``cacher``
620 620 argument containing the active cacher for the request and returns a bytes
621 621 containing the key in a cache the response to this command may be cached
622 622 under.
623 623 """
624 624 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
625 625 if v['version'] == 2}
626 626
627 627 if permission not in ('push', 'pull'):
628 628 raise error.ProgrammingError('invalid wire protocol permission; '
629 629 'got %s; expected "push" or "pull"' %
630 630 permission)
631 631
632 632 if args is None:
633 633 args = {}
634 634
635 635 if not isinstance(args, dict):
636 636 raise error.ProgrammingError('arguments for version 2 commands '
637 637 'must be declared as dicts')
638 638
639 639 for arg, meta in args.items():
640 640 if arg == '*':
641 641 raise error.ProgrammingError('* argument name not allowed on '
642 642 'version 2 commands')
643 643
644 644 if not isinstance(meta, dict):
645 645 raise error.ProgrammingError('arguments for version 2 commands '
646 646 'must declare metadata as a dict')
647 647
648 648 if 'type' not in meta:
649 649 raise error.ProgrammingError('%s argument for command %s does not '
650 650 'declare type field' % (arg, name))
651 651
652 652 if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
653 653 raise error.ProgrammingError('%s argument for command %s has '
654 654 'illegal type: %s' % (arg, name,
655 655 meta['type']))
656 656
657 657 if 'example' not in meta:
658 658 raise error.ProgrammingError('%s argument for command %s does not '
659 659 'declare example field' % (arg, name))
660 660
661 661 meta['required'] = 'default' not in meta
662 662
663 663 meta.setdefault('default', lambda: None)
664 664 meta.setdefault('validvalues', None)
665 665
666 666 def register(func):
667 667 if name in COMMANDS:
668 668 raise error.ProgrammingError('%s command already registered '
669 669 'for version 2' % name)
670 670
671 671 COMMANDS[name] = wireprototypes.commandentry(
672 672 func, args=args, transports=transports, permission=permission,
673 673 cachekeyfn=cachekeyfn)
674 674
675 675 return func
676 676
677 677 return register
678 678
679 679 def makecommandcachekeyfn(command, localversion=None, allargs=False):
680 680 """Construct a cache key derivation function with common features.
681 681
682 682 By default, the cache key is a hash of:
683 683
684 684 * The command name.
685 685 * A global cache version number.
686 686 * A local cache version number (passed via ``localversion``).
687 687 * All the arguments passed to the command.
688 688 * The media type used.
689 689 * Wire protocol version string.
690 690 * The repository path.
691 691 """
692 692 if not allargs:
693 693 raise error.ProgrammingError('only allargs=True is currently supported')
694 694
695 695 if localversion is None:
696 696 raise error.ProgrammingError('must set localversion argument value')
697 697
698 698 def cachekeyfn(repo, proto, cacher, **args):
699 699 spec = COMMANDS[command]
700 700
701 701 # Commands that mutate the repo can not be cached.
702 702 if spec.permission == 'push':
703 703 return None
704 704
705 705 # TODO config option to disable caching.
706 706
707 707 # Our key derivation strategy is to construct a data structure
708 708 # holding everything that could influence cacheability and to hash
709 709 # the CBOR representation of that. Using CBOR seems like it might
710 710 # be overkill. However, simpler hashing mechanisms are prone to
711 711 # duplicate input issues. e.g. if you just concatenate two values,
712 712 # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
713 713 # "padding" between values and prevents these problems.
714 714
715 715 # Seed the hash with various data.
716 716 state = {
717 717 # To invalidate all cache keys.
718 718 b'globalversion': GLOBAL_CACHE_VERSION,
719 719 # More granular cache key invalidation.
720 720 b'localversion': localversion,
721 721 # Cache keys are segmented by command.
722 722 b'command': pycompat.sysbytes(command),
723 723 # Throw in the media type and API version strings so changes
724 724 # to exchange semantics invalid cache.
725 725 b'mediatype': FRAMINGTYPE,
726 726 b'version': HTTP_WIREPROTO_V2,
727 727 # So same requests for different repos don't share cache keys.
728 728 b'repo': repo.root,
729 729 }
730 730
731 731 # The arguments passed to us will have already been normalized.
732 732 # Default values will be set, etc. This is important because it
733 733 # means that it doesn't matter if clients send an explicit argument
734 734 # or rely on the default value: it will all normalize to the same
735 735 # set of arguments on the server and therefore the same cache key.
736 736 #
737 737 # Arguments by their very nature must support being encoded to CBOR.
738 738 # And the CBOR encoder is deterministic. So we hash the arguments
739 739 # by feeding the CBOR of their representation into the hasher.
740 740 if allargs:
741 741 state[b'args'] = pycompat.byteskwargs(args)
742 742
743 743 cacher.adjustcachekeystate(state)
744 744
745 745 hasher = hashlib.sha1()
746 746 for chunk in cborutil.streamencode(state):
747 747 hasher.update(chunk)
748 748
749 749 return pycompat.sysbytes(hasher.hexdigest())
750 750
751 751 return cachekeyfn
752 752
753 753 def makeresponsecacher(repo, proto, command, args, objencoderfn,
754 754 redirecttargets, redirecthashes):
755 755 """Construct a cacher for a cacheable command.
756 756
757 757 Returns an ``iwireprotocolcommandcacher`` instance.
758 758
759 759 Extensions can monkeypatch this function to provide custom caching
760 760 backends.
761 761 """
762 762 return None
763 763
764 764 @wireprotocommand('branchmap', permission='pull')
765 765 def branchmapv2(repo, proto):
766 766 yield {encoding.fromlocal(k): v
767 767 for k, v in repo.branchmap().iteritems()}
768 768
769 769 @wireprotocommand('capabilities', permission='pull')
770 770 def capabilitiesv2(repo, proto):
771 771 yield _capabilitiesv2(repo, proto)
772 772
773 773 @wireprotocommand(
774 774 'changesetdata',
775 775 args={
776 776 'noderange': {
777 777 'type': 'list',
778 778 'default': lambda: None,
779 779 'example': [[b'0123456...'], [b'abcdef...']],
780 780 },
781 781 'nodes': {
782 782 'type': 'list',
783 783 'default': lambda: None,
784 784 'example': [b'0123456...'],
785 785 },
786 786 'nodesdepth': {
787 787 'type': 'int',
788 788 'default': lambda: None,
789 789 'example': 10,
790 790 },
791 791 'fields': {
792 792 'type': 'set',
793 793 'default': set,
794 794 'example': {b'parents', b'revision'},
795 795 'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
796 796 },
797 797 },
798 798 permission='pull')
799 799 def changesetdata(repo, proto, noderange, nodes, nodesdepth, fields):
800 800 # TODO look for unknown fields and abort when they can't be serviced.
801 801 # This could probably be validated by dispatcher using validvalues.
802 802
803 803 if noderange is None and nodes is None:
804 804 raise error.WireprotoCommandError(
805 805 'noderange or nodes must be defined')
806 806
807 807 if nodesdepth is not None and nodes is None:
808 808 raise error.WireprotoCommandError(
809 809 'nodesdepth requires the nodes argument')
810 810
811 811 if noderange is not None:
812 812 if len(noderange) != 2:
813 813 raise error.WireprotoCommandError(
814 814 'noderange must consist of 2 elements')
815 815
816 816 if not noderange[1]:
817 817 raise error.WireprotoCommandError(
818 818 'heads in noderange request cannot be empty')
819 819
820 820 cl = repo.changelog
821 821 hasnode = cl.hasnode
822 822
823 823 seen = set()
824 824 outgoing = []
825 825
826 826 if nodes is not None:
827 827 outgoing = [n for n in nodes if hasnode(n)]
828 828
829 829 if nodesdepth:
830 830 outgoing = [cl.node(r) for r in
831 831 repo.revs(b'ancestors(%ln, %d)', outgoing,
832 832 nodesdepth - 1)]
833 833
834 834 seen |= set(outgoing)
835 835
836 836 if noderange is not None:
837 837 if noderange[0]:
838 838 common = [n for n in noderange[0] if hasnode(n)]
839 839 else:
840 840 common = [nullid]
841 841
842 842 for n in discovery.outgoing(repo, common, noderange[1]).missing:
843 843 if n not in seen:
844 844 outgoing.append(n)
845 845 # Don't need to add to seen here because this is the final
846 846 # source of nodes and there should be no duplicates in this
847 847 # list.
848 848
849 849 seen.clear()
850 850 publishing = repo.publishing()
851 851
852 852 if outgoing:
853 853 repo.hook('preoutgoing', throw=True, source='serve')
854 854
855 855 yield {
856 856 b'totalitems': len(outgoing),
857 857 }
858 858
859 859 # The phases of nodes already transferred to the client may have changed
860 860 # since the client last requested data. We send phase-only records
861 861 # for these revisions, if requested.
862 862 if b'phase' in fields and noderange is not None:
863 863 # TODO skip nodes whose phase will be reflected by a node in the
864 864 # outgoing set. This is purely an optimization to reduce data
865 865 # size.
866 866 for node in noderange[0]:
867 867 yield {
868 868 b'node': node,
869 869 b'phase': b'public' if publishing else repo[node].phasestr()
870 870 }
871 871
872 872 nodebookmarks = {}
873 873 for mark, node in repo._bookmarks.items():
874 874 nodebookmarks.setdefault(node, set()).add(mark)
875 875
876 876 # It is already topologically sorted by revision number.
877 877 for node in outgoing:
878 878 d = {
879 879 b'node': node,
880 880 }
881 881
882 882 if b'parents' in fields:
883 883 d[b'parents'] = cl.parents(node)
884 884
885 885 if b'phase' in fields:
886 886 if publishing:
887 887 d[b'phase'] = b'public'
888 888 else:
889 889 ctx = repo[node]
890 890 d[b'phase'] = ctx.phasestr()
891 891
892 892 if b'bookmarks' in fields and node in nodebookmarks:
893 893 d[b'bookmarks'] = sorted(nodebookmarks[node])
894 894 del nodebookmarks[node]
895 895
896 896 followingmeta = []
897 897 followingdata = []
898 898
899 899 if b'revision' in fields:
900 900 revisiondata = cl.revision(node, raw=True)
901 901 followingmeta.append((b'revision', len(revisiondata)))
902 902 followingdata.append(revisiondata)
903 903
904 904 # TODO make it possible for extensions to wrap a function or register
905 905 # a handler to service custom fields.
906 906
907 907 if followingmeta:
908 908 d[b'fieldsfollowing'] = followingmeta
909 909
910 910 yield d
911 911
912 912 for extra in followingdata:
913 913 yield extra
914 914
915 915 # If requested, send bookmarks from nodes that didn't have revision
916 916 # data sent so receiver is aware of any bookmark updates.
917 917 if b'bookmarks' in fields:
918 918 for node, marks in sorted(nodebookmarks.iteritems()):
919 919 yield {
920 920 b'node': node,
921 921 b'bookmarks': sorted(marks),
922 922 }
923 923
924 924 class FileAccessError(Exception):
925 925 """Represents an error accessing a specific file."""
926 926
927 927 def __init__(self, path, msg, args):
928 928 self.path = path
929 929 self.msg = msg
930 930 self.args = args
931 931
932 932 def getfilestore(repo, proto, path):
933 933 """Obtain a file storage object for use with wire protocol.
934 934
935 935 Exists as a standalone function so extensions can monkeypatch to add
936 936 access control.
937 937 """
938 938 # This seems to work even if the file doesn't exist. So catch
939 939 # "empty" files and return an error.
940 940 fl = repo.file(path)
941 941
942 942 if not len(fl):
943 943 raise FileAccessError(path, 'unknown file: %s', (path,))
944 944
945 945 return fl
946 946
947 947 @wireprotocommand(
948 948 'filedata',
949 949 args={
950 950 'haveparents': {
951 951 'type': 'bool',
952 952 'default': lambda: False,
953 953 'example': True,
954 954 },
955 955 'nodes': {
956 956 'type': 'list',
957 957 'example': [b'0123456...'],
958 958 },
959 959 'fields': {
960 960 'type': 'set',
961 961 'default': set,
962 962 'example': {b'parents', b'revision'},
963 963 'validvalues': {b'parents', b'revision'},
964 964 },
965 965 'path': {
966 966 'type': 'bytes',
967 967 'example': b'foo.txt',
968 968 }
969 969 },
970 970 permission='pull',
971 971 # TODO censoring a file revision won't invalidate the cache.
972 972 # Figure out a way to take censoring into account when deriving
973 973 # the cache key.
974 974 cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
975 975 def filedata(repo, proto, haveparents, nodes, fields, path):
976 976 try:
977 977 # Extensions may wish to access the protocol handler.
978 978 store = getfilestore(repo, proto, path)
979 979 except FileAccessError as e:
980 980 raise error.WireprotoCommandError(e.msg, e.args)
981 981
982 982 # Validate requested nodes.
983 983 for node in nodes:
984 984 try:
985 985 store.rev(node)
986 986 except error.LookupError:
987 987 raise error.WireprotoCommandError('unknown file node: %s',
988 988 (hex(node),))
989 989
990 990 revisions = store.emitrevisions(nodes,
991 991 revisiondata=b'revision' in fields,
992 992 assumehaveparentrevisions=haveparents)
993 993
994 994 yield {
995 995 b'totalitems': len(nodes),
996 996 }
997 997
998 998 for revision in revisions:
999 999 d = {
1000 1000 b'node': revision.node,
1001 1001 }
1002 1002
1003 1003 if b'parents' in fields:
1004 1004 d[b'parents'] = [revision.p1node, revision.p2node]
1005 1005
1006 1006 followingmeta = []
1007 1007 followingdata = []
1008 1008
1009 1009 if b'revision' in fields:
1010 1010 if revision.revision is not None:
1011 1011 followingmeta.append((b'revision', len(revision.revision)))
1012 1012 followingdata.append(revision.revision)
1013 1013 else:
1014 1014 d[b'deltabasenode'] = revision.basenode
1015 1015 followingmeta.append((b'delta', len(revision.delta)))
1016 1016 followingdata.append(revision.delta)
1017 1017
1018 1018 if followingmeta:
1019 1019 d[b'fieldsfollowing'] = followingmeta
1020 1020
1021 1021 yield d
1022 1022
1023 1023 for extra in followingdata:
1024 1024 yield extra
1025 1025
1026 1026 @wireprotocommand(
1027 1027 'heads',
1028 1028 args={
1029 1029 'publiconly': {
1030 1030 'type': 'bool',
1031 1031 'default': lambda: False,
1032 1032 'example': False,
1033 1033 },
1034 1034 },
1035 1035 permission='pull')
1036 1036 def headsv2(repo, proto, publiconly):
1037 1037 if publiconly:
1038 1038 repo = repo.filtered('immutable')
1039 1039
1040 1040 yield repo.heads()
1041 1041
1042 1042 @wireprotocommand(
1043 1043 'known',
1044 1044 args={
1045 1045 'nodes': {
1046 1046 'type': 'list',
1047 1047 'default': list,
1048 1048 'example': [b'deadbeef'],
1049 1049 },
1050 1050 },
1051 1051 permission='pull')
1052 1052 def knownv2(repo, proto, nodes):
1053 1053 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1054 1054 yield result
1055 1055
1056 1056 @wireprotocommand(
1057 1057 'listkeys',
1058 1058 args={
1059 1059 'namespace': {
1060 1060 'type': 'bytes',
1061 1061 'example': b'ns',
1062 1062 },
1063 1063 },
1064 1064 permission='pull')
1065 1065 def listkeysv2(repo, proto, namespace):
1066 1066 keys = repo.listkeys(encoding.tolocal(namespace))
1067 1067 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1068 1068 for k, v in keys.iteritems()}
1069 1069
1070 1070 yield keys
1071 1071
1072 1072 @wireprotocommand(
1073 1073 'lookup',
1074 1074 args={
1075 1075 'key': {
1076 1076 'type': 'bytes',
1077 1077 'example': b'foo',
1078 1078 },
1079 1079 },
1080 1080 permission='pull')
1081 1081 def lookupv2(repo, proto, key):
1082 1082 key = encoding.tolocal(key)
1083 1083
1084 1084 # TODO handle exception.
1085 1085 node = repo.lookup(key)
1086 1086
1087 1087 yield node
1088 1088
1089 1089 @wireprotocommand(
1090 1090 'manifestdata',
1091 1091 args={
1092 1092 'nodes': {
1093 1093 'type': 'list',
1094 1094 'example': [b'0123456...'],
1095 1095 },
1096 1096 'haveparents': {
1097 1097 'type': 'bool',
1098 1098 'default': lambda: False,
1099 1099 'example': True,
1100 1100 },
1101 1101 'fields': {
1102 1102 'type': 'set',
1103 1103 'default': set,
1104 1104 'example': {b'parents', b'revision'},
1105 1105 'validvalues': {b'parents', b'revision'},
1106 1106 },
1107 1107 'tree': {
1108 1108 'type': 'bytes',
1109 1109 'example': b'',
1110 1110 },
1111 1111 },
1112 1112 permission='pull',
1113 1113 cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True))
1114 1114 def manifestdata(repo, proto, haveparents, nodes, fields, tree):
1115 1115 store = repo.manifestlog.getstorage(tree)
1116 1116
1117 1117 # Validate the node is known and abort on unknown revisions.
1118 1118 for node in nodes:
1119 1119 try:
1120 1120 store.rev(node)
1121 1121 except error.LookupError:
1122 1122 raise error.WireprotoCommandError(
1123 1123 'unknown node: %s', (node,))
1124 1124
1125 1125 revisions = store.emitrevisions(nodes,
1126 1126 revisiondata=b'revision' in fields,
1127 1127 assumehaveparentrevisions=haveparents)
1128 1128
1129 1129 yield {
1130 1130 b'totalitems': len(nodes),
1131 1131 }
1132 1132
1133 1133 for revision in revisions:
1134 1134 d = {
1135 1135 b'node': revision.node,
1136 1136 }
1137 1137
1138 1138 if b'parents' in fields:
1139 1139 d[b'parents'] = [revision.p1node, revision.p2node]
1140 1140
1141 1141 followingmeta = []
1142 1142 followingdata = []
1143 1143
1144 1144 if b'revision' in fields:
1145 1145 if revision.revision is not None:
1146 1146 followingmeta.append((b'revision', len(revision.revision)))
1147 1147 followingdata.append(revision.revision)
1148 1148 else:
1149 1149 d[b'deltabasenode'] = revision.basenode
1150 1150 followingmeta.append((b'delta', len(revision.delta)))
1151 1151 followingdata.append(revision.delta)
1152 1152
1153 1153 if followingmeta:
1154 1154 d[b'fieldsfollowing'] = followingmeta
1155 1155
1156 1156 yield d
1157 1157
1158 1158 for extra in followingdata:
1159 1159 yield extra
1160 1160
1161 1161 @wireprotocommand(
1162 1162 'pushkey',
1163 1163 args={
1164 1164 'namespace': {
1165 1165 'type': 'bytes',
1166 1166 'example': b'ns',
1167 1167 },
1168 1168 'key': {
1169 1169 'type': 'bytes',
1170 1170 'example': b'key',
1171 1171 },
1172 1172 'old': {
1173 1173 'type': 'bytes',
1174 1174 'example': b'old',
1175 1175 },
1176 1176 'new': {
1177 1177 'type': 'bytes',
1178 1178 'example': 'new',
1179 1179 },
1180 1180 },
1181 1181 permission='push')
1182 1182 def pushkeyv2(repo, proto, namespace, key, old, new):
1183 1183 # TODO handle ui output redirection
1184 1184 yield repo.pushkey(encoding.tolocal(namespace),
1185 1185 encoding.tolocal(key),
1186 1186 encoding.tolocal(old),
1187 1187 encoding.tolocal(new))
@@ -1,284 +1,291
1 1 from __future__ import absolute_import
2 2
3 3 import unittest
4 4
5 5 from mercurial import (
6 6 error,
7 ui as uimod,
7 8 wireprotoframing as framing,
8 9 )
9 10 from mercurial.utils import (
10 11 cborutil,
11 12 )
12 13
13 14 ffs = framing.makeframefromhumanstring
14 15
16 globalui = uimod.ui()
17
15 18 def sendframe(reactor, frame):
16 19 """Send a frame bytearray to a reactor."""
17 20 header = framing.parseheader(frame)
18 21 payload = frame[framing.FRAME_HEADER_SIZE:]
19 22 assert len(payload) == header.length
20 23
21 24 return reactor.onframerecv(framing.frame(header.requestid,
22 25 header.streamid,
23 26 header.streamflags,
24 27 header.typeid,
25 28 header.flags,
26 29 payload))
27 30
28 31 class SingleSendTests(unittest.TestCase):
29 32 """A reactor that can only send once rejects subsequent sends."""
30 33
31 34 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
32 35 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
33 36 # the regex version.
34 37 assertRaisesRegex = (# camelcase-required
35 38 unittest.TestCase.assertRaisesRegexp)
36 39
37 40 def testbasic(self):
38 reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
41 reactor = framing.clientreactor(globalui,
42 hasmultiplesend=False,
43 buffersends=True)
39 44
40 45 request, action, meta = reactor.callcommand(b'foo', {})
41 46 self.assertEqual(request.state, b'pending')
42 47 self.assertEqual(action, b'noop')
43 48
44 49 action, meta = reactor.flushcommands()
45 50 self.assertEqual(action, b'sendframes')
46 51
47 52 for frame in meta[b'framegen']:
48 53 self.assertEqual(request.state, b'sending')
49 54
50 55 self.assertEqual(request.state, b'sent')
51 56
52 57 with self.assertRaisesRegex(error.ProgrammingError,
53 58 'cannot issue new commands'):
54 59 reactor.callcommand(b'foo', {})
55 60
56 61 with self.assertRaisesRegex(error.ProgrammingError,
57 62 'cannot issue new commands'):
58 63 reactor.callcommand(b'foo', {})
59 64
60 65 class NoBufferTests(unittest.TestCase):
61 66 """A reactor without send buffering sends requests immediately."""
62 67 def testbasic(self):
63 reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
68 reactor = framing.clientreactor(globalui,
69 hasmultiplesend=True,
70 buffersends=False)
64 71
65 72 request, action, meta = reactor.callcommand(b'command1', {})
66 73 self.assertEqual(request.requestid, 1)
67 74 self.assertEqual(action, b'sendframes')
68 75
69 76 self.assertEqual(request.state, b'pending')
70 77
71 78 for frame in meta[b'framegen']:
72 79 self.assertEqual(request.state, b'sending')
73 80
74 81 self.assertEqual(request.state, b'sent')
75 82
76 83 action, meta = reactor.flushcommands()
77 84 self.assertEqual(action, b'noop')
78 85
79 86 # And we can send another command.
80 87 request, action, meta = reactor.callcommand(b'command2', {})
81 88 self.assertEqual(request.requestid, 3)
82 89 self.assertEqual(action, b'sendframes')
83 90
84 91 for frame in meta[b'framegen']:
85 92 self.assertEqual(request.state, b'sending')
86 93
87 94 self.assertEqual(request.state, b'sent')
88 95
89 96 class BadFrameRecvTests(unittest.TestCase):
90 97 if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
91 98 # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
92 99 # the regex version.
93 100 assertRaisesRegex = (# camelcase-required
94 101 unittest.TestCase.assertRaisesRegexp)
95 102
96 103 def testoddstream(self):
97 reactor = framing.clientreactor()
104 reactor = framing.clientreactor(globalui)
98 105
99 106 action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
100 107 self.assertEqual(action, b'error')
101 108 self.assertEqual(meta[b'message'],
102 109 b'received frame with odd numbered stream ID: 1')
103 110
104 111 def testunknownstream(self):
105 reactor = framing.clientreactor()
112 reactor = framing.clientreactor(globalui)
106 113
107 114 action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
108 115 self.assertEqual(action, b'error')
109 116 self.assertEqual(meta[b'message'],
110 117 b'received frame on unknown stream without beginning '
111 118 b'of stream flag set')
112 119
113 120 def testunhandledframetype(self):
114 reactor = framing.clientreactor(buffersends=False)
121 reactor = framing.clientreactor(globalui, buffersends=False)
115 122
116 123 request, action, meta = reactor.callcommand(b'foo', {})
117 124 for frame in meta[b'framegen']:
118 125 pass
119 126
120 127 with self.assertRaisesRegex(error.ProgrammingError,
121 128 'unhandled frame type'):
122 129 sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
123 130
124 131 class StreamTests(unittest.TestCase):
125 132 def testmultipleresponseframes(self):
126 reactor = framing.clientreactor(buffersends=False)
133 reactor = framing.clientreactor(globalui, buffersends=False)
127 134
128 135 request, action, meta = reactor.callcommand(b'foo', {})
129 136
130 137 self.assertEqual(action, b'sendframes')
131 138 for f in meta[b'framegen']:
132 139 pass
133 140
134 141 action, meta = sendframe(
135 142 reactor,
136 143 ffs(b'%d 0 stream-begin command-response 0 foo' %
137 144 request.requestid))
138 145 self.assertEqual(action, b'responsedata')
139 146
140 147 action, meta = sendframe(
141 148 reactor,
142 149 ffs(b'%d 0 0 command-response eos bar' % request.requestid))
143 150 self.assertEqual(action, b'responsedata')
144 151
145 152 class RedirectTests(unittest.TestCase):
146 153 def testredirect(self):
147 reactor = framing.clientreactor(buffersends=False)
154 reactor = framing.clientreactor(globalui, buffersends=False)
148 155
149 156 redirect = {
150 157 b'targets': [b'a', b'b'],
151 158 b'hashes': [b'sha256'],
152 159 }
153 160
154 161 request, action, meta = reactor.callcommand(
155 162 b'foo', {}, redirect=redirect)
156 163
157 164 self.assertEqual(action, b'sendframes')
158 165
159 166 frames = list(meta[b'framegen'])
160 167 self.assertEqual(len(frames), 1)
161 168
162 169 self.assertEqual(frames[0],
163 170 ffs(b'1 1 stream-begin command-request new '
164 171 b"cbor:{b'name': b'foo', "
165 172 b"b'redirect': {b'targets': [b'a', b'b'], "
166 173 b"b'hashes': [b'sha256']}}"))
167 174
168 175 class StreamSettingsTests(unittest.TestCase):
169 176 def testnoflags(self):
170 reactor = framing.clientreactor(buffersends=False)
177 reactor = framing.clientreactor(globalui, buffersends=False)
171 178
172 179 request, action, meta = reactor.callcommand(b'foo', {})
173 180 for f in meta[b'framegen']:
174 181 pass
175 182
176 183 action, meta = sendframe(reactor,
177 184 ffs(b'1 2 stream-begin stream-settings 0 '))
178 185
179 186 self.assertEqual(action, b'error')
180 187 self.assertEqual(meta, {
181 188 b'message': b'stream encoding settings frame must have '
182 189 b'continuation or end of stream flag set',
183 190 })
184 191
185 192 def testconflictflags(self):
186 reactor = framing.clientreactor(buffersends=False)
193 reactor = framing.clientreactor(globalui, buffersends=False)
187 194
188 195 request, action, meta = reactor.callcommand(b'foo', {})
189 196 for f in meta[b'framegen']:
190 197 pass
191 198
192 199 action, meta = sendframe(reactor,
193 200 ffs(b'1 2 stream-begin stream-settings continuation|eos '))
194 201
195 202 self.assertEqual(action, b'error')
196 203 self.assertEqual(meta, {
197 204 b'message': b'stream encoding settings frame cannot have both '
198 205 b'continuation and end of stream flags set',
199 206 })
200 207
201 208 def testemptypayload(self):
202 reactor = framing.clientreactor(buffersends=False)
209 reactor = framing.clientreactor(globalui, buffersends=False)
203 210
204 211 request, action, meta = reactor.callcommand(b'foo', {})
205 212 for f in meta[b'framegen']:
206 213 pass
207 214
208 215 action, meta = sendframe(reactor,
209 216 ffs(b'1 2 stream-begin stream-settings eos '))
210 217
211 218 self.assertEqual(action, b'error')
212 219 self.assertEqual(meta, {
213 220 b'message': b'stream encoding settings frame did not contain '
214 221 b'CBOR data'
215 222 })
216 223
217 224 def testbadcbor(self):
218 reactor = framing.clientreactor(buffersends=False)
225 reactor = framing.clientreactor(globalui, buffersends=False)
219 226
220 227 request, action, meta = reactor.callcommand(b'foo', {})
221 228 for f in meta[b'framegen']:
222 229 pass
223 230
224 231 action, meta = sendframe(reactor,
225 232 ffs(b'1 2 stream-begin stream-settings eos badvalue'))
226 233
227 234 self.assertEqual(action, b'error')
228 235
229 236 def testsingleobject(self):
230 reactor = framing.clientreactor(buffersends=False)
237 reactor = framing.clientreactor(globalui, buffersends=False)
231 238
232 239 request, action, meta = reactor.callcommand(b'foo', {})
233 240 for f in meta[b'framegen']:
234 241 pass
235 242
236 243 action, meta = sendframe(reactor,
237 244 ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'))
238 245
239 246 self.assertEqual(action, b'noop')
240 247 self.assertEqual(meta, {})
241 248
242 249 def testmultipleobjects(self):
243 reactor = framing.clientreactor(buffersends=False)
250 reactor = framing.clientreactor(globalui, buffersends=False)
244 251
245 252 request, action, meta = reactor.callcommand(b'foo', {})
246 253 for f in meta[b'framegen']:
247 254 pass
248 255
249 256 data = b''.join([
250 257 b''.join(cborutil.streamencode(b'identity')),
251 258 b''.join(cborutil.streamencode({b'foo', b'bar'})),
252 259 ])
253 260
254 261 action, meta = sendframe(reactor,
255 262 ffs(b'1 2 stream-begin stream-settings eos %s' % data))
256 263
257 264 self.assertEqual(action, b'noop')
258 265 self.assertEqual(meta, {})
259 266
260 267 def testmultipleframes(self):
261 reactor = framing.clientreactor(buffersends=False)
268 reactor = framing.clientreactor(globalui, buffersends=False)
262 269
263 270 request, action, meta = reactor.callcommand(b'foo', {})
264 271 for f in meta[b'framegen']:
265 272 pass
266 273
267 274 data = b''.join(cborutil.streamencode(b'identity'))
268 275
269 276 action, meta = sendframe(reactor,
270 277 ffs(b'1 2 stream-begin stream-settings continuation %s' %
271 278 data[0:3]))
272 279
273 280 self.assertEqual(action, b'noop')
274 281 self.assertEqual(meta, {})
275 282
276 283 action, meta = sendframe(reactor,
277 284 ffs(b'1 2 0 stream-settings eos %s' % data[3:]))
278 285
279 286 self.assertEqual(action, b'noop')
280 287 self.assertEqual(meta, {})
281 288
282 289 if __name__ == '__main__':
283 290 import silenttestrunner
284 291 silenttestrunner.main(__name__)
@@ -1,602 +1,604
1 1 from __future__ import absolute_import, print_function
2 2
3 3 import unittest
4 4
5 5 from mercurial.thirdparty import (
6 6 cbor,
7 7 )
8 8 from mercurial import (
9 ui as uimod,
9 10 util,
10 11 wireprotoframing as framing,
11 12 )
12 13 from mercurial.utils import (
13 14 cborutil,
14 15 )
15 16
16 17 ffs = framing.makeframefromhumanstring
17 18
18 19 OK = cbor.dumps({b'status': b'ok'})
19 20
20 21 def makereactor(deferoutput=False):
21 return framing.serverreactor(deferoutput=deferoutput)
22 ui = uimod.ui()
23 return framing.serverreactor(ui, deferoutput=deferoutput)
22 24
23 25 def sendframes(reactor, gen):
24 26 """Send a generator of frame bytearray to a reactor.
25 27
26 28 Emits a generator of results from ``onframerecv()`` calls.
27 29 """
28 30 for frame in gen:
29 31 header = framing.parseheader(frame)
30 32 payload = frame[framing.FRAME_HEADER_SIZE:]
31 33 assert len(payload) == header.length
32 34
33 35 yield reactor.onframerecv(framing.frame(header.requestid,
34 36 header.streamid,
35 37 header.streamflags,
36 38 header.typeid,
37 39 header.flags,
38 40 payload))
39 41
40 42 def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
41 43 """Generate frames to run a command and send them to a reactor."""
42 44 return sendframes(reactor,
43 45 framing.createcommandframes(stream, rid, cmd, args,
44 46 datafh))
45 47
46 48
47 49 class ServerReactorTests(unittest.TestCase):
48 50 def _sendsingleframe(self, reactor, f):
49 51 results = list(sendframes(reactor, [f]))
50 52 self.assertEqual(len(results), 1)
51 53
52 54 return results[0]
53 55
54 56 def assertaction(self, res, expected):
55 57 self.assertIsInstance(res, tuple)
56 58 self.assertEqual(len(res), 2)
57 59 self.assertIsInstance(res[1], dict)
58 60 self.assertEqual(res[0], expected)
59 61
60 62 def assertframesequal(self, frames, framestrings):
61 63 expected = [ffs(s) for s in framestrings]
62 64 self.assertEqual(list(frames), expected)
63 65
64 66 def test1framecommand(self):
65 67 """Receiving a command in a single frame yields request to run it."""
66 68 reactor = makereactor()
67 69 stream = framing.stream(1)
68 70 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
69 71 self.assertEqual(len(results), 1)
70 72 self.assertaction(results[0], b'runcommand')
71 73 self.assertEqual(results[0][1], {
72 74 b'requestid': 1,
73 75 b'command': b'mycommand',
74 76 b'args': {},
75 77 b'redirect': None,
76 78 b'data': None,
77 79 })
78 80
79 81 result = reactor.oninputeof()
80 82 self.assertaction(result, b'noop')
81 83
82 84 def test1argument(self):
83 85 reactor = makereactor()
84 86 stream = framing.stream(1)
85 87 results = list(sendcommandframes(reactor, stream, 41, b'mycommand',
86 88 {b'foo': b'bar'}))
87 89 self.assertEqual(len(results), 1)
88 90 self.assertaction(results[0], b'runcommand')
89 91 self.assertEqual(results[0][1], {
90 92 b'requestid': 41,
91 93 b'command': b'mycommand',
92 94 b'args': {b'foo': b'bar'},
93 95 b'redirect': None,
94 96 b'data': None,
95 97 })
96 98
97 99 def testmultiarguments(self):
98 100 reactor = makereactor()
99 101 stream = framing.stream(1)
100 102 results = list(sendcommandframes(reactor, stream, 1, b'mycommand',
101 103 {b'foo': b'bar', b'biz': b'baz'}))
102 104 self.assertEqual(len(results), 1)
103 105 self.assertaction(results[0], b'runcommand')
104 106 self.assertEqual(results[0][1], {
105 107 b'requestid': 1,
106 108 b'command': b'mycommand',
107 109 b'args': {b'foo': b'bar', b'biz': b'baz'},
108 110 b'redirect': None,
109 111 b'data': None,
110 112 })
111 113
112 114 def testsimplecommanddata(self):
113 115 reactor = makereactor()
114 116 stream = framing.stream(1)
115 117 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {},
116 118 util.bytesio(b'data!')))
117 119 self.assertEqual(len(results), 2)
118 120 self.assertaction(results[0], b'wantframe')
119 121 self.assertaction(results[1], b'runcommand')
120 122 self.assertEqual(results[1][1], {
121 123 b'requestid': 1,
122 124 b'command': b'mycommand',
123 125 b'args': {},
124 126 b'redirect': None,
125 127 b'data': b'data!',
126 128 })
127 129
128 130 def testmultipledataframes(self):
129 131 frames = [
130 132 ffs(b'1 1 stream-begin command-request new|have-data '
131 133 b"cbor:{b'name': b'mycommand'}"),
132 134 ffs(b'1 1 0 command-data continuation data1'),
133 135 ffs(b'1 1 0 command-data continuation data2'),
134 136 ffs(b'1 1 0 command-data eos data3'),
135 137 ]
136 138
137 139 reactor = makereactor()
138 140 results = list(sendframes(reactor, frames))
139 141 self.assertEqual(len(results), 4)
140 142 for i in range(3):
141 143 self.assertaction(results[i], b'wantframe')
142 144 self.assertaction(results[3], b'runcommand')
143 145 self.assertEqual(results[3][1], {
144 146 b'requestid': 1,
145 147 b'command': b'mycommand',
146 148 b'args': {},
147 149 b'redirect': None,
148 150 b'data': b'data1data2data3',
149 151 })
150 152
151 153 def testargumentanddata(self):
152 154 frames = [
153 155 ffs(b'1 1 stream-begin command-request new|have-data '
154 156 b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
155 157 b"b'foo': b'bar'}}"),
156 158 ffs(b'1 1 0 command-data continuation value1'),
157 159 ffs(b'1 1 0 command-data eos value2'),
158 160 ]
159 161
160 162 reactor = makereactor()
161 163 results = list(sendframes(reactor, frames))
162 164
163 165 self.assertaction(results[-1], b'runcommand')
164 166 self.assertEqual(results[-1][1], {
165 167 b'requestid': 1,
166 168 b'command': b'command',
167 169 b'args': {
168 170 b'key': b'val',
169 171 b'foo': b'bar',
170 172 },
171 173 b'redirect': None,
172 174 b'data': b'value1value2',
173 175 })
174 176
175 177 def testnewandcontinuation(self):
176 178 result = self._sendsingleframe(makereactor(),
177 179 ffs(b'1 1 stream-begin command-request new|continuation '))
178 180 self.assertaction(result, b'error')
179 181 self.assertEqual(result[1], {
180 182 b'message': b'received command request frame with both new and '
181 183 b'continuation flags set',
182 184 })
183 185
184 186 def testneithernewnorcontinuation(self):
185 187 result = self._sendsingleframe(makereactor(),
186 188 ffs(b'1 1 stream-begin command-request 0 '))
187 189 self.assertaction(result, b'error')
188 190 self.assertEqual(result[1], {
189 191 b'message': b'received command request frame with neither new nor '
190 192 b'continuation flags set',
191 193 })
192 194
193 195 def testunexpectedcommanddata(self):
194 196 """Command data frame when not running a command is an error."""
195 197 result = self._sendsingleframe(makereactor(),
196 198 ffs(b'1 1 stream-begin command-data 0 ignored'))
197 199 self.assertaction(result, b'error')
198 200 self.assertEqual(result[1], {
199 201 b'message': b'expected sender protocol settings or command request '
200 202 b'frame; got 2',
201 203 })
202 204
203 205 def testunexpectedcommanddatareceiving(self):
204 206 """Same as above except the command is receiving."""
205 207 results = list(sendframes(makereactor(), [
206 208 ffs(b'1 1 stream-begin command-request new|more '
207 209 b"cbor:{b'name': b'ignored'}"),
208 210 ffs(b'1 1 0 command-data eos ignored'),
209 211 ]))
210 212
211 213 self.assertaction(results[0], b'wantframe')
212 214 self.assertaction(results[1], b'error')
213 215 self.assertEqual(results[1][1], {
214 216 b'message': b'received command data frame for request that is not '
215 217 b'expecting data: 1',
216 218 })
217 219
218 220 def testconflictingrequestidallowed(self):
219 221 """Multiple fully serviced commands with same request ID is allowed."""
220 222 reactor = makereactor()
221 223 results = []
222 224 outstream = reactor.makeoutputstream()
223 225 results.append(self._sendsingleframe(
224 226 reactor, ffs(b'1 1 stream-begin command-request new '
225 227 b"cbor:{b'name': b'command'}")))
226 228 result = reactor.oncommandresponseready(outstream, 1, b'response1')
227 229 self.assertaction(result, b'sendframes')
228 230 list(result[1][b'framegen'])
229 231 results.append(self._sendsingleframe(
230 232 reactor, ffs(b'1 1 stream-begin command-request new '
231 233 b"cbor:{b'name': b'command'}")))
232 234 result = reactor.oncommandresponseready(outstream, 1, b'response2')
233 235 self.assertaction(result, b'sendframes')
234 236 list(result[1][b'framegen'])
235 237 results.append(self._sendsingleframe(
236 238 reactor, ffs(b'1 1 stream-begin command-request new '
237 239 b"cbor:{b'name': b'command'}")))
238 240 result = reactor.oncommandresponseready(outstream, 1, b'response3')
239 241 self.assertaction(result, b'sendframes')
240 242 list(result[1][b'framegen'])
241 243
242 244 for i in range(3):
243 245 self.assertaction(results[i], b'runcommand')
244 246 self.assertEqual(results[i][1], {
245 247 b'requestid': 1,
246 248 b'command': b'command',
247 249 b'args': {},
248 250 b'redirect': None,
249 251 b'data': None,
250 252 })
251 253
252 254 def testconflictingrequestid(self):
253 255 """Request ID for new command matching in-flight command is illegal."""
254 256 results = list(sendframes(makereactor(), [
255 257 ffs(b'1 1 stream-begin command-request new|more '
256 258 b"cbor:{b'name': b'command'}"),
257 259 ffs(b'1 1 0 command-request new '
258 260 b"cbor:{b'name': b'command1'}"),
259 261 ]))
260 262
261 263 self.assertaction(results[0], b'wantframe')
262 264 self.assertaction(results[1], b'error')
263 265 self.assertEqual(results[1][1], {
264 266 b'message': b'request with ID 1 already received',
265 267 })
266 268
267 269 def testinterleavedcommands(self):
268 270 cbor1 = cbor.dumps({
269 271 b'name': b'command1',
270 272 b'args': {
271 273 b'foo': b'bar',
272 274 b'key1': b'val',
273 275 }
274 276 }, canonical=True)
275 277 cbor3 = cbor.dumps({
276 278 b'name': b'command3',
277 279 b'args': {
278 280 b'biz': b'baz',
279 281 b'key': b'val',
280 282 },
281 283 }, canonical=True)
282 284
283 285 results = list(sendframes(makereactor(), [
284 286 ffs(b'1 1 stream-begin command-request new|more %s' % cbor1[0:6]),
285 287 ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
286 288 ffs(b'1 1 0 command-request continuation|more %s' % cbor1[6:9]),
287 289 ffs(b'3 1 0 command-request continuation|more %s' % cbor3[10:13]),
288 290 ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
289 291 ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
290 292 ]))
291 293
292 294 self.assertEqual([t[0] for t in results], [
293 295 b'wantframe',
294 296 b'wantframe',
295 297 b'wantframe',
296 298 b'wantframe',
297 299 b'runcommand',
298 300 b'runcommand',
299 301 ])
300 302
301 303 self.assertEqual(results[4][1], {
302 304 b'requestid': 3,
303 305 b'command': b'command3',
304 306 b'args': {b'biz': b'baz', b'key': b'val'},
305 307 b'redirect': None,
306 308 b'data': None,
307 309 })
308 310 self.assertEqual(results[5][1], {
309 311 b'requestid': 1,
310 312 b'command': b'command1',
311 313 b'args': {b'foo': b'bar', b'key1': b'val'},
312 314 b'redirect': None,
313 315 b'data': None,
314 316 })
315 317
316 318 def testmissingcommanddataframe(self):
317 319 # The reactor doesn't currently handle partially received commands.
318 320 # So this test is failing to do anything with request 1.
319 321 frames = [
320 322 ffs(b'1 1 stream-begin command-request new|have-data '
321 323 b"cbor:{b'name': b'command1'}"),
322 324 ffs(b'3 1 0 command-request new '
323 325 b"cbor:{b'name': b'command2'}"),
324 326 ]
325 327 results = list(sendframes(makereactor(), frames))
326 328 self.assertEqual(len(results), 2)
327 329 self.assertaction(results[0], b'wantframe')
328 330 self.assertaction(results[1], b'runcommand')
329 331
330 332 def testmissingcommanddataframeflags(self):
331 333 frames = [
332 334 ffs(b'1 1 stream-begin command-request new|have-data '
333 335 b"cbor:{b'name': b'command1'}"),
334 336 ffs(b'1 1 0 command-data 0 data'),
335 337 ]
336 338 results = list(sendframes(makereactor(), frames))
337 339 self.assertEqual(len(results), 2)
338 340 self.assertaction(results[0], b'wantframe')
339 341 self.assertaction(results[1], b'error')
340 342 self.assertEqual(results[1][1], {
341 343 b'message': b'command data frame without flags',
342 344 })
343 345
344 346 def testframefornonreceivingrequest(self):
345 347 """Receiving a frame for a command that is not receiving is illegal."""
346 348 results = list(sendframes(makereactor(), [
347 349 ffs(b'1 1 stream-begin command-request new '
348 350 b"cbor:{b'name': b'command1'}"),
349 351 ffs(b'3 1 0 command-request new|have-data '
350 352 b"cbor:{b'name': b'command3'}"),
351 353 ffs(b'5 1 0 command-data eos ignored'),
352 354 ]))
353 355 self.assertaction(results[2], b'error')
354 356 self.assertEqual(results[2][1], {
355 357 b'message': b'received frame for request that is not receiving: 5',
356 358 })
357 359
358 360 def testsimpleresponse(self):
359 361 """Bytes response to command sends result frames."""
360 362 reactor = makereactor()
361 363 instream = framing.stream(1)
362 364 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
363 365
364 366 outstream = reactor.makeoutputstream()
365 367 result = reactor.oncommandresponseready(outstream, 1, b'response')
366 368 self.assertaction(result, b'sendframes')
367 369 self.assertframesequal(result[1][b'framegen'], [
368 370 b'1 2 stream-begin command-response eos %sresponse' % OK,
369 371 ])
370 372
371 373 def testmultiframeresponse(self):
372 374 """Bytes response spanning multiple frames is handled."""
373 375 first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
374 376 second = b'y' * 100
375 377
376 378 reactor = makereactor()
377 379 instream = framing.stream(1)
378 380 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
379 381
380 382 outstream = reactor.makeoutputstream()
381 383 result = reactor.oncommandresponseready(outstream, 1, first + second)
382 384 self.assertaction(result, b'sendframes')
383 385 self.assertframesequal(result[1][b'framegen'], [
384 386 b'1 2 stream-begin command-response continuation %s' % OK,
385 387 b'1 2 0 command-response continuation %s' % first,
386 388 b'1 2 0 command-response eos %s' % second,
387 389 ])
388 390
389 391 def testservererror(self):
390 392 reactor = makereactor()
391 393 instream = framing.stream(1)
392 394 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
393 395
394 396 outstream = reactor.makeoutputstream()
395 397 result = reactor.onservererror(outstream, 1, b'some message')
396 398 self.assertaction(result, b'sendframes')
397 399 self.assertframesequal(result[1][b'framegen'], [
398 400 b"1 2 stream-begin error-response 0 "
399 401 b"cbor:{b'type': b'server', "
400 402 b"b'message': [{b'msg': b'some message'}]}",
401 403 ])
402 404
403 405 def test1commanddeferresponse(self):
404 406 """Responses when in deferred output mode are delayed until EOF."""
405 407 reactor = makereactor(deferoutput=True)
406 408 instream = framing.stream(1)
407 409 results = list(sendcommandframes(reactor, instream, 1, b'mycommand',
408 410 {}))
409 411 self.assertEqual(len(results), 1)
410 412 self.assertaction(results[0], b'runcommand')
411 413
412 414 outstream = reactor.makeoutputstream()
413 415 result = reactor.oncommandresponseready(outstream, 1, b'response')
414 416 self.assertaction(result, b'noop')
415 417 result = reactor.oninputeof()
416 418 self.assertaction(result, b'sendframes')
417 419 self.assertframesequal(result[1][b'framegen'], [
418 420 b'1 2 stream-begin command-response eos %sresponse' % OK,
419 421 ])
420 422
421 423 def testmultiplecommanddeferresponse(self):
422 424 reactor = makereactor(deferoutput=True)
423 425 instream = framing.stream(1)
424 426 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
425 427 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
426 428
427 429 outstream = reactor.makeoutputstream()
428 430 result = reactor.oncommandresponseready(outstream, 1, b'response1')
429 431 self.assertaction(result, b'noop')
430 432 result = reactor.oncommandresponseready(outstream, 3, b'response2')
431 433 self.assertaction(result, b'noop')
432 434 result = reactor.oninputeof()
433 435 self.assertaction(result, b'sendframes')
434 436 self.assertframesequal(result[1][b'framegen'], [
435 437 b'1 2 stream-begin command-response eos %sresponse1' % OK,
436 438 b'3 2 0 command-response eos %sresponse2' % OK,
437 439 ])
438 440
439 441 def testrequestidtracking(self):
440 442 reactor = makereactor(deferoutput=True)
441 443 instream = framing.stream(1)
442 444 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
443 445 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
444 446 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
445 447
446 448 # Register results for commands out of order.
447 449 outstream = reactor.makeoutputstream()
448 450 reactor.oncommandresponseready(outstream, 3, b'response3')
449 451 reactor.oncommandresponseready(outstream, 1, b'response1')
450 452 reactor.oncommandresponseready(outstream, 5, b'response5')
451 453
452 454 result = reactor.oninputeof()
453 455 self.assertaction(result, b'sendframes')
454 456 self.assertframesequal(result[1][b'framegen'], [
455 457 b'3 2 stream-begin command-response eos %sresponse3' % OK,
456 458 b'1 2 0 command-response eos %sresponse1' % OK,
457 459 b'5 2 0 command-response eos %sresponse5' % OK,
458 460 ])
459 461
460 462 def testduplicaterequestonactivecommand(self):
461 463 """Receiving a request ID that matches a request that isn't finished."""
462 464 reactor = makereactor()
463 465 stream = framing.stream(1)
464 466 list(sendcommandframes(reactor, stream, 1, b'command1', {}))
465 467 results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
466 468
467 469 self.assertaction(results[0], b'error')
468 470 self.assertEqual(results[0][1], {
469 471 b'message': b'request with ID 1 is already active',
470 472 })
471 473
472 474 def testduplicaterequestonactivecommandnosend(self):
473 475 """Same as above but we've registered a response but haven't sent it."""
474 476 reactor = makereactor()
475 477 instream = framing.stream(1)
476 478 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
477 479 outstream = reactor.makeoutputstream()
478 480 reactor.oncommandresponseready(outstream, 1, b'response')
479 481
480 482 # We've registered the response but haven't sent it. From the
481 483 # perspective of the reactor, the command is still active.
482 484
483 485 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
484 486 self.assertaction(results[0], b'error')
485 487 self.assertEqual(results[0][1], {
486 488 b'message': b'request with ID 1 is already active',
487 489 })
488 490
489 491 def testduplicaterequestaftersend(self):
490 492 """We can use a duplicate request ID after we've sent the response."""
491 493 reactor = makereactor()
492 494 instream = framing.stream(1)
493 495 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
494 496 outstream = reactor.makeoutputstream()
495 497 res = reactor.oncommandresponseready(outstream, 1, b'response')
496 498 list(res[1][b'framegen'])
497 499
498 500 results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
499 501 self.assertaction(results[0], b'runcommand')
500 502
501 503 def testprotocolsettingsnoflags(self):
502 504 result = self._sendsingleframe(
503 505 makereactor(),
504 506 ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
505 507 self.assertaction(result, b'error')
506 508 self.assertEqual(result[1], {
507 509 b'message': b'sender protocol settings frame must have '
508 510 b'continuation or end of stream flag set',
509 511 })
510 512
511 513 def testprotocolsettingsconflictflags(self):
512 514 result = self._sendsingleframe(
513 515 makereactor(),
514 516 ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
515 517 self.assertaction(result, b'error')
516 518 self.assertEqual(result[1], {
517 519 b'message': b'sender protocol settings frame cannot have both '
518 520 b'continuation and end of stream flags set',
519 521 })
520 522
521 523 def testprotocolsettingsemptypayload(self):
522 524 result = self._sendsingleframe(
523 525 makereactor(),
524 526 ffs(b'0 1 stream-begin sender-protocol-settings eos '))
525 527 self.assertaction(result, b'error')
526 528 self.assertEqual(result[1], {
527 529 b'message': b'sender protocol settings frame did not contain CBOR '
528 530 b'data',
529 531 })
530 532
531 533 def testprotocolsettingsmultipleobjects(self):
532 534 result = self._sendsingleframe(
533 535 makereactor(),
534 536 ffs(b'0 1 stream-begin sender-protocol-settings eos '
535 537 b'\x46foobar\x43foo'))
536 538 self.assertaction(result, b'error')
537 539 self.assertEqual(result[1], {
538 540 b'message': b'sender protocol settings frame contained multiple '
539 541 b'CBOR values',
540 542 })
541 543
542 544 def testprotocolsettingscontentencodings(self):
543 545 reactor = makereactor()
544 546
545 547 result = self._sendsingleframe(
546 548 reactor,
547 549 ffs(b'0 1 stream-begin sender-protocol-settings eos '
548 550 b'cbor:{b"contentencodings": [b"a", b"b"]}'))
549 551 self.assertaction(result, b'wantframe')
550 552
551 553 self.assertEqual(reactor._state, b'idle')
552 554 self.assertEqual(reactor._sendersettings[b'contentencodings'],
553 555 [b'a', b'b'])
554 556
555 557 def testprotocolsettingsmultipleframes(self):
556 558 reactor = makereactor()
557 559
558 560 data = b''.join(cborutil.streamencode({
559 561 b'contentencodings': [b'value1', b'value2'],
560 562 }))
561 563
562 564 results = list(sendframes(reactor, [
563 565 ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
564 566 data[0:5]),
565 567 ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
566 568 ]))
567 569
568 570 self.assertEqual(len(results), 2)
569 571
570 572 self.assertaction(results[0], b'wantframe')
571 573 self.assertaction(results[1], b'wantframe')
572 574
573 575 self.assertEqual(reactor._state, b'idle')
574 576 self.assertEqual(reactor._sendersettings[b'contentencodings'],
575 577 [b'value1', b'value2'])
576 578
577 579 def testprotocolsettingsbadcbor(self):
578 580 result = self._sendsingleframe(
579 581 makereactor(),
580 582 ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
581 583 self.assertaction(result, b'error')
582 584
583 585 def testprotocolsettingsnoninitial(self):
584 586 # Cannot have protocol settings frames as non-initial frames.
585 587 reactor = makereactor()
586 588
587 589 stream = framing.stream(1)
588 590 results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
589 591 self.assertEqual(len(results), 1)
590 592 self.assertaction(results[0], b'runcommand')
591 593
592 594 result = self._sendsingleframe(
593 595 reactor,
594 596 ffs(b'0 1 0 sender-protocol-settings eos '))
595 597 self.assertaction(result, b'error')
596 598 self.assertEqual(result[1], {
597 599 b'message': b'expected command request frame; got 8',
598 600 })
599 601
600 602 if __name__ == '__main__':
601 603 import silenttestrunner
602 604 silenttestrunner.main(__name__)
General Comments 0
You need to be logged in to leave comments. Login now