bundle2: only emit compressed chunks if they have data...
Gregory Szorc -
r30177:9626022f default
@@ -1,1622 +1,1626
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty parameter names are forbidden.
47 47
48 48 A name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
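The stream-parameter framing above (an int32 size followed by a urlquoted, space-separated textual blob) can be sketched standalone. This is a hedged illustration, not bundle2.py's real API: `encodestreamparams` and `decodestreamparams` are hypothetical helper names.

```python
import struct
import urllib.parse

def encodestreamparams(params):
    """Encode [(name, value-or-None)] pairs into the size-prefixed blob."""
    blocks = []
    for name, value in params:
        name = urllib.parse.quote(name)
        if value is not None:
            # parameters with a value use the <name>=<value> form
            name = '%s=%s' % (name, urllib.parse.quote(value))
        blocks.append(name)
    blob = ' '.join(blocks).encode('ascii')
    # the blob is preceded by its total size as a big-endian int32
    return struct.pack('>i', len(blob)) + blob

def decodestreamparams(data):
    """Reverse of encodestreamparams; valueless parameters map to None."""
    size = struct.unpack('>i', data[:4])[0]
    blob = data[4:4 + size].decode('ascii')
    params = {}
    for p in blob.split(' '):
        parts = p.split('=', 1)
        key = urllib.parse.unquote(parts[0])
        value = urllib.parse.unquote(parts[1]) if len(parts) > 1 else None
        params[key] = value
    return params
```

Because both name and value are urlquoted, the space separator and `=` sign can never appear inside an encoded token.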
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
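The parameter block layout above can be sketched as a small parser. This is an illustration under the stated layout, not bundle2.py's real code; `parseparams` is a hypothetical name.

```python
import struct

def parseparams(blob):
    """Parse <mandatory-count><advisory-count><param-sizes><param-data>.

    Returns a list of (key, value, mandatory) tuples; mandatory
    parameters come first, then the advisory ones.
    """
    mancount, advcount = struct.unpack('>BB', blob[:2])
    total = mancount + advcount
    # one (key-size, value-size) couple of bytes per parameter
    fmt = '>' + 'BB' * total
    sizes = struct.unpack(fmt, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value, i < mancount))
    return params
```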
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
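The `<chunksize><chunkdata>` framing above can be sketched as a reader; `iterchunks` is a hypothetical name for illustration, and the negative-size special casing is rejected here since none is defined yet.

```python
import io
import struct

def iterchunks(fp):
    """Yield payload chunks until the zero-size terminator chunk."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return                       # end-of-payload marker
        if size < 0:
            # reserved for special case processing; none defined yet
            raise ValueError('special chunk size not supported: %i' % size)
        yield fp.read(size)

# a payload of two chunks followed by the zero-size terminator
payload = (struct.pack('>i', 5) + b'hello' +
           struct.pack('>i', 1) + b'!' +
           struct.pack('>i', 0))
```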
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that registers a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
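The case-insensitive matching and mandatory/advisory rules described in the module docstring can be sketched standalone. This is not the real `parthandler`/`_processpart` machinery; `register` and `dispatch` are hypothetical names for illustration.

```python
# handlers are registered under the lower-cased part type
handlers = {}

def register(parttype):
    def decorator(func):
        handlers[parttype.lower()] = func
        return func
    return decorator

@register('changegroup')
def handlechangegroup(parttype):
    return 'applied %s' % parttype

def dispatch(parttype):
    # a part is mandatory when its type contains any uppercase character
    mandatory = parttype != parttype.lower()
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            raise ValueError('unknown mandatory part: %s' % parttype)
        return None                      # advisory part: silently ignored
    return handler(parttype)
```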
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during an unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
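The records API described in the docstring can be shown with a minimal standalone reimplementation (so the example runs on its own; it omits the reply-tracking parts of the real class).

```python
class records(object):
    """Minimal sketch of the unbundlerecords API."""
    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # entries are kept both per-category and in chronological order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

r = records()
r.add('changegroup', {'return': 1})
r.add('pushkey', {'namespace': 'phases'})
```

`r['changegroup']` returns all entries of that category; iterating `r` yields `('category', entry)` tuples in insertion order, and an unknown category yields an empty tuple rather than raising.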
274 274 class bundleoperation(object):
275 275 """an object that represents a single bundling process
276 276
277 277 Its purpose is to carry unbundle-related objects and states.
278 278
279 279 A new object should be created at the beginning of each bundle processing.
280 280 The object is to be returned by the processing function.
281 281
282 282 The object has very little content now; it will ultimately contain:
283 283 * an access to the repo the bundle is applied to,
284 284 * a ui object,
285 285 * a way to retrieve a transaction to add changes to the repo,
286 286 * a way to record the result of processing each part,
287 287 * a way to construct a bundle response when applicable.
288 288 """
289 289
290 290 def __init__(self, repo, transactiongetter, captureoutput=True):
291 291 self.repo = repo
292 292 self.ui = repo.ui
293 293 self.records = unbundlerecords()
294 294 self.gettransaction = transactiongetter
295 295 self.reply = None
296 296 self.captureoutput = captureoutput
297 297
298 298 class TransactionUnavailable(RuntimeError):
299 299 pass
300 300
301 301 def _notransaction():
302 302 """default method to get a transaction while processing a bundle
303 303
304 304 Raise an exception to highlight the fact that no transaction was expected
305 305 to be created"""
306 306 raise TransactionUnavailable()
307 307
308 308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
309 309 # transform me into unbundler.apply() as soon as the freeze is lifted
310 310 tr.hookargs['bundle2'] = '1'
311 311 if source is not None and 'source' not in tr.hookargs:
312 312 tr.hookargs['source'] = source
313 313 if url is not None and 'url' not in tr.hookargs:
314 314 tr.hookargs['url'] = url
315 315 return processbundle(repo, unbundler, lambda: tr, op=op)
316 316
317 317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
318 318 """This function processes a bundle, applying effects to/from a repo
319 319
320 320 It iterates over each part then searches for and uses the proper handling
321 321 code to process the part. Parts are processed in order.
322 322
323 323 This is a very early version of this function that will be strongly reworked
324 324 before final usage.
325 325
326 326 An unknown mandatory part will abort the process.
327 327
328 328 It is temporarily possible to provide a prebuilt bundleoperation to the
329 329 function. This is used to ensure output is properly propagated in case of
330 330 an error during the unbundling. This output capturing part will likely be
331 331 reworked and this ability will probably go away in the process.
332 332 """
333 333 if op is None:
334 334 if transactiongetter is None:
335 335 transactiongetter = _notransaction
336 336 op = bundleoperation(repo, transactiongetter)
337 337 # todo:
338 338 # - replace this with an init function soon.
339 339 # - exception catching
340 340 unbundler.params
341 341 if repo.ui.debugflag:
342 342 msg = ['bundle2-input-bundle:']
343 343 if unbundler.params:
344 344 msg.append(' %i params' % len(unbundler.params))
345 345 if op.gettransaction is None:
346 346 msg.append(' no-transaction')
347 347 else:
348 348 msg.append(' with-transaction')
349 349 msg.append('\n')
350 350 repo.ui.debug(''.join(msg))
351 351 iterparts = enumerate(unbundler.iterparts())
352 352 part = None
353 353 nbpart = 0
354 354 try:
355 355 for nbpart, part in iterparts:
356 356 _processpart(op, part)
357 357 except Exception as exc:
358 358 for nbpart, part in iterparts:
359 359 # consume the bundle content
360 360 part.seek(0, 2)
361 361 # Small hack to let caller code distinguish exceptions from bundle2
362 362 # processing from processing the old format. This is mostly
363 363 # needed to handle different return codes to unbundle according to the
364 364 # type of bundle. We should probably clean up or drop this return code
365 365 # craziness in a future version.
366 366 exc.duringunbundle2 = True
367 367 salvaged = []
368 368 replycaps = None
369 369 if op.reply is not None:
370 370 salvaged = op.reply.salvageoutput()
371 371 replycaps = op.reply.capabilities
372 372 exc._replycaps = replycaps
373 373 exc._bundle2salvagedoutput = salvaged
374 374 raise
375 375 finally:
376 376 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
377 377
378 378 return op
379 379
380 380 def _processpart(op, part):
381 381 """process a single part from a bundle
382 382
383 383 The part is guaranteed to have been fully consumed when the function exits
384 384 (even if an exception is raised)."""
385 385 status = 'unknown' # used by debug output
386 386 hardabort = False
387 387 try:
388 388 try:
389 389 handler = parthandlermapping.get(part.type)
390 390 if handler is None:
391 391 status = 'unsupported-type'
392 392 raise error.BundleUnknownFeatureError(parttype=part.type)
393 393 indebug(op.ui, 'found a handler for part %r' % part.type)
394 394 unknownparams = part.mandatorykeys - handler.params
395 395 if unknownparams:
396 396 unknownparams = list(unknownparams)
397 397 unknownparams.sort()
398 398 status = 'unsupported-params (%s)' % unknownparams
399 399 raise error.BundleUnknownFeatureError(parttype=part.type,
400 400 params=unknownparams)
401 401 status = 'supported'
402 402 except error.BundleUnknownFeatureError as exc:
403 403 if part.mandatory: # mandatory parts
404 404 raise
405 405 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
406 406 return # skip to part processing
407 407 finally:
408 408 if op.ui.debugflag:
409 409 msg = ['bundle2-input-part: "%s"' % part.type]
410 410 if not part.mandatory:
411 411 msg.append(' (advisory)')
412 412 nbmp = len(part.mandatorykeys)
413 413 nbap = len(part.params) - nbmp
414 414 if nbmp or nbap:
415 415 msg.append(' (params:')
416 416 if nbmp:
417 417 msg.append(' %i mandatory' % nbmp)
418 418 if nbap:
419 419 msg.append(' %i advisory' % nbap)
420 420 msg.append(')')
421 421 msg.append(' %s\n' % status)
422 422 op.ui.debug(''.join(msg))
423 423
424 424 # handler is called outside the above try block so that we don't
425 425 # risk catching KeyErrors from anything other than the
426 426 # parthandlermapping lookup (any KeyError raised by handler()
427 427 # itself represents a defect of a different variety).
428 428 output = None
429 429 if op.captureoutput and op.reply is not None:
430 430 op.ui.pushbuffer(error=True, subproc=True)
431 431 output = ''
432 432 try:
433 433 handler(op, part)
434 434 finally:
435 435 if output is not None:
436 436 output = op.ui.popbuffer()
437 437 if output:
438 438 outpart = op.reply.newpart('output', data=output,
439 439 mandatory=False)
440 440 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
441 441 # If exiting or interrupted, do not attempt to seek the stream in the
442 442 # finally block below. This makes abort faster.
443 443 except (SystemExit, KeyboardInterrupt):
444 444 hardabort = True
445 445 raise
446 446 finally:
447 447 # consume the part content to not corrupt the stream.
448 448 if not hardabort:
449 449 part.seek(0, 2)
450 450
451 451
452 452 def decodecaps(blob):
453 453 """decode a bundle2 caps bytes blob into a dictionary
454 454
455 455 The blob is a list of capabilities (one per line)
456 456 Capabilities may have values using a line of the form::
457 457
458 458 capability=value1,value2,value3
459 459
460 460 The values are always a list."""
461 461 caps = {}
462 462 for line in blob.splitlines():
463 463 if not line:
464 464 continue
465 465 if '=' not in line:
466 466 key, vals = line, ()
467 467 else:
468 468 key, vals = line.split('=', 1)
469 469 vals = vals.split(',')
470 470 key = urlreq.unquote(key)
471 471 vals = [urlreq.unquote(v) for v in vals]
472 472 caps[key] = vals
473 473 return caps
474 474
475 475 def encodecaps(caps):
476 476 """encode a bundle2 caps dictionary into a bytes blob"""
477 477 chunks = []
478 478 for ca in sorted(caps):
479 479 vals = caps[ca]
480 480 ca = urlreq.quote(ca)
481 481 vals = [urlreq.quote(v) for v in vals]
482 482 if vals:
483 483 ca = "%s=%s" % (ca, ','.join(vals))
484 484 chunks.append(ca)
485 485 return '\n'.join(chunks)
486 486
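The caps blob format handled by `decodecaps`/`encodecaps` (one capability per line, optional comma-separated values, everything urlquoted) can be sketched standalone; this re-sketch uses `urllib.parse` rather than Mercurial's `urlreq` wrapper, and represents valueless capabilities as empty lists for a clean round trip.

```python
import urllib.parse

def encodecaps(caps):
    """encode a caps dictionary into a bytes-like text blob"""
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode a caps blob back into a dictionary of value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = [urllib.parse.unquote(v) for v in vals.split(',')]
        caps[urllib.parse.unquote(key)] = vals
    return caps
```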
487 487 bundletypes = {
488 488 "": ("", None), # only when using unbundle on ssh and old http servers
489 489 # since the unification ssh accepts a header but there
490 490 # is no capability signaling it.
491 491 "HG20": (), # special-cased below
492 492 "HG10UN": ("HG10UN", None),
493 493 "HG10BZ": ("HG10", 'BZ'),
494 494 "HG10GZ": ("HG10GZ", 'GZ'),
495 495 }
496 496
497 497 # hgweb uses this list to communicate its preferred type
498 498 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
499 499
500 500 class bundle20(object):
501 501 """represent an outgoing bundle2 container
502 502
503 503 Use the `addparam` method to add a stream level parameter and `newpart` to
504 504 populate it. Then call `getchunks` to retrieve all the binary chunks of
505 505 data that compose the bundle2 container."""
506 506
507 507 _magicstring = 'HG20'
508 508
509 509 def __init__(self, ui, capabilities=()):
510 510 self.ui = ui
511 511 self._params = []
512 512 self._parts = []
513 513 self.capabilities = dict(capabilities)
514 514 self._compressor = util.compressors[None]()
515 515
516 516 def setcompression(self, alg):
517 517 """setup core part compression to <alg>"""
518 518 if alg is None:
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compressor = util.compressors[alg]()
523 523
524 524 @property
525 525 def nbparts(self):
526 526 """total number of parts added to the bundler"""
527 527 return len(self._parts)
528 528
529 529 # methods used to define the bundle2 content
530 530 def addparam(self, name, value=None):
531 531 """add a stream level parameter"""
532 532 if not name:
533 533 raise ValueError('empty parameter name')
534 534 if name[0] not in string.letters:
535 535 raise ValueError('non letter first character: %r' % name)
536 536 self._params.append((name, value))
537 537
538 538 def addpart(self, part):
539 539 """add a new part to the bundle2 container
540 540
541 541 Parts contain the actual application payload.
542 542 assert part.id is None
543 543 part.id = len(self._parts) # very cheap counter
544 544 self._parts.append(part)
545 545
546 546 def newpart(self, typeid, *args, **kwargs):
547 547 """create a new part and add it to the containers
548 548
549 549 The part is directly added to the container. For now, this means
550 550 that any failure to properly initialize the part after calling
551 551 ``newpart`` should result in a failure of the whole bundling process.
552 552
553 553 You can still fall back to manually creating and adding the part if you
554 554 need better control.
555 555 part = bundlepart(typeid, *args, **kwargs)
556 556 self.addpart(part)
557 557 return part
558 558
559 559 # methods used to generate the bundle2 stream
560 560 def getchunks(self):
561 561 if self.ui.debugflag:
562 562 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
563 563 if self._params:
564 564 msg.append(' (%i params)' % len(self._params))
565 565 msg.append(' %i parts total\n' % len(self._parts))
566 566 self.ui.debug(''.join(msg))
567 567 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
568 568 yield self._magicstring
569 569 param = self._paramchunk()
570 570 outdebug(self.ui, 'bundle parameter: %s' % param)
571 571 yield _pack(_fstreamparamsize, len(param))
572 572 if param:
573 573 yield param
574 574 # starting compression
575 575 for chunk in self._getcorechunk():
576 yield self._compressor.compress(chunk)
576 data = self._compressor.compress(chunk)
577 if data:
578 yield data
577 579 yield self._compressor.flush()
578 580
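The change shown in this commit (yielding compressed output only when it is non-empty) matters because stream compressors buffer: `compress()` may return an empty byte string until enough input has accumulated, and emitting those empty chunks is wasteful. A standalone sketch of the same pattern with the stdlib `zlib` module:

```python
import zlib

def compresschunks(chunks):
    """Compress an iterable of byte chunks, emitting only non-empty output."""
    compressor = zlib.compressobj()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:                  # only emit chunks that carry data
            yield data
    # flush finishes the stream and always carries the trailing block
    yield compressor.flush()

out = list(compresschunks([b'a' * 10, b'b' * 10]))
```

With small inputs like these, every `compress()` call typically returns `b''` and the whole stream comes out of `flush()`; without the `if data:` guard the generator would yield a stream of useless empty chunks.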
579 581 def _paramchunk(self):
580 582 """return an encoded version of all stream parameters"""
581 583 blocks = []
582 584 for par, value in self._params:
583 585 par = urlreq.quote(par)
584 586 if value is not None:
585 587 value = urlreq.quote(value)
586 588 par = '%s=%s' % (par, value)
587 589 blocks.append(par)
588 590 return ' '.join(blocks)
589 591
590 592 def _getcorechunk(self):
591 593 """yield chunks for the core part of the bundle
592 594
593 595 (all but headers and parameters)"""
594 596 outdebug(self.ui, 'start of parts')
595 597 for part in self._parts:
596 598 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 599 for chunk in part.getchunks(ui=self.ui):
598 600 yield chunk
599 601 outdebug(self.ui, 'end of bundle')
600 602 yield _pack(_fpartheadersize, 0)
601 603
602 604
603 605 def salvageoutput(self):
604 606 """return a list with a copy of all output parts in the bundle
605 607
606 608 This is meant to be used during error handling to make sure we preserve
607 609 server output"""
608 610 salvaged = []
609 611 for part in self._parts:
610 612 if part.type.startswith('output'):
611 613 salvaged.append(part.copy())
612 614 return salvaged
613 615
614 616
615 617 class unpackermixin(object):
616 618 """A mixin to extract bytes and struct data from a stream"""
617 619
618 620 def __init__(self, fp):
619 621 self._fp = fp
620 622 self._seekable = (util.safehasattr(fp, 'seek') and
621 623 util.safehasattr(fp, 'tell'))
622 624
623 625 def _unpack(self, format):
624 626 """unpack this struct format from the stream"""
625 627 data = self._readexact(struct.calcsize(format))
626 628 return _unpack(format, data)
627 629
628 630 def _readexact(self, size):
629 631 """read exactly <size> bytes from the stream"""
630 632 return changegroup.readexactly(self._fp, size)
631 633
632 634 def seek(self, offset, whence=0):
633 635 """move the underlying file pointer"""
634 636 if self._seekable:
635 637 return self._fp.seek(offset, whence)
636 638 else:
637 639 raise NotImplementedError(_('File pointer is not seekable'))
638 640
639 641 def tell(self):
640 642 """return the file offset, or None if file is not seekable"""
641 643 if self._seekable:
642 644 try:
643 645 return self._fp.tell()
644 646 except IOError as e:
645 647 if e.errno == errno.ESPIPE:
646 648 self._seekable = False
647 649 else:
648 650 raise
649 651 return None
650 652
651 653 def close(self):
652 654 """close underlying file"""
653 655 if util.safehasattr(self._fp, 'close'):
654 656 return self._fp.close()
655 657
656 658 def getunbundler(ui, fp, magicstring=None):
657 659 """return a valid unbundler object for a given magicstring"""
658 660 if magicstring is None:
659 661 magicstring = changegroup.readexactly(fp, 4)
660 662 magic, version = magicstring[0:2], magicstring[2:4]
661 663 if magic != 'HG':
662 664 raise error.Abort(_('not a Mercurial bundle'))
663 665 unbundlerclass = formatmap.get(version)
664 666 if unbundlerclass is None:
665 667 raise error.Abort(_('unknown bundle version %s') % version)
666 668 unbundler = unbundlerclass(ui, fp)
667 669 indebug(ui, 'start processing of %s stream' % magicstring)
668 670 return unbundler
669 671
670 672 class unbundle20(unpackermixin):
671 673 """interpret a bundle2 stream
672 674
673 675 This class is fed with a binary stream and yields parts through its
674 676 `iterparts` method."""
675 677
676 678 _magicstring = 'HG20'
677 679
678 680 def __init__(self, ui, fp):
679 681 """If header is specified, we do not read it out of the stream."""
680 682 self.ui = ui
681 683 self._decompressor = util.decompressors[None]
682 684 self._compressed = None
683 685 super(unbundle20, self).__init__(fp)
684 686
685 687 @util.propertycache
686 688 def params(self):
687 689 """dictionary of stream level parameters"""
688 690 indebug(self.ui, 'reading bundle2 stream parameters')
689 691 params = {}
690 692 paramssize = self._unpack(_fstreamparamsize)[0]
691 693 if paramssize < 0:
692 694 raise error.BundleValueError('negative bundle param size: %i'
693 695 % paramssize)
694 696 if paramssize:
695 697 params = self._readexact(paramssize)
696 698 params = self._processallparams(params)
697 699 return params
698 700
699 701 def _processallparams(self, paramsblock):
700 702 """process and apply all stream level parameters from a blob"""
701 703 params = util.sortdict()
702 704 for p in paramsblock.split(' '):
703 705 p = p.split('=', 1)
704 706 p = [urlreq.unquote(i) for i in p]
705 707 if len(p) < 2:
706 708 p.append(None)
707 709 self._processparam(*p)
708 710 params[p[0]] = p[1]
709 711 return params
710 712
711 713
712 714 def _processparam(self, name, value):
713 715 """process a parameter, applying its effect if needed
714 716
715 717 Parameters starting with a lower case letter are advisory and will be
716 718 ignored when unknown. Those starting with an upper case letter are
717 719 mandatory; this function will raise a KeyError when they are unknown.
718 720
719 721 Note: no options are currently supported. Any input will be either
720 722 ignored or will fail.
721 723 """
722 724 if not name:
723 725 raise ValueError('empty parameter name')
724 726 if name[0] not in string.letters:
725 727 raise ValueError('non letter first character: %r' % name)
726 728 try:
727 729 handler = b2streamparamsmap[name.lower()]
728 730 except KeyError:
729 731 if name[0].islower():
730 732 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 733 else:
732 734 raise error.BundleUnknownFeatureError(params=(name,))
733 735 else:
734 736 handler(self, name, value)
735 737
736 738 def _forwardchunks(self):
737 739 """utility to transfer a bundle2 as binary
738 740
739 741 This is made necessary by the fact that the 'getbundle' command over 'ssh'
740 742 has no way to know when the reply ends, relying on the bundle to be
741 743 interpreted to know its end. This is terrible and we are sorry, but we
742 744 needed to move forward to get general delta enabled.
743 745 """
744 746 yield self._magicstring
745 747 assert 'params' not in vars(self)
746 748 paramssize = self._unpack(_fstreamparamsize)[0]
747 749 if paramssize < 0:
748 750 raise error.BundleValueError('negative bundle param size: %i'
749 751 % paramssize)
750 752 yield _pack(_fstreamparamsize, paramssize)
751 753 if paramssize:
752 754 params = self._readexact(paramssize)
753 755 self._processallparams(params)
754 756 yield params
755 757 assert self._decompressor is util.decompressors[None]
756 758 # From there, payload might need to be decompressed
757 759 self._fp = self._decompressor(self._fp)
758 760 emptycount = 0
759 761 while emptycount < 2:
760 762 # so we can brainlessly loop
761 763 assert _fpartheadersize == _fpayloadsize
762 764 size = self._unpack(_fpartheadersize)[0]
763 765 yield _pack(_fpartheadersize, size)
764 766 if size:
765 767 emptycount = 0
766 768 else:
767 769 emptycount += 1
768 770 continue
769 771 if size == flaginterrupt:
770 772 continue
771 773 elif size < 0:
772 774 raise error.BundleValueError('negative chunk size: %i' % size)
773 775 yield self._readexact(size)
774 776
775 777
776 778 def iterparts(self):
777 779 """yield all parts contained in the stream"""
778 780 # make sure params have been loaded
779 781 self.params
780 782 # From there, payload needs to be decompressed
781 783 self._fp = self._decompressor(self._fp)
782 784 indebug(self.ui, 'start extraction of bundle2 parts')
783 785 headerblock = self._readpartheader()
784 786 while headerblock is not None:
785 787 part = unbundlepart(self.ui, headerblock, self._fp)
786 788 yield part
787 789 part.seek(0, 2)
788 790 headerblock = self._readpartheader()
789 791 indebug(self.ui, 'end of bundle2 stream')
790 792
791 793 def _readpartheader(self):
792 794 """read a part header size and return the bytes blob
793 795
794 796 returns None if empty"""
795 797 headersize = self._unpack(_fpartheadersize)[0]
796 798 if headersize < 0:
797 799 raise error.BundleValueError('negative part header size: %i'
798 800 % headersize)
799 801 indebug(self.ui, 'part header size: %i' % headersize)
800 802 if headersize:
801 803 return self._readexact(headersize)
802 804 return None
803 805
804 806 def compressed(self):
805 807 self.params # load params
806 808 return self._compressed
807 809
808 810 formatmap = {'20': unbundle20}
809 811
810 812 b2streamparamsmap = {}
811 813
812 814 def b2streamparamhandler(name):
813 815 """register a handler for a stream level parameter"""
814 816 def decorator(func):
815 817 assert name not in formatmap
816 818 b2streamparamsmap[name] = func
817 819 return func
818 820 return decorator
819 821
820 822 @b2streamparamhandler('compression')
821 823 def processcompression(unbundler, param, value):
822 824 """read compression parameter and install payload decompression"""
823 825 if value not in util.decompressors:
824 826 raise error.BundleUnknownFeatureError(params=(param,),
825 827 values=(value,))
826 828 unbundler._decompressor = util.decompressors[value]
827 829 if value is not None:
828 830 unbundler._compressed = True
829 831
830 832 class bundlepart(object):
831 833 """A bundle2 part contains application level payload
832 834
833 835 The part `type` is used to route the part to the application level
834 836 handler.
835 837
836 838 The part payload is contained in ``part.data``. It could be raw bytes or a
837 839 generator of byte chunks.
838 840
839 841 You can add parameters to the part using the ``addparam`` method.
840 842 Parameters can be either mandatory (default) or advisory. Remote side
841 843 should be able to safely ignore the advisory ones.
842 844
843 845 Neither data nor parameters can be modified after the generation has begun.
844 846 """
845 847
846 848 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 849 data='', mandatory=True):
848 850 validateparttype(parttype)
849 851 self.id = None
850 852 self.type = parttype
851 853 self._data = data
852 854 self._mandatoryparams = list(mandatoryparams)
853 855 self._advisoryparams = list(advisoryparams)
854 856 # checking for duplicated entries
855 857 self._seenparams = set()
856 858 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 859 if pname in self._seenparams:
858 860 raise RuntimeError('duplicated params: %s' % pname)
859 861 self._seenparams.add(pname)
860 862 # status of the part's generation:
861 863 # - None: not started,
862 864 # - False: currently generated,
863 865 # - True: generation done.
864 866 self._generated = None
865 867 self.mandatory = mandatory
866 868
867 869 def copy(self):
868 870 """return a copy of the part
869 871
870 872 The new part has the very same content but no partid assigned yet.
871 873 Parts with generated data cannot be copied."""
872 874 assert not util.safehasattr(self.data, 'next')
873 875 return self.__class__(self.type, self._mandatoryparams,
874 876 self._advisoryparams, self._data, self.mandatory)
875 877
876 878 # methods used to define the part content
877 879 @property
878 880 def data(self):
879 881 return self._data
880 882
881 883 @data.setter
882 884 def data(self, data):
883 885 if self._generated is not None:
884 886 raise error.ReadOnlyPartError('part is being generated')
885 887 self._data = data
886 888
887 889 @property
888 890 def mandatoryparams(self):
889 891 # make it an immutable tuple to force people through ``addparam``
890 892 return tuple(self._mandatoryparams)
891 893
892 894 @property
893 895 def advisoryparams(self):
894 896 # make it an immutable tuple to force people through ``addparam``
895 897 return tuple(self._advisoryparams)
896 898
897 899 def addparam(self, name, value='', mandatory=True):
898 900 if self._generated is not None:
899 901 raise error.ReadOnlyPartError('part is being generated')
900 902 if name in self._seenparams:
901 903 raise ValueError('duplicated params: %s' % name)
902 904 self._seenparams.add(name)
903 905 params = self._advisoryparams
904 906 if mandatory:
905 907 params = self._mandatoryparams
906 908 params.append((name, value))
907 909
908 910 # methods used to generate the bundle2 stream
909 911 def getchunks(self, ui):
910 912 if self._generated is not None:
911 913 raise RuntimeError('part can only be consumed once')
912 914 self._generated = False
913 915
914 916 if ui.debugflag:
915 917 msg = ['bundle2-output-part: "%s"' % self.type]
916 918 if not self.mandatory:
917 919 msg.append(' (advisory)')
918 920 nbmp = len(self.mandatoryparams)
919 921 nbap = len(self.advisoryparams)
920 922 if nbmp or nbap:
921 923 msg.append(' (params:')
922 924 if nbmp:
923 925 msg.append(' %i mandatory' % nbmp)
924 926 if nbap:
925 927 msg.append(' %i advisory' % nbap)
926 928 msg.append(')')
927 929 if not self.data:
928 930 msg.append(' empty payload')
929 931 elif util.safehasattr(self.data, 'next'):
930 932 msg.append(' streamed payload')
931 933 else:
932 934 msg.append(' %i bytes payload' % len(self.data))
933 935 msg.append('\n')
934 936 ui.debug(''.join(msg))
935 937
936 938 #### header
937 939 if self.mandatory:
938 940 parttype = self.type.upper()
939 941 else:
940 942 parttype = self.type.lower()
941 943 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
942 944 ## parttype
943 945 header = [_pack(_fparttypesize, len(parttype)),
944 946 parttype, _pack(_fpartid, self.id),
945 947 ]
946 948 ## parameters
947 949 # count
948 950 manpar = self.mandatoryparams
949 951 advpar = self.advisoryparams
950 952 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
951 953 # size
952 954 parsizes = []
953 955 for key, value in manpar:
954 956 parsizes.append(len(key))
955 957 parsizes.append(len(value))
956 958 for key, value in advpar:
957 959 parsizes.append(len(key))
958 960 parsizes.append(len(value))
959 961 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
960 962 header.append(paramsizes)
961 963 # key, value
962 964 for key, value in manpar:
963 965 header.append(key)
964 966 header.append(value)
965 967 for key, value in advpar:
966 968 header.append(key)
967 969 header.append(value)
968 970 ## finalize header
969 971 headerchunk = ''.join(header)
970 972 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
971 973 yield _pack(_fpartheadersize, len(headerchunk))
972 974 yield headerchunk
973 975 ## payload
974 976 try:
975 977 for chunk in self._payloadchunks():
976 978 outdebug(ui, 'payload chunk size: %i' % len(chunk))
977 979 yield _pack(_fpayloadsize, len(chunk))
978 980 yield chunk
979 981 except GeneratorExit:
980 982 # GeneratorExit means that nobody is listening for our
981 983 # results anyway, so just bail quickly rather than trying
982 984 # to produce an error part.
983 985 ui.debug('bundle2-generatorexit\n')
984 986 raise
985 987 except BaseException as exc:
986 988 # backup exception data for later
987 989 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
988 990 % exc)
989 991 exc_info = sys.exc_info()
990 992 msg = 'unexpected error: %s' % exc
991 993 interpart = bundlepart('error:abort', [('message', msg)],
992 994 mandatory=False)
993 995 interpart.id = 0
994 996 yield _pack(_fpayloadsize, -1)
995 997 for chunk in interpart.getchunks(ui=ui):
996 998 yield chunk
997 999 outdebug(ui, 'closing payload chunk')
998 1000 # abort current part payload
999 1001 yield _pack(_fpayloadsize, 0)
1000 1002 if pycompat.ispy3:
1001 1003 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1002 1004 else:
1003 1005 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1004 1006 # end of payload
1005 1007 outdebug(ui, 'closing payload chunk')
1006 1008 yield _pack(_fpayloadsize, 0)
1007 1009 self._generated = True
1008 1010
1009 1011 def _payloadchunks(self):
1010 1012 """yield chunks of the part payload
1011 1013
1012 1014 Exists to handle the different methods to provide data to a part."""
1013 1015 # we only support fixed size data now.
1014 1016 # This will be improved in the future.
1015 1017 if util.safehasattr(self.data, 'next'):
1016 1018 buff = util.chunkbuffer(self.data)
1017 1019 chunk = buff.read(preferedchunksize)
1018 1020 while chunk:
1019 1021 yield chunk
1020 1022 chunk = buff.read(preferedchunksize)
1021 1023 elif len(self.data):
1022 1024 yield self.data
1023 1025
1024 1026
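The `getchunks`/`_payloadchunks` pair above frames each payload chunk with a big-endian signed int32 size prefix, with a zero size terminating the payload (negative sizes are reserved for flags such as the interruption marker defined just below). A minimal standalone sketch of that framing; the `frame_payload` helper name is illustrative, not part of this module:

```python
import struct

def frame_payload(chunks):
    """Frame payload chunks as <int32 size><data>..., terminated by a 0 size."""
    out = []
    for chunk in chunks:
        # the size field is a signed big-endian int32 ('>i'), so special
        # negative values (e.g. the -1 interrupt flag) fit in the same slot
        out.append(struct.pack('>i', len(chunk)))
        out.append(chunk)
    out.append(struct.pack('>i', 0))  # zero size marks end of payload
    return b''.join(out)
```

Using a signed size is what makes the out-of-band `flaginterrupt = -1` marker representable without a separate escape mechanism.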
1025 1027 flaginterrupt = -1
1026 1028
1027 1029 class interrupthandler(unpackermixin):
1028 1030 """read one part and process it with restricted capability
1029 1031
1030 1032 This allows transmitting exceptions raised on the producer side during part
1031 1033 iteration while the consumer is reading a part.
1032 1034
1033 1035 Parts processed in this manner only have access to a ui object."""
1034 1036
1035 1037 def __init__(self, ui, fp):
1036 1038 super(interrupthandler, self).__init__(fp)
1037 1039 self.ui = ui
1038 1040
1039 1041 def _readpartheader(self):
1040 1042 """reads a part header size and returns the bytes blob
1041 1043
1042 1044 returns None if empty"""
1043 1045 headersize = self._unpack(_fpartheadersize)[0]
1044 1046 if headersize < 0:
1045 1047 raise error.BundleValueError('negative part header size: %i'
1046 1048 % headersize)
1047 1049 indebug(self.ui, 'part header size: %i\n' % headersize)
1048 1050 if headersize:
1049 1051 return self._readexact(headersize)
1050 1052 return None
1051 1053
1052 1054 def __call__(self):
1053 1055
1054 1056 self.ui.debug('bundle2-input-stream-interrupt:'
1055 1057 ' opening out of band context\n')
1056 1058 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1057 1059 headerblock = self._readpartheader()
1058 1060 if headerblock is None:
1059 1061 indebug(self.ui, 'no part found during interruption.')
1060 1062 return
1061 1063 part = unbundlepart(self.ui, headerblock, self._fp)
1062 1064 op = interruptoperation(self.ui)
1063 1065 _processpart(op, part)
1064 1066 self.ui.debug('bundle2-input-stream-interrupt:'
1065 1067 ' closing out of band context\n')
1066 1068
1067 1069 class interruptoperation(object):
1068 1070 """A limited operation to be used by part handlers during interruption
1069 1071
1070 1072 It only has access to a ui object.
1071 1073 """
1072 1074
1073 1075 def __init__(self, ui):
1074 1076 self.ui = ui
1075 1077 self.reply = None
1076 1078 self.captureoutput = False
1077 1079
1078 1080 @property
1079 1081 def repo(self):
1080 1082 raise RuntimeError('no repo access from stream interruption')
1081 1083
1082 1084 def gettransaction(self):
1083 1085 raise TransactionUnavailable('no repo access from stream interruption')
1084 1086
1085 1087 class unbundlepart(unpackermixin):
1086 1088 """a bundle part read from a bundle"""
1087 1089
1088 1090 def __init__(self, ui, header, fp):
1089 1091 super(unbundlepart, self).__init__(fp)
1090 1092 self.ui = ui
1091 1093 # unbundle state attr
1092 1094 self._headerdata = header
1093 1095 self._headeroffset = 0
1094 1096 self._initialized = False
1095 1097 self.consumed = False
1096 1098 # part data
1097 1099 self.id = None
1098 1100 self.type = None
1099 1101 self.mandatoryparams = None
1100 1102 self.advisoryparams = None
1101 1103 self.params = None
1102 1104 self.mandatorykeys = ()
1103 1105 self._payloadstream = None
1104 1106 self._readheader()
1105 1107 self._mandatory = None
1106 1108 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1107 1109 self._pos = 0
1108 1110
1109 1111 def _fromheader(self, size):
1110 1112 """return the next <size> byte from the header"""
1111 1113 offset = self._headeroffset
1112 1114 data = self._headerdata[offset:(offset + size)]
1113 1115 self._headeroffset = offset + size
1114 1116 return data
1115 1117
1116 1118 def _unpackheader(self, format):
1117 1119 """read given format from header
1118 1120
1119 1121 This automatically computes the size of the format to read.
1120 1122 data = self._fromheader(struct.calcsize(format))
1121 1123 return _unpack(format, data)
1122 1124
1123 1125 def _initparams(self, mandatoryparams, advisoryparams):
1124 1126 """internal function to set up all logic-related parameters"""
1125 1127 # make it read only to prevent people touching it by mistake.
1126 1128 self.mandatoryparams = tuple(mandatoryparams)
1127 1129 self.advisoryparams = tuple(advisoryparams)
1128 1130 # user friendly UI
1129 1131 self.params = util.sortdict(self.mandatoryparams)
1130 1132 self.params.update(self.advisoryparams)
1131 1133 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1132 1134
1133 1135 def _payloadchunks(self, chunknum=0):
1134 1136 '''seek to specified chunk and start yielding data'''
1135 1137 if len(self._chunkindex) == 0:
1136 1138 assert chunknum == 0, 'Must start with chunk 0'
1137 1139 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1138 1140 else:
1139 1141 assert chunknum < len(self._chunkindex), \
1140 1142 'Unknown chunk %d' % chunknum
1141 1143 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1142 1144
1143 1145 pos = self._chunkindex[chunknum][0]
1144 1146 payloadsize = self._unpack(_fpayloadsize)[0]
1145 1147 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1146 1148 while payloadsize:
1147 1149 if payloadsize == flaginterrupt:
1148 1150 # interruption detection, the handler will now read a
1149 1151 # single part and process it.
1150 1152 interrupthandler(self.ui, self._fp)()
1151 1153 elif payloadsize < 0:
1152 1154 msg = 'negative payload chunk size: %i' % payloadsize
1153 1155 raise error.BundleValueError(msg)
1154 1156 else:
1155 1157 result = self._readexact(payloadsize)
1156 1158 chunknum += 1
1157 1159 pos += payloadsize
1158 1160 if chunknum == len(self._chunkindex):
1159 1161 self._chunkindex.append((pos,
1160 1162 super(unbundlepart, self).tell()))
1161 1163 yield result
1162 1164 payloadsize = self._unpack(_fpayloadsize)[0]
1163 1165 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1164 1166
1165 1167 def _findchunk(self, pos):
1166 1168 '''for a given payload position, return a chunk number and offset'''
1167 1169 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1168 1170 if ppos == pos:
1169 1171 return chunk, 0
1170 1172 elif ppos > pos:
1171 1173 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1172 1174 raise ValueError('Unknown chunk')
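`_findchunk` above maps an absolute payload position back to a `(chunk number, offset within chunk)` pair by scanning the `(payload offset, file offset)` tuples recorded in `_chunkindex` as chunks are first read. A self-contained sketch of the same lookup; the free-standing `findchunk` name and the example index are illustrative:

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset) for a payload position.

    chunkindex holds (payload offset, file offset) pairs marking where
    each chunk starts; like the method above, a position past the last
    recorded chunk start raises ValueError.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0          # exactly at a chunk boundary
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

This is what lets `seek` reposition the payload stream without re-reading everything from the start: it restarts `_payloadchunks` at the right chunk and skips only the intra-chunk remainder.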
1173 1175
1174 1176 def _readheader(self):
1175 1177 """read the header and setup the object"""
1176 1178 typesize = self._unpackheader(_fparttypesize)[0]
1177 1179 self.type = self._fromheader(typesize)
1178 1180 indebug(self.ui, 'part type: "%s"' % self.type)
1179 1181 self.id = self._unpackheader(_fpartid)[0]
1180 1182 indebug(self.ui, 'part id: "%s"' % self.id)
1181 1183 # extract mandatory bit from type
1182 1184 self.mandatory = (self.type != self.type.lower())
1183 1185 self.type = self.type.lower()
1184 1186 ## reading parameters
1185 1187 # param count
1186 1188 mancount, advcount = self._unpackheader(_fpartparamcount)
1187 1189 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1188 1190 # param size
1189 1191 fparamsizes = _makefpartparamsizes(mancount + advcount)
1190 1192 paramsizes = self._unpackheader(fparamsizes)
1191 1193 # make it a list of pairs again
1192 1194 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1193 1195 # split mandatory from advisory
1194 1196 mansizes = paramsizes[:mancount]
1195 1197 advsizes = paramsizes[mancount:]
1196 1198 # retrieve param value
1197 1199 manparams = []
1198 1200 for key, value in mansizes:
1199 1201 manparams.append((self._fromheader(key), self._fromheader(value)))
1200 1202 advparams = []
1201 1203 for key, value in advsizes:
1202 1204 advparams.append((self._fromheader(key), self._fromheader(value)))
1203 1205 self._initparams(manparams, advparams)
1204 1206 ## part payload
1205 1207 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1206 1208 # the header has been fully read; mark the part as initialized
1207 1209 self._initialized = True
1208 1210
1209 1211 def read(self, size=None):
1210 1212 """read payload data"""
1211 1213 if not self._initialized:
1212 1214 self._readheader()
1213 1215 if size is None:
1214 1216 data = self._payloadstream.read()
1215 1217 else:
1216 1218 data = self._payloadstream.read(size)
1217 1219 self._pos += len(data)
1218 1220 if size is None or len(data) < size:
1219 1221 if not self.consumed and self._pos:
1220 1222 self.ui.debug('bundle2-input-part: total payload size %i\n'
1221 1223 % self._pos)
1222 1224 self.consumed = True
1223 1225 return data
1224 1226
1225 1227 def tell(self):
1226 1228 return self._pos
1227 1229
1228 1230 def seek(self, offset, whence=0):
1229 1231 if whence == 0:
1230 1232 newpos = offset
1231 1233 elif whence == 1:
1232 1234 newpos = self._pos + offset
1233 1235 elif whence == 2:
1234 1236 if not self.consumed:
1235 1237 self.read()
1236 1238 newpos = self._chunkindex[-1][0] - offset
1237 1239 else:
1238 1240 raise ValueError('Unknown whence value: %r' % (whence,))
1239 1241
1240 1242 if newpos > self._chunkindex[-1][0] and not self.consumed:
1241 1243 self.read()
1242 1244 if not 0 <= newpos <= self._chunkindex[-1][0]:
1243 1245 raise ValueError('Offset out of range')
1244 1246
1245 1247 if self._pos != newpos:
1246 1248 chunk, internaloffset = self._findchunk(newpos)
1247 1249 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1248 1250 adjust = self.read(internaloffset)
1249 1251 if len(adjust) != internaloffset:
1250 1252 raise error.Abort(_('Seek failed\n'))
1251 1253 self._pos = newpos
1252 1254
1253 1255 # These are only the static capabilities.
1254 1256 # Check the 'getrepocaps' function for the rest.
1255 1257 capabilities = {'HG20': (),
1256 1258 'error': ('abort', 'unsupportedcontent', 'pushraced',
1257 1259 'pushkey'),
1258 1260 'listkeys': (),
1259 1261 'pushkey': (),
1260 1262 'digests': tuple(sorted(util.DIGESTS.keys())),
1261 1263 'remote-changegroup': ('http', 'https'),
1262 1264 'hgtagsfnodes': (),
1263 1265 }
1264 1266
1265 1267 def getrepocaps(repo, allowpushback=False):
1266 1268 """return the bundle2 capabilities for a given repo
1267 1269
1268 1270 Exists to allow extensions (like evolution) to mutate the capabilities.
1269 1271 """
1270 1272 caps = capabilities.copy()
1271 1273 caps['changegroup'] = tuple(sorted(
1272 1274 changegroup.supportedincomingversions(repo)))
1273 1275 if obsolete.isenabled(repo, obsolete.exchangeopt):
1274 1276 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1275 1277 caps['obsmarkers'] = supportedformat
1276 1278 if allowpushback:
1277 1279 caps['pushback'] = ()
1278 1280 return caps
1279 1281
1280 1282 def bundle2caps(remote):
1281 1283 """return the bundle capabilities of a peer as dict"""
1282 1284 raw = remote.capable('bundle2')
1283 1285 if not raw and raw != '':
1284 1286 return {}
1285 1287 capsblob = urlreq.unquote(remote.capable('bundle2'))
1286 1288 return decodecaps(capsblob)
1287 1289
1288 1290 def obsmarkersversion(caps):
1289 1291 """extract the list of supported obsmarkers versions from a bundle2caps dict
1290 1292 """
1291 1293 obscaps = caps.get('obsmarkers', ())
1292 1294 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1293 1295
1294 1296 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1295 1297 """Write a bundle file and return its filename.
1296 1298
1297 1299 Existing files will not be overwritten.
1298 1300 If no filename is specified, a temporary file is created.
1299 1301 bz2 compression can be turned off.
1300 1302 The bundle file will be deleted in case of errors.
1301 1303 """
1302 1304
1303 1305 if bundletype == "HG20":
1304 1306 bundle = bundle20(ui)
1305 1307 bundle.setcompression(compression)
1306 1308 part = bundle.newpart('changegroup', data=cg.getchunks())
1307 1309 part.addparam('version', cg.version)
1308 1310 if 'clcount' in cg.extras:
1309 1311 part.addparam('nbchanges', str(cg.extras['clcount']),
1310 1312 mandatory=False)
1311 1313 chunkiter = bundle.getchunks()
1312 1314 else:
1313 1315 # compression argument is only for the bundle2 case
1314 1316 assert compression is None
1315 1317 if cg.version != '01':
1315 1317 raise error.Abort(_('old bundle types only support v1 '
1317 1319 'changegroups'))
1318 1320 header, comp = bundletypes[bundletype]
1319 1321 if comp not in util.compressors:
1320 1322 raise error.Abort(_('unknown stream compression type: %s')
1321 1323 % comp)
1322 1324 z = util.compressors[comp]()
1323 1325 subchunkiter = cg.getchunks()
1324 1326 def chunkiter():
1325 1327 yield header
1326 1328 for chunk in subchunkiter:
1327 yield z.compress(chunk)
1329 data = z.compress(chunk)
1330 if data:
1331 yield data
1328 1332 yield z.flush()
1329 1333 chunkiter = chunkiter()
1330 1334
1331 1335 # parse the changegroup data, otherwise we will block
1332 1336 # in case of sshrepo because we don't know the end of the stream
1333 1337 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
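The diff above is the point of this changeset: stream compressors may buffer input and return an empty string from `compress()`, and yielding those zero-length chunks is wasteful. Only non-empty output is emitted, with the buffered remainder produced by `flush()`. A self-contained sketch of the pattern using zlib; the `compressed_chunks` helper is illustrative, not part of this module:

```python
import zlib

def compressed_chunks(chunks):
    """Yield only non-empty compressed chunks, then the flush tail."""
    z = zlib.compressobj()
    for chunk in chunks:
        data = z.compress(chunk)
        # compressors often buffer small inputs and return b'';
        # skip those so consumers never see zero-length chunks
        if data:
            yield data
    # flush() emits whatever is still buffered plus the stream trailer
    yield z.flush()
```

In the bundle2 case, avoiding empty chunks matters because each chunk the consumer sees can carry framing overhead; an empty chunk is pure cost.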
1334 1338
1335 1339 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1336 1340 def handlechangegroup(op, inpart):
1337 1341 """apply a changegroup part on the repo
1338 1342
1339 1343 This is a very early implementation that will see massive rework before being
1340 1344 inflicted on any end user.
1341 1345 """
1342 1346 # Make sure we trigger a transaction creation
1343 1347 #
1344 1348 # The addchangegroup function will get a transaction object by itself, but
1345 1349 # we need to make sure we trigger the creation of a transaction object used
1346 1350 # for the whole processing scope.
1347 1351 op.gettransaction()
1348 1352 unpackerversion = inpart.params.get('version', '01')
1349 1353 # We should raise an appropriate exception here
1350 1354 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1351 1355 # the source and url passed here are overwritten by the one contained in
1352 1356 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1353 1357 nbchangesets = None
1354 1358 if 'nbchanges' in inpart.params:
1355 1359 nbchangesets = int(inpart.params.get('nbchanges'))
1356 1360 if ('treemanifest' in inpart.params and
1357 1361 'treemanifest' not in op.repo.requirements):
1358 1362 if len(op.repo.changelog) != 0:
1359 1363 raise error.Abort(_(
1360 1364 "bundle contains tree manifests, but local repo is "
1361 1365 "non-empty and does not use tree manifests"))
1362 1366 op.repo.requirements.add('treemanifest')
1363 1367 op.repo._applyopenerreqs()
1364 1368 op.repo._writerequirements()
1365 1369 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1366 1370 op.records.add('changegroup', {'return': ret})
1367 1371 if op.reply is not None:
1368 1372 # This is definitely not the final form of this
1369 1373 # return. But one needs to start somewhere.
1370 1374 part = op.reply.newpart('reply:changegroup', mandatory=False)
1371 1375 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1372 1376 part.addparam('return', '%i' % ret, mandatory=False)
1373 1377 assert not inpart.read()
1374 1378
1375 1379 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1376 1380 ['digest:%s' % k for k in util.DIGESTS.keys()])
1377 1381 @parthandler('remote-changegroup', _remotechangegroupparams)
1378 1382 def handleremotechangegroup(op, inpart):
1379 1383 """apply a bundle10 on the repo, given an url and validation information
1380 1384
1381 1385 All the information about the remote bundle to import is given as
1382 1386 parameters. The parameters include:
1383 1387 - url: the url to the bundle10.
1384 1388 - size: the bundle10 file size. It is used to validate what was
1385 1389 retrieved by the client matches the server knowledge about the bundle.
1386 1390 - digests: a space separated list of the digest types provided as
1387 1391 parameters.
1388 1392 - digest:<digest-type>: the hexadecimal representation of the digest with
1389 1393 that name. Like the size, it is used to validate what was retrieved by
1390 1394 the client matches what the server knows about the bundle.
1391 1395
1392 1396 When multiple digest types are given, all of them are checked.
1393 1397 """
1394 1398 try:
1395 1399 raw_url = inpart.params['url']
1396 1400 except KeyError:
1397 1401 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1398 1402 parsed_url = util.url(raw_url)
1399 1403 if parsed_url.scheme not in capabilities['remote-changegroup']:
1400 1404 raise error.Abort(_('remote-changegroup does not support %s urls') %
1401 1405 parsed_url.scheme)
1402 1406
1403 1407 try:
1404 1408 size = int(inpart.params['size'])
1405 1409 except ValueError:
1406 1410 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1407 1411 % 'size')
1408 1412 except KeyError:
1409 1413 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1410 1414
1411 1415 digests = {}
1412 1416 for typ in inpart.params.get('digests', '').split():
1413 1417 param = 'digest:%s' % typ
1414 1418 try:
1415 1419 value = inpart.params[param]
1416 1420 except KeyError:
1417 1421 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1418 1422 param)
1419 1423 digests[typ] = value
1420 1424
1421 1425 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1422 1426
1423 1427 # Make sure we trigger a transaction creation
1424 1428 #
1425 1429 # The addchangegroup function will get a transaction object by itself, but
1426 1430 # we need to make sure we trigger the creation of a transaction object used
1427 1431 # for the whole processing scope.
1428 1432 op.gettransaction()
1429 1433 from . import exchange
1430 1434 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1431 1435 if not isinstance(cg, changegroup.cg1unpacker):
1432 1436 raise error.Abort(_('%s: not a bundle version 1.0') %
1433 1437 util.hidepassword(raw_url))
1434 1438 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1435 1439 op.records.add('changegroup', {'return': ret})
1436 1440 if op.reply is not None:
1437 1441 # This is definitely not the final form of this
1438 1442 # return. But one needs to start somewhere.
1439 1443 part = op.reply.newpart('reply:changegroup')
1440 1444 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1441 1445 part.addparam('return', '%i' % ret, mandatory=False)
1442 1446 try:
1443 1447 real_part.validate()
1444 1448 except error.Abort as e:
1445 1449 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1446 1450 (util.hidepassword(raw_url), str(e)))
1447 1451 assert not inpart.read()
1448 1452
1449 1453 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1450 1454 def handlereplychangegroup(op, inpart):
1451 1455 ret = int(inpart.params['return'])
1452 1456 replyto = int(inpart.params['in-reply-to'])
1453 1457 op.records.add('changegroup', {'return': ret}, replyto)
1454 1458
1455 1459 @parthandler('check:heads')
1456 1460 def handlecheckheads(op, inpart):
1457 1461 """check that head of the repo did not change
1458 1462
1459 1463 This is used to detect a push race when using unbundle.
1460 1464 This replaces the "heads" argument of unbundle."""
1461 1465 h = inpart.read(20)
1462 1466 heads = []
1463 1467 while len(h) == 20:
1464 1468 heads.append(h)
1465 1469 h = inpart.read(20)
1466 1470 assert not h
1467 1471 # Trigger a transaction so that we are guaranteed to have the lock now.
1468 1472 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1469 1473 op.gettransaction()
1470 1474 if sorted(heads) != sorted(op.repo.heads()):
1471 1475 raise error.PushRaced('repository changed while pushing - '
1472 1476 'please try again')
1473 1477
1474 1478 @parthandler('output')
1475 1479 def handleoutput(op, inpart):
1476 1480 """forward output captured on the server to the client"""
1477 1481 for line in inpart.read().splitlines():
1478 1482 op.ui.status(_('remote: %s\n') % line)
1479 1483
1480 1484 @parthandler('replycaps')
1481 1485 def handlereplycaps(op, inpart):
1482 1486 """Notify that a reply bundle should be created
1483 1487
1484 1488 The payload contains the capabilities information for the reply"""
1485 1489 caps = decodecaps(inpart.read())
1486 1490 if op.reply is None:
1487 1491 op.reply = bundle20(op.ui, caps)
1488 1492
1489 1493 class AbortFromPart(error.Abort):
1490 1494 """Sub-class of Abort that denotes an error from a bundle2 part."""
1491 1495
1492 1496 @parthandler('error:abort', ('message', 'hint'))
1493 1497 def handleerrorabort(op, inpart):
1494 1498 """Used to transmit abort error over the wire"""
1495 1499 raise AbortFromPart(inpart.params['message'],
1496 1500 hint=inpart.params.get('hint'))
1497 1501
1498 1502 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1499 1503 'in-reply-to'))
1500 1504 def handleerrorpushkey(op, inpart):
1501 1505 """Used to transmit failure of a mandatory pushkey over the wire"""
1502 1506 kwargs = {}
1503 1507 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1504 1508 value = inpart.params.get(name)
1505 1509 if value is not None:
1506 1510 kwargs[name] = value
1507 1511 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1508 1512
1509 1513 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1510 1514 def handleerrorunsupportedcontent(op, inpart):
1511 1515 """Used to transmit unknown content error over the wire"""
1512 1516 kwargs = {}
1513 1517 parttype = inpart.params.get('parttype')
1514 1518 if parttype is not None:
1515 1519 kwargs['parttype'] = parttype
1516 1520 params = inpart.params.get('params')
1517 1521 if params is not None:
1518 1522 kwargs['params'] = params.split('\0')
1519 1523
1520 1524 raise error.BundleUnknownFeatureError(**kwargs)
1521 1525
1522 1526 @parthandler('error:pushraced', ('message',))
1523 1527 def handleerrorpushraced(op, inpart):
1524 1528 """Used to transmit push race error over the wire"""
1525 1529 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1526 1530
1527 1531 @parthandler('listkeys', ('namespace',))
1528 1532 def handlelistkeys(op, inpart):
1529 1533 """retrieve pushkey namespace content stored in a bundle2"""
1530 1534 namespace = inpart.params['namespace']
1531 1535 r = pushkey.decodekeys(inpart.read())
1532 1536 op.records.add('listkeys', (namespace, r))
1533 1537
1534 1538 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1535 1539 def handlepushkey(op, inpart):
1536 1540 """process a pushkey request"""
1537 1541 dec = pushkey.decode
1538 1542 namespace = dec(inpart.params['namespace'])
1539 1543 key = dec(inpart.params['key'])
1540 1544 old = dec(inpart.params['old'])
1541 1545 new = dec(inpart.params['new'])
1542 1546 # Grab the transaction to ensure that we have the lock before performing the
1543 1547 # pushkey.
1544 1548 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1545 1549 op.gettransaction()
1546 1550 ret = op.repo.pushkey(namespace, key, old, new)
1547 1551 record = {'namespace': namespace,
1548 1552 'key': key,
1549 1553 'old': old,
1550 1554 'new': new}
1551 1555 op.records.add('pushkey', record)
1552 1556 if op.reply is not None:
1553 1557 rpart = op.reply.newpart('reply:pushkey')
1554 1558 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1555 1559 rpart.addparam('return', '%i' % ret, mandatory=False)
1556 1560 if inpart.mandatory and not ret:
1557 1561 kwargs = {}
1558 1562 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1559 1563 if key in inpart.params:
1560 1564 kwargs[key] = inpart.params[key]
1561 1565 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1562 1566
1563 1567 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1564 1568 def handlepushkeyreply(op, inpart):
1565 1569 """retrieve the result of a pushkey request"""
1566 1570 ret = int(inpart.params['return'])
1567 1571 partid = int(inpart.params['in-reply-to'])
1568 1572 op.records.add('pushkey', {'return': ret}, partid)
1569 1573
1570 1574 @parthandler('obsmarkers')
1571 1575 def handleobsmarker(op, inpart):
1572 1576 """add a stream of obsmarkers to the repo"""
1573 1577 tr = op.gettransaction()
1574 1578 markerdata = inpart.read()
1575 1579 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1576 1580 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1577 1581 % len(markerdata))
1578 1582 # The mergemarkers call will crash if marker creation is not enabled.
1579 1583 # we want to avoid this if the part is advisory.
1580 1584 if not inpart.mandatory and op.repo.obsstore.readonly:
1581 1585 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1582 1586 return
1583 1587 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1584 1588 if new:
1585 1589 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1586 1590 op.records.add('obsmarkers', {'new': new})
1587 1591 if op.reply is not None:
1588 1592 rpart = op.reply.newpart('reply:obsmarkers')
1589 1593 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1590 1594 rpart.addparam('new', '%i' % new, mandatory=False)
1591 1595
1592 1596
1593 1597 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1594 1598 def handleobsmarkerreply(op, inpart):
1595 1599 """retrieve the result of an obsmarkers push"""
1596 1600 ret = int(inpart.params['new'])
1597 1601 partid = int(inpart.params['in-reply-to'])
1598 1602 op.records.add('obsmarkers', {'new': ret}, partid)
1599 1603
1600 1604 @parthandler('hgtagsfnodes')
1601 1605 def handlehgtagsfnodes(op, inpart):
1602 1606 """Applies .hgtags fnodes cache entries to the local repo.
1603 1607
1604 1608 Payload is pairs of 20 byte changeset nodes and filenodes.
1605 1609 """
1606 1610 # Grab the transaction so we ensure that we have the lock at this point.
1607 1611 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1608 1612 op.gettransaction()
1609 1613 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1610 1614
1611 1615 count = 0
1612 1616 while True:
1613 1617 node = inpart.read(20)
1614 1618 fnode = inpart.read(20)
1615 1619 if len(node) < 20 or len(fnode) < 20:
1616 1620 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1617 1621 break
1618 1622 cache.setfnode(node, fnode)
1619 1623 count += 1
1620 1624
1621 1625 cache.write()
1622 1626 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
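The loop above consumes the part payload as consecutive 20-byte changeset-node/filenode pairs, stopping at the first incomplete pair. A minimal sketch of that parsing against any file-like object; the `read_fnode_pairs` name is hypothetical:

```python
import io

def read_fnode_pairs(fp):
    """Yield (changeset node, .hgtags filenode) 20-byte pairs from a stream."""
    while True:
        node = fp.read(20)
        fnode = fp.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # incomplete trailing data is ignored, mirroring the handler
            break
        yield node, fnode
```

Short reads simply end iteration, which is why the handler only logs a debug message for incomplete data instead of aborting.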