bundle2: add documentation to 'part.addparams'...
Pierre-Yves David -
r31861:6d055cd6 default
@@ -1,1626 +1,1633
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that are handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
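The textual stream-parameter encoding described above can be sketched stand-alone (the function name and details here are illustrative, not part of this module):

```python
import urllib.parse

def encode_stream_params(params):
    """Encode (name, value) pairs into the space-separated parameter blob.

    A capitalized first letter marks the parameter as mandatory for the
    reader; a lower-case one marks it advisory.
    """
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        chunk = urllib.parse.quote(name)
        if value is not None:
            chunk = '%s=%s' % (chunk, urllib.parse.quote(value))
        blocks.append(chunk)
    return ' '.join(blocks)

blob = encode_stream_params([('Compression', 'BZ'), ('hint', None)])
```

Note how a value-less parameter is emitted as a bare name, and how urlquoting keeps the blob space-separable.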
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, followed by the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
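The parameter layout above can be sketched as a small packing helper (hypothetical name, assuming byte-string keys and values no longer than 255 bytes, to match the one-byte size fields):

```python
import struct

def pack_part_params(mandatory, advisory):
    """Pack parameters as <mandatory-count><advisory-count><sizes><data>."""
    allparams = list(mandatory) + list(advisory)
    header = struct.pack('>BB', len(mandatory), len(advisory))
    # one (key-size, value-size) couple of bytes per parameter
    sizes = b''.join(struct.pack('>BB', len(k), len(v))
                     for k, v in allparams)
    # keys and values are simply concatenated; the size couples are
    # enough to slice them back out
    data = b''.join(k + v for k, v in allparams)
    return header + sizes + data

blob = pack_part_params([(b'key', b'val')], [(b'h', b'')])
```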
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
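Reading the chunked payload described above can be sketched as follows (a hypothetical helper; negative sizes are treated as errors here since no special-case processing is defined yet):

```python
import io
import struct

def iter_payload_chunks(fp):
    """Yield payload chunks until the zero-size terminating chunk."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return  # end of payload
        if size < 0:
            # reserved for special-case processing, not defined yet
            raise ValueError('negative chunk size: %i' % size)
        yield fp.read(size)

stream = io.BytesIO(struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0))
chunks = list(iter_payload_chunks(stream))
```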
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
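The dynamically built format can be exercised directly; this stand-alone sketch mirrors the construction above (illustrative name):

```python
import struct

def make_param_sizes_format(nbparams):
    # same construction as _makefpartparamsizes: one 'BB' couple of
    # (key-size, value-size) bytes per parameter, big-endian
    return '>' + ('BB' * nbparams)

fmt = make_param_sizes_format(2)
sizes = struct.unpack(fmt, b'\x03\x05\x01\x00')
```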
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 decorator that registers a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
229 229 class unbundlerecords(object):
230 230 keep record of what happens during an unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iteration happens in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 __bool__ = __nonzero__
275 275
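A trimmed-down, stand-alone sketch of the record-keeping behaviour documented above (the class name is hypothetical, and the reply-tracking part is omitted):

```python
class recordkeeper(object):
    """Minimal sketch of the unbundlerecords API."""
    def __init__(self):
        self._categories = {}
        self._sequences = []
    def add(self, category, entry):
        # index by category, and keep the global chronological order
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = recordkeeper()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
```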
276 276 class bundleoperation(object):
277 277 """an object that represents a single bundling process
278 278
279 279 Its purpose is to carry unbundle-related objects and states.
280 280
281 281 A new object should be created at the beginning of each bundle processing.
282 282 The object is to be returned by the processing function.
283 283
284 284 The object currently has very little content; it will ultimately contain:
285 285 * an access to the repo the bundle is applied to,
286 286 * a ui object,
287 287 * a way to retrieve a transaction to add changes to the repo,
288 288 * a way to record the result of processing each part,
289 289 * a way to construct a bundle response when applicable.
290 290 """
291 291
292 292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 293 self.repo = repo
294 294 self.ui = repo.ui
295 295 self.records = unbundlerecords()
296 296 self.gettransaction = transactiongetter
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299
300 300 class TransactionUnavailable(RuntimeError):
301 301 pass
302 302
303 303 def _notransaction():
304 304 """default method to get a transaction while processing a bundle
305 305
306 306 Raise an exception to highlight the fact that no transaction was expected
307 307 to be created"""
308 308 raise TransactionUnavailable()
309 309
310 310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 312 tr.hookargs['bundle2'] = '1'
313 313 if source is not None and 'source' not in tr.hookargs:
314 314 tr.hookargs['source'] = source
315 315 if url is not None and 'url' not in tr.hookargs:
316 316 tr.hookargs['url'] = url
317 317 return processbundle(repo, unbundler, lambda: tr, op=op)
318 318
319 319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 320 """This function process a bundle, apply effect to/from a repo
321 321
322 322 It iterates over each part then searches for and uses the proper handling
323 323 code to process the part. Parts are processed in order.
324 324
325 325 An unknown mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 356 except Exception as exc:
357 357 for nbpart, part in iterparts:
358 358 # consume the bundle content
359 359 part.seek(0, 2)
360 360 # Small hack to let caller code distinguish exceptions from bundle2
361 361 # processing from processing the old format. This is mostly
362 362 # needed to handle different return codes to unbundle according to the
363 363 # type of bundle. We should probably clean up or drop this return code
364 364 # craziness in a future version.
365 365 exc.duringunbundle2 = True
366 366 salvaged = []
367 367 replycaps = None
368 368 if op.reply is not None:
369 369 salvaged = op.reply.salvageoutput()
370 370 replycaps = op.reply.capabilities
371 371 exc._replycaps = replycaps
372 372 exc._bundle2salvagedoutput = salvaged
373 373 raise
374 374 finally:
375 375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 376
377 377 return op
378 378
379 379 def _processpart(op, part):
380 380 """process a single part from a bundle
381 381
382 382 The part is guaranteed to have been fully consumed when the function exits
383 383 (even if an exception is raised)."""
384 384 status = 'unknown' # used by debug output
385 385 hardabort = False
386 386 try:
387 387 try:
388 388 handler = parthandlermapping.get(part.type)
389 389 if handler is None:
390 390 status = 'unsupported-type'
391 391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 393 unknownparams = part.mandatorykeys - handler.params
394 394 if unknownparams:
395 395 unknownparams = list(unknownparams)
396 396 unknownparams.sort()
397 397 status = 'unsupported-params (%s)' % unknownparams
398 398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 399 params=unknownparams)
400 400 status = 'supported'
401 401 except error.BundleUnknownFeatureError as exc:
402 402 if part.mandatory: # mandatory parts
403 403 raise
404 404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 405 return # skip to part processing
406 406 finally:
407 407 if op.ui.debugflag:
408 408 msg = ['bundle2-input-part: "%s"' % part.type]
409 409 if not part.mandatory:
410 410 msg.append(' (advisory)')
411 411 nbmp = len(part.mandatorykeys)
412 412 nbap = len(part.params) - nbmp
413 413 if nbmp or nbap:
414 414 msg.append(' (params:')
415 415 if nbmp:
416 416 msg.append(' %i mandatory' % nbmp)
417 417 if nbap:
418 418 msg.append(' %i advisory' % nbap)
419 419 msg.append(')')
420 420 msg.append(' %s\n' % status)
421 421 op.ui.debug(''.join(msg))
422 422
423 423 # handler is called outside the above try block so that we don't
424 424 # risk catching KeyErrors from anything other than the
425 425 # parthandlermapping lookup (any KeyError raised by handler()
426 426 # itself represents a defect of a different variety).
427 427 output = None
428 428 if op.captureoutput and op.reply is not None:
429 429 op.ui.pushbuffer(error=True, subproc=True)
430 430 output = ''
431 431 try:
432 432 handler(op, part)
433 433 finally:
434 434 if output is not None:
435 435 output = op.ui.popbuffer()
436 436 if output:
437 437 outpart = op.reply.newpart('output', data=output,
438 438 mandatory=False)
439 439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 440 # If exiting or interrupted, do not attempt to seek the stream in the
441 441 # finally block below. This makes abort faster.
442 442 except (SystemExit, KeyboardInterrupt):
443 443 hardabort = True
444 444 raise
445 445 finally:
446 446 # consume the part content to not corrupt the stream.
447 447 if not hardabort:
448 448 part.seek(0, 2)
449 449
450 450
451 451 def decodecaps(blob):
452 452 """decode a bundle2 caps bytes blob into a dictionary
453 453
454 454 The blob is a list of capabilities (one per line)
455 455 Capabilities may have values using a line of the form::
456 456
457 457 capability=value1,value2,value3
458 458
459 459 The values are always a list."""
460 460 caps = {}
461 461 for line in blob.splitlines():
462 462 if not line:
463 463 continue
464 464 if '=' not in line:
465 465 key, vals = line, ()
466 466 else:
467 467 key, vals = line.split('=', 1)
468 468 vals = vals.split(',')
469 469 key = urlreq.unquote(key)
470 470 vals = [urlreq.unquote(v) for v in vals]
471 471 caps[key] = vals
472 472 return caps
473 473
474 474 def encodecaps(caps):
475 475 """encode a bundle2 caps dictionary into a bytes blob"""
476 476 chunks = []
477 477 for ca in sorted(caps):
478 478 vals = caps[ca]
479 479 ca = urlreq.quote(ca)
480 480 vals = [urlreq.quote(v) for v in vals]
481 481 if vals:
482 482 ca = "%s=%s" % (ca, ','.join(vals))
483 483 chunks.append(ca)
484 484 return '\n'.join(chunks)
485 485
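The caps blob round trip implemented by the two functions above can be sketched in Python 3 (the module itself targets Python 2 strings; names and minor details, such as returning a list rather than a tuple for value-less capabilities, are simplifications):

```python
import urllib.parse

def encode_caps(caps):
    """Encode a caps dict into the newline-separated blob."""
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decode_caps(blob):
    """Decode the blob back into a dict mapping names to value lists."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[urllib.parse.unquote(key)] = [urllib.parse.unquote(v)
                                           for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encode_caps(caps)
```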
486 486 bundletypes = {
487 487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 488 # since the unification ssh accepts a header but there
489 489 # is no capability signaling it.
490 490 "HG20": (), # special-cased below
491 491 "HG10UN": ("HG10UN", 'UN'),
492 492 "HG10BZ": ("HG10", 'BZ'),
493 493 "HG10GZ": ("HG10GZ", 'GZ'),
494 494 }
495 495
496 496 # hgweb uses this list to communicate its preferred type
497 497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498 498
499 499 class bundle20(object):
500 500 """represent an outgoing bundle2 container
501 501
502 502 Use the `addparam` method to add stream level parameters and `newpart` to
503 503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 504 data that compose the bundle2 container."""
505 505
506 506 _magicstring = 'HG20'
507 507
508 508 def __init__(self, ui, capabilities=()):
509 509 self.ui = ui
510 510 self._params = []
511 511 self._parts = []
512 512 self.capabilities = dict(capabilities)
513 513 self._compengine = util.compengines.forbundletype('UN')
514 514 self._compopts = None
515 515
516 516 def setcompression(self, alg, compopts=None):
517 517 """setup core part compression to <alg>"""
518 518 if alg in (None, 'UN'):
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compengine = util.compengines.forbundletype(alg)
523 523 self._compopts = compopts
524 524
525 525 @property
526 526 def nbparts(self):
527 527 """total number of parts added to the bundler"""
528 528 return len(self._parts)
529 529
530 530 # methods used to define the bundle2 content
531 531 def addparam(self, name, value=None):
532 532 """add a stream level parameter"""
533 533 if not name:
534 534 raise ValueError('empty parameter name')
535 535 if name[0] not in string.letters:
536 536 raise ValueError('non letter first character: %r' % name)
537 537 self._params.append((name, value))
538 538
539 539 def addpart(self, part):
540 540 """add a new part to the bundle2 container
541 541
542 542 Parts contains the actual applicative payload."""
543 543 assert part.id is None
544 544 part.id = len(self._parts) # very cheap counter
545 545 self._parts.append(part)
546 546
547 547 def newpart(self, typeid, *args, **kwargs):
548 548 """create a new part and add it to the containers
549 549
550 550 As the part is directly added to the containers. For now, this means
551 551 that any failure to properly initialize the part after calling
552 552 ``newpart`` should result in a failure of the whole bundling process.
553 553
554 554 You can still fall back to manually create and add if you need better
555 555 control."""
556 556 part = bundlepart(typeid, *args, **kwargs)
557 557 self.addpart(part)
558 558 return part
559 559
560 560 # methods used to generate the bundle2 stream
561 561 def getchunks(self):
562 562 if self.ui.debugflag:
563 563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 564 if self._params:
565 565 msg.append(' (%i params)' % len(self._params))
566 566 msg.append(' %i parts total\n' % len(self._parts))
567 567 self.ui.debug(''.join(msg))
568 568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 569 yield self._magicstring
570 570 param = self._paramchunk()
571 571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 572 yield _pack(_fstreamparamsize, len(param))
573 573 if param:
574 574 yield param
575 575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 576 self._compopts):
577 577 yield chunk
578 578
579 579 def _paramchunk(self):
580 580 """return a encoded version of all stream parameters"""
581 581 blocks = []
582 582 for par, value in self._params:
583 583 par = urlreq.quote(par)
584 584 if value is not None:
585 585 value = urlreq.quote(value)
586 586 par = '%s=%s' % (par, value)
587 587 blocks.append(par)
588 588 return ' '.join(blocks)
589 589
590 590 def _getcorechunk(self):
591 591 """yield chunk for the core part of the bundle
592 592
593 593 (all but headers and parameters)"""
594 594 outdebug(self.ui, 'start of parts')
595 595 for part in self._parts:
596 596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 597 for chunk in part.getchunks(ui=self.ui):
598 598 yield chunk
599 599 outdebug(self.ui, 'end of bundle')
600 600 yield _pack(_fpartheadersize, 0)
601 601
602 602
603 603 def salvageoutput(self):
604 604 """return a list with a copy of all output parts in the bundle
605 605
606 606 This is meant to be used during error handling to make sure we preserve
607 607 server output"""
608 608 salvaged = []
609 609 for part in self._parts:
610 610 if part.type.startswith('output'):
611 611 salvaged.append(part.copy())
612 612 return salvaged
613 613
614 614
615 615 class unpackermixin(object):
616 616 """A mixin to extract bytes and struct data from a stream"""
617 617
618 618 def __init__(self, fp):
619 619 self._fp = fp
620 620 self._seekable = (util.safehasattr(fp, 'seek') and
621 621 util.safehasattr(fp, 'tell'))
622 622
623 623 def _unpack(self, format):
624 624 """unpack this struct format from the stream"""
625 625 data = self._readexact(struct.calcsize(format))
626 626 return _unpack(format, data)
627 627
628 628 def _readexact(self, size):
629 629 """read exactly <size> bytes from the stream"""
630 630 return changegroup.readexactly(self._fp, size)
631 631
632 632 def seek(self, offset, whence=0):
633 633 """move the underlying file pointer"""
634 634 if self._seekable:
635 635 return self._fp.seek(offset, whence)
636 636 else:
637 637 raise NotImplementedError(_('File pointer is not seekable'))
638 638
639 639 def tell(self):
640 640 """return the file offset, or None if file is not seekable"""
641 641 if self._seekable:
642 642 try:
643 643 return self._fp.tell()
644 644 except IOError as e:
645 645 if e.errno == errno.ESPIPE:
646 646 self._seekable = False
647 647 else:
648 648 raise
649 649 return None
650 650
651 651 def close(self):
652 652 """close underlying file"""
653 653 if util.safehasattr(self._fp, 'close'):
654 654 return self._fp.close()
655 655
656 656 def getunbundler(ui, fp, magicstring=None):
657 657 """return a valid unbundler object for a given magicstring"""
658 658 if magicstring is None:
659 659 magicstring = changegroup.readexactly(fp, 4)
660 660 magic, version = magicstring[0:2], magicstring[2:4]
661 661 if magic != 'HG':
662 662 raise error.Abort(_('not a Mercurial bundle'))
663 663 unbundlerclass = formatmap.get(version)
664 664 if unbundlerclass is None:
665 665 raise error.Abort(_('unknown bundle version %s') % version)
666 666 unbundler = unbundlerclass(ui, fp)
667 667 indebug(ui, 'start processing of %s stream' % magicstring)
668 668 return unbundler
669 669
670 670 class unbundle20(unpackermixin):
671 671 """interpret a bundle2 stream
672 672
673 673 This class is fed with a binary stream and yields parts through its
674 674 `iterparts` method."""
675 675
676 676 _magicstring = 'HG20'
677 677
678 678 def __init__(self, ui, fp):
679 679 """If header is specified, we do not read it out of the stream."""
680 680 self.ui = ui
681 681 self._compengine = util.compengines.forbundletype('UN')
682 682 self._compressed = None
683 683 super(unbundle20, self).__init__(fp)
684 684
685 685 @util.propertycache
686 686 def params(self):
687 687 """dictionary of stream level parameters"""
688 688 indebug(self.ui, 'reading bundle2 stream parameters')
689 689 params = {}
690 690 paramssize = self._unpack(_fstreamparamsize)[0]
691 691 if paramssize < 0:
692 692 raise error.BundleValueError('negative bundle param size: %i'
693 693 % paramssize)
694 694 if paramssize:
695 695 params = self._readexact(paramssize)
696 696 params = self._processallparams(params)
697 697 return params
698 698
699 699 def _processallparams(self, paramsblock):
700 700 """"""
701 701 params = util.sortdict()
702 702 for p in paramsblock.split(' '):
703 703 p = p.split('=', 1)
704 704 p = [urlreq.unquote(i) for i in p]
705 705 if len(p) < 2:
706 706 p.append(None)
707 707 self._processparam(*p)
708 708 params[p[0]] = p[1]
709 709 return params
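The parsing performed by `_processallparams` can be sketched stand-alone (handler dispatch omitted; a plain dict stands in for `util.sortdict`, relying on insertion order):

```python
import urllib.parse

def parse_stream_params(blob):
    """Split the space-separated blob into a name -> value mapping."""
    params = {}
    for item in blob.split(' '):
        parts = item.split('=', 1)
        parts = [urllib.parse.unquote(i) for i in parts]
        if len(parts) < 2:
            parts.append(None)  # value-less parameter
        params[parts[0]] = parts[1]
    return params

params = parse_stream_params('Compression=BZ hint')
```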
710 710
711 711
712 712 def _processparam(self, name, value):
713 713 """process a parameter, applying its effect if needed
714 714
715 715 Parameters starting with a lower case letter are advisory and will be
716 716 ignored when unknown. Those starting with an upper case letter are
717 717 mandatory; this function will raise a BundleUnknownFeatureError when
718 718 they are unknown.
719 719
720 720 Note: no options are currently supported. Any input will either be
721 721 ignored or fail.
722 722 """
722 722 if not name:
723 723 raise ValueError('empty parameter name')
724 724 if name[0] not in string.letters:
725 725 raise ValueError('non letter first character: %r' % name)
726 726 try:
727 727 handler = b2streamparamsmap[name.lower()]
728 728 except KeyError:
729 729 if name[0].islower():
730 730 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 731 else:
732 732 raise error.BundleUnknownFeatureError(params=(name,))
733 733 else:
734 734 handler(self, name, value)
735 735
736 736 def _forwardchunks(self):
737 737 """utility to transfer a bundle2 as binary
738 738
739 739 This is made necessary by the fact that the 'getbundle' command over
740 740 'ssh' has no way to know when the reply ends, relying on the bundle to
741 741 be interpreted to find its end. This is terrible and we are sorry, but
742 742 we needed to move forward to get general delta enabled.
743 743 """
744 744 yield self._magicstring
745 745 assert 'params' not in vars(self)
746 746 paramssize = self._unpack(_fstreamparamsize)[0]
747 747 if paramssize < 0:
748 748 raise error.BundleValueError('negative bundle param size: %i'
749 749 % paramssize)
750 750 yield _pack(_fstreamparamsize, paramssize)
751 751 if paramssize:
752 752 params = self._readexact(paramssize)
753 753 self._processallparams(params)
754 754 yield params
755 755 assert self._compengine.bundletype == 'UN'
756 756 # From there, payload might need to be decompressed
757 757 self._fp = self._compengine.decompressorreader(self._fp)
758 758 emptycount = 0
759 759 while emptycount < 2:
760 760 # so we can brainlessly loop
761 761 assert _fpartheadersize == _fpayloadsize
762 762 size = self._unpack(_fpartheadersize)[0]
763 763 yield _pack(_fpartheadersize, size)
764 764 if size:
765 765 emptycount = 0
766 766 else:
767 767 emptycount += 1
768 768 continue
769 769 if size == flaginterrupt:
770 770 continue
771 771 elif size < 0:
772 772 raise error.BundleValueError('negative chunk size: %i' % size)
773 773 yield self._readexact(size)
774 774
775 775
776 776 def iterparts(self):
777 777 """yield all parts contained in the stream"""
778 778 # make sure params have been loaded
779 779 self.params
780 780 # From there, payload need to be decompressed
781 781 self._fp = self._compengine.decompressorreader(self._fp)
782 782 indebug(self.ui, 'start extraction of bundle2 parts')
783 783 headerblock = self._readpartheader()
784 784 while headerblock is not None:
785 785 part = unbundlepart(self.ui, headerblock, self._fp)
786 786 yield part
787 787 part.seek(0, 2)
788 788 headerblock = self._readpartheader()
789 789 indebug(self.ui, 'end of bundle2 stream')
790 790
791 791 def _readpartheader(self):
792 792 """reads a part header size and return the bytes blob
793 793
794 794 returns None if empty"""
795 795 headersize = self._unpack(_fpartheadersize)[0]
796 796 if headersize < 0:
797 797 raise error.BundleValueError('negative part header size: %i'
798 798 % headersize)
799 799 indebug(self.ui, 'part header size: %i' % headersize)
800 800 if headersize:
801 801 return self._readexact(headersize)
802 802 return None
803 803
804 804 def compressed(self):
805 805 self.params # load params
806 806 return self._compressed
807 807
808 808 formatmap = {'20': unbundle20}
809 809
810 810 b2streamparamsmap = {}
811 811
812 812 def b2streamparamhandler(name):
813 813 """register a handler for a stream level parameter"""
814 814 def decorator(func):
815 815 assert name not in b2streamparamsmap
816 816 b2streamparamsmap[name] = func
817 817 return func
818 818 return decorator
819 819
820 820 @b2streamparamhandler('compression')
821 821 def processcompression(unbundler, param, value):
822 822 """read compression parameter and install payload decompression"""
823 823 if value not in util.compengines.supportedbundletypes:
824 824 raise error.BundleUnknownFeatureError(params=(param,),
825 825 values=(value,))
826 826 unbundler._compengine = util.compengines.forbundletype(value)
827 827 if value is not None:
828 828 unbundler._compressed = True
829 829
830 830 class bundlepart(object):
831 831 """A bundle2 part contains application level payload
832 832
833 833 The part `type` is used to route the part to the application level
834 834 handler.
835 835
836 836 The part payload is contained in ``part.data``. It could be raw bytes or a
837 837 generator of byte chunks.
838 838
839 839 You can add parameters to the part using the ``addparam`` method.
840 840 Parameters can be either mandatory (default) or advisory. Remote side
841 841 should be able to safely ignore the advisory ones.
842 842
843 843 Neither data nor parameters may be modified after generation has begun.
844 844 """
845 845
846 846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 847 data='', mandatory=True):
848 848 validateparttype(parttype)
849 849 self.id = None
850 850 self.type = parttype
851 851 self._data = data
852 852 self._mandatoryparams = list(mandatoryparams)
853 853 self._advisoryparams = list(advisoryparams)
854 854 # checking for duplicated entries
855 855 self._seenparams = set()
856 856 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 857 if pname in self._seenparams:
858 858 raise error.ProgrammingError('duplicated params: %s' % pname)
859 859 self._seenparams.add(pname)
860 860 # status of the part's generation:
861 861 # - None: not started,
862 862 # - False: currently being generated,
863 863 # - True: generation done.
864 864 self._generated = None
865 865 self.mandatory = mandatory
866 866
867 867 def __repr__(self):
868 868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
869 869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
870 870 % (cls, id(self), self.id, self.type, self.mandatory))
871 871
872 872 def copy(self):
873 873 """return a copy of the part
874 874
875 875 The new part has the very same content but no partid assigned yet.
876 876 Parts with generated data cannot be copied."""
877 877 assert not util.safehasattr(self.data, 'next')
878 878 return self.__class__(self.type, self._mandatoryparams,
879 879 self._advisoryparams, self._data, self.mandatory)
880 880
881 881 # methods used to define the part content
882 882 @property
883 883 def data(self):
884 884 return self._data
885 885
886 886 @data.setter
887 887 def data(self, data):
888 888 if self._generated is not None:
889 889 raise error.ReadOnlyPartError('part is being generated')
890 890 self._data = data
891 891
892 892 @property
893 893 def mandatoryparams(self):
894 894 # make it an immutable tuple to force people through ``addparam``
895 895 return tuple(self._mandatoryparams)
896 896
897 897 @property
898 898 def advisoryparams(self):
899 899 # make it an immutable tuple to force people through ``addparam``
900 900 return tuple(self._advisoryparams)
901 901
902 902 def addparam(self, name, value='', mandatory=True):
903 """add a parameter to the part
904
905 If 'mandatory' is set to True, the remote handler must claim support
906 for this parameter or the unbundling will be aborted.
907
908 The 'name' and 'value' cannot exceed 255 bytes each.
909 """
903 910 if self._generated is not None:
904 911 raise error.ReadOnlyPartError('part is being generated')
905 912 if name in self._seenparams:
906 913 raise ValueError('duplicated params: %s' % name)
907 914 self._seenparams.add(name)
908 915 params = self._advisoryparams
909 916 if mandatory:
910 917 params = self._mandatoryparams
911 918 params.append((name, value))
912 919
913 920 # methods used to generate the bundle2 stream
914 921 def getchunks(self, ui):
915 922 if self._generated is not None:
916 923 raise error.ProgrammingError('part can only be consumed once')
917 924 self._generated = False
918 925
919 926 if ui.debugflag:
920 927 msg = ['bundle2-output-part: "%s"' % self.type]
921 928 if not self.mandatory:
922 929 msg.append(' (advisory)')
923 930 nbmp = len(self.mandatoryparams)
924 931 nbap = len(self.advisoryparams)
925 932 if nbmp or nbap:
926 933 msg.append(' (params:')
927 934 if nbmp:
928 935 msg.append(' %i mandatory' % nbmp)
929 936 if nbap:
930 937 msg.append(' %i advisory' % nbap)
931 938 msg.append(')')
932 939 if not self.data:
933 940 msg.append(' empty payload')
934 941 elif util.safehasattr(self.data, 'next'):
935 942 msg.append(' streamed payload')
936 943 else:
937 944 msg.append(' %i bytes payload' % len(self.data))
938 945 msg.append('\n')
939 946 ui.debug(''.join(msg))
940 947
941 948 #### header
942 949 if self.mandatory:
943 950 parttype = self.type.upper()
944 951 else:
945 952 parttype = self.type.lower()
946 953 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
947 954 ## parttype
948 955 header = [_pack(_fparttypesize, len(parttype)),
949 956 parttype, _pack(_fpartid, self.id),
950 957 ]
951 958 ## parameters
952 959 # count
953 960 manpar = self.mandatoryparams
954 961 advpar = self.advisoryparams
955 962 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
956 963 # size
957 964 parsizes = []
958 965 for key, value in manpar:
959 966 parsizes.append(len(key))
960 967 parsizes.append(len(value))
961 968 for key, value in advpar:
962 969 parsizes.append(len(key))
963 970 parsizes.append(len(value))
964 971 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
965 972 header.append(paramsizes)
966 973 # key, value
967 974 for key, value in manpar:
968 975 header.append(key)
969 976 header.append(value)
970 977 for key, value in advpar:
971 978 header.append(key)
972 979 header.append(value)
973 980 ## finalize header
974 981 headerchunk = ''.join(header)
975 982 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
976 983 yield _pack(_fpartheadersize, len(headerchunk))
977 984 yield headerchunk
978 985 ## payload
979 986 try:
980 987 for chunk in self._payloadchunks():
981 988 outdebug(ui, 'payload chunk size: %i' % len(chunk))
982 989 yield _pack(_fpayloadsize, len(chunk))
983 990 yield chunk
984 991 except GeneratorExit:
985 992 # GeneratorExit means that nobody is listening for our
986 993 # results anyway, so just bail quickly rather than trying
987 994 # to produce an error part.
988 995 ui.debug('bundle2-generatorexit\n')
989 996 raise
990 997 except BaseException as exc:
991 998 # backup exception data for later
992 999 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
993 1000 % exc)
994 1001 exc_info = sys.exc_info()
995 1002 msg = 'unexpected error: %s' % exc
996 1003 interpart = bundlepart('error:abort', [('message', msg)],
997 1004 mandatory=False)
998 1005 interpart.id = 0
999 1006 yield _pack(_fpayloadsize, -1)
1000 1007 for chunk in interpart.getchunks(ui=ui):
1001 1008 yield chunk
1002 1009 outdebug(ui, 'closing payload chunk')
1003 1010 # abort current part payload
1004 1011 yield _pack(_fpayloadsize, 0)
1005 1012 if pycompat.ispy3:
1006 1013 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1007 1014 else:
1008 1015 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1009 1016 # end of payload
1010 1017 outdebug(ui, 'closing payload chunk')
1011 1018 yield _pack(_fpayloadsize, 0)
1012 1019 self._generated = True
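For reference, the header layout produced by `getchunks` can be sketched with plain `struct` packing. The field widths below (one byte for the type length, four bytes for the part id, one byte per parameter count and per key/value size, all big-endian) are assumptions standing in for the `_fpart*` format constants defined elsewhere in this module:

```python
import struct

def sketch_part_header(parttype, partid, manpar=(), advpar=()):
    # assumed field widths: uint8 type length, uint32 part id,
    # two uint8 parameter counts, one uint8 per key/value size
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    allparams = list(manpar) + list(advpar)
    sizes = []
    for key, value in allparams:
        sizes.append(len(key))
        sizes.append(len(value))
    if sizes:
        header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    for key, value in allparams:
        header.append(key)
        header.append(value)
    headerchunk = b''.join(header)
    # on the wire the header blob is preceded by its own uint32 size
    return struct.pack('>I', len(headerchunk)) + headerchunk

blob = sketch_part_header(b'CHANGEGROUP', 0, manpar=[(b'version', b'02')])
```

Note the uppercase part type, matching the mandatory-bit convention used in `getchunks` above.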
1013 1020
1014 1021 def _payloadchunks(self):
1015 1022 """yield chunks of a the part payload
1016 1023
1017 1024 Exists to handle the different methods to provide data to a part."""
1018 1025 # we only support fixed size data now.
1019 1026 # This will be improved in the future.
1020 1027 if util.safehasattr(self.data, 'next'):
1021 1028 buff = util.chunkbuffer(self.data)
1022 1029 chunk = buff.read(preferedchunksize)
1023 1030 while chunk:
1024 1031 yield chunk
1025 1032 chunk = buff.read(preferedchunksize)
1026 1033 elif len(self.data):
1027 1034 yield self.data
1028 1035
1029 1036
1030 1037 flaginterrupt = -1
1031 1038
1032 1039 class interrupthandler(unpackermixin):
1033 1040 """read one part and process it with restricted capability
1034 1041
1035 1042 This allows transmitting an exception raised on the producer side while
1036 1043 the consumer is reading a part.
1037 1044
1038 1045 Parts processed in this manner only have access to a ui object."""
1039 1046
1040 1047 def __init__(self, ui, fp):
1041 1048 super(interrupthandler, self).__init__(fp)
1042 1049 self.ui = ui
1043 1050
1044 1051 def _readpartheader(self):
1045 1052 """reads a part header size and return the bytes blob
1046 1053
1047 1054 returns None if empty"""
1048 1055 headersize = self._unpack(_fpartheadersize)[0]
1049 1056 if headersize < 0:
1050 1057 raise error.BundleValueError('negative part header size: %i'
1051 1058 % headersize)
1052 1059 indebug(self.ui, 'part header size: %i\n' % headersize)
1053 1060 if headersize:
1054 1061 return self._readexact(headersize)
1055 1062 return None
1056 1063
1057 1064 def __call__(self):
1058 1065
1059 1066 self.ui.debug('bundle2-input-stream-interrupt:'
1060 1067 ' opening out of band context\n')
1061 1068 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1062 1069 headerblock = self._readpartheader()
1063 1070 if headerblock is None:
1064 1071 indebug(self.ui, 'no part found during interruption.')
1065 1072 return
1066 1073 part = unbundlepart(self.ui, headerblock, self._fp)
1067 1074 op = interruptoperation(self.ui)
1068 1075 _processpart(op, part)
1069 1076 self.ui.debug('bundle2-input-stream-interrupt:'
1070 1077 ' closing out of band context\n')
1071 1078
1072 1079 class interruptoperation(object):
1073 1080 """A limited operation to be use by part handler during interruption
1074 1081
1075 1082 It only have access to an ui object.
1076 1083 """
1077 1084
1078 1085 def __init__(self, ui):
1079 1086 self.ui = ui
1080 1087 self.reply = None
1081 1088 self.captureoutput = False
1082 1089
1083 1090 @property
1084 1091 def repo(self):
1085 1092 raise error.ProgrammingError('no repo access from stream interruption')
1086 1093
1087 1094 def gettransaction(self):
1088 1095 raise TransactionUnavailable('no repo access from stream interruption')
1089 1096
1090 1097 class unbundlepart(unpackermixin):
1091 1098 """a bundle part read from a bundle"""
1092 1099
1093 1100 def __init__(self, ui, header, fp):
1094 1101 super(unbundlepart, self).__init__(fp)
1095 1102 self.ui = ui
1096 1103 # unbundle state attr
1097 1104 self._headerdata = header
1098 1105 self._headeroffset = 0
1099 1106 self._initialized = False
1100 1107 self.consumed = False
1101 1108 # part data
1102 1109 self.id = None
1103 1110 self.type = None
1104 1111 self.mandatoryparams = None
1105 1112 self.advisoryparams = None
1106 1113 self.params = None
1107 1114 self.mandatorykeys = ()
1108 1115 self._payloadstream = None
1109 1116 self._readheader()
1110 1117 self._mandatory = None
1111 1118 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1112 1119 self._pos = 0
1113 1120
1114 1121 def _fromheader(self, size):
1115 1122 """return the next <size> byte from the header"""
1116 1123 offset = self._headeroffset
1117 1124 data = self._headerdata[offset:(offset + size)]
1118 1125 self._headeroffset = offset + size
1119 1126 return data
1120 1127
1121 1128 def _unpackheader(self, format):
1122 1129 """read given format from header
1123 1130
1124 1131 This automatically computes the size of the format to read."""
1125 1132 data = self._fromheader(struct.calcsize(format))
1126 1133 return _unpack(format, data)
1127 1134
1128 1135 def _initparams(self, mandatoryparams, advisoryparams):
1129 1136 """internal function to setup all logic related parameters"""
1130 1137 # make it read only to prevent people touching it by mistake.
1131 1138 self.mandatoryparams = tuple(mandatoryparams)
1132 1139 self.advisoryparams = tuple(advisoryparams)
1133 1140 # user friendly UI
1134 1141 self.params = util.sortdict(self.mandatoryparams)
1135 1142 self.params.update(self.advisoryparams)
1136 1143 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1137 1144
1138 1145 def _payloadchunks(self, chunknum=0):
1139 1146 '''seek to specified chunk and start yielding data'''
1140 1147 if len(self._chunkindex) == 0:
1141 1148 assert chunknum == 0, 'Must start with chunk 0'
1142 1149 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1143 1150 else:
1144 1151 assert chunknum < len(self._chunkindex), \
1145 1152 'Unknown chunk %d' % chunknum
1146 1153 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1147 1154
1148 1155 pos = self._chunkindex[chunknum][0]
1149 1156 payloadsize = self._unpack(_fpayloadsize)[0]
1150 1157 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151 1158 while payloadsize:
1152 1159 if payloadsize == flaginterrupt:
1153 1160 # interruption detection, the handler will now read a
1154 1161 # single part and process it.
1155 1162 interrupthandler(self.ui, self._fp)()
1156 1163 elif payloadsize < 0:
1157 1164 msg = 'negative payload chunk size: %i' % payloadsize
1158 1165 raise error.BundleValueError(msg)
1159 1166 else:
1160 1167 result = self._readexact(payloadsize)
1161 1168 chunknum += 1
1162 1169 pos += payloadsize
1163 1170 if chunknum == len(self._chunkindex):
1164 1171 self._chunkindex.append((pos,
1165 1172 super(unbundlepart, self).tell()))
1166 1173 yield result
1167 1174 payloadsize = self._unpack(_fpayloadsize)[0]
1168 1175 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1169 1176
1170 1177 def _findchunk(self, pos):
1171 1178 '''for a given payload position, return a chunk number and offset'''
1172 1179 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1173 1180 if ppos == pos:
1174 1181 return chunk, 0
1175 1182 elif ppos > pos:
1176 1183 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1177 1184 raise ValueError('Unknown chunk')
1178 1185
1179 1186 def _readheader(self):
1180 1187 """read the header and setup the object"""
1181 1188 typesize = self._unpackheader(_fparttypesize)[0]
1182 1189 self.type = self._fromheader(typesize)
1183 1190 indebug(self.ui, 'part type: "%s"' % self.type)
1184 1191 self.id = self._unpackheader(_fpartid)[0]
1185 1192 indebug(self.ui, 'part id: "%s"' % self.id)
1186 1193 # extract mandatory bit from type
1187 1194 self.mandatory = (self.type != self.type.lower())
1188 1195 self.type = self.type.lower()
1189 1196 ## reading parameters
1190 1197 # param count
1191 1198 mancount, advcount = self._unpackheader(_fpartparamcount)
1192 1199 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1193 1200 # param size
1194 1201 fparamsizes = _makefpartparamsizes(mancount + advcount)
1195 1202 paramsizes = self._unpackheader(fparamsizes)
1196 1203 # make it a list of couple again
1197 1204 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1198 1205 # split mandatory from advisory
1199 1206 mansizes = paramsizes[:mancount]
1200 1207 advsizes = paramsizes[mancount:]
1201 1208 # retrieve param value
1202 1209 manparams = []
1203 1210 for key, value in mansizes:
1204 1211 manparams.append((self._fromheader(key), self._fromheader(value)))
1205 1212 advparams = []
1206 1213 for key, value in advsizes:
1207 1214 advparams.append((self._fromheader(key), self._fromheader(value)))
1208 1215 self._initparams(manparams, advparams)
1209 1216 ## part payload
1210 1217 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1211 1218 # the header has been fully read, record that the part is initialized
1212 1219 self._initialized = True
1213 1220
1214 1221 def read(self, size=None):
1215 1222 """read payload data"""
1216 1223 if not self._initialized:
1217 1224 self._readheader()
1218 1225 if size is None:
1219 1226 data = self._payloadstream.read()
1220 1227 else:
1221 1228 data = self._payloadstream.read(size)
1222 1229 self._pos += len(data)
1223 1230 if size is None or len(data) < size:
1224 1231 if not self.consumed and self._pos:
1225 1232 self.ui.debug('bundle2-input-part: total payload size %i\n'
1226 1233 % self._pos)
1227 1234 self.consumed = True
1228 1235 return data
1229 1236
1230 1237 def tell(self):
1231 1238 return self._pos
1232 1239
1233 1240 def seek(self, offset, whence=0):
1234 1241 if whence == 0:
1235 1242 newpos = offset
1236 1243 elif whence == 1:
1237 1244 newpos = self._pos + offset
1238 1245 elif whence == 2:
1239 1246 if not self.consumed:
1240 1247 self.read()
1241 1248 newpos = self._chunkindex[-1][0] - offset
1242 1249 else:
1243 1250 raise ValueError('Unknown whence value: %r' % (whence,))
1244 1251
1245 1252 if newpos > self._chunkindex[-1][0] and not self.consumed:
1246 1253 self.read()
1247 1254 if not 0 <= newpos <= self._chunkindex[-1][0]:
1248 1255 raise ValueError('Offset out of range')
1249 1256
1250 1257 if self._pos != newpos:
1251 1258 chunk, internaloffset = self._findchunk(newpos)
1252 1259 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1253 1260 adjust = self.read(internaloffset)
1254 1261 if len(adjust) != internaloffset:
1255 1262 raise error.Abort(_('Seek failed\n'))
1256 1263 self._pos = newpos
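The `whence` arithmetic above follows the usual file-object convention, except that for `whence == 2` the offset is *subtracted* from the end position. A hypothetical standalone helper makes the three cases explicit:

```python
def sketch_newpos(pos, offset, whence, end):
    # mirrors the branch above: 0 = absolute, 1 = relative to the
    # current position, 2 = measured back from the end of the payload
    if whence == 0:
        return offset
    elif whence == 1:
        return pos + offset
    elif whence == 2:
        return end - offset
    raise ValueError('Unknown whence value: %r' % (whence,))
```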
1257 1264
1258 1265 # These are only the static capabilities.
1259 1266 # Check the 'getrepocaps' function for the rest.
1260 1267 capabilities = {'HG20': (),
1261 1268 'error': ('abort', 'unsupportedcontent', 'pushraced',
1262 1269 'pushkey'),
1263 1270 'listkeys': (),
1264 1271 'pushkey': (),
1265 1272 'digests': tuple(sorted(util.DIGESTS.keys())),
1266 1273 'remote-changegroup': ('http', 'https'),
1267 1274 'hgtagsfnodes': (),
1268 1275 }
1269 1276
1270 1277 def getrepocaps(repo, allowpushback=False):
1271 1278 """return the bundle2 capabilities for a given repo
1272 1279
1273 1280 Exists to allow extensions (like evolution) to mutate the capabilities.
1274 1281 """
1275 1282 caps = capabilities.copy()
1276 1283 caps['changegroup'] = tuple(sorted(
1277 1284 changegroup.supportedincomingversions(repo)))
1278 1285 if obsolete.isenabled(repo, obsolete.exchangeopt):
1279 1286 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1280 1287 caps['obsmarkers'] = supportedformat
1281 1288 if allowpushback:
1282 1289 caps['pushback'] = ()
1283 1290 return caps
1284 1291
1285 1292 def bundle2caps(remote):
1286 1293 """return the bundle capabilities of a peer as dict"""
1287 1294 raw = remote.capable('bundle2')
1288 1295 if not raw and raw != '':
1289 1296 return {}
1290 1297 capsblob = urlreq.unquote(remote.capable('bundle2'))
1291 1298 return decodecaps(capsblob)
1292 1299
1293 1300 def obsmarkersversion(caps):
1294 1301 """extract the list of supported obsmarkers versions from a bundle2caps dict
1295 1302 """
1296 1303 obscaps = caps.get('obsmarkers', ())
1297 1304 return [int(c[1:]) for c in obscaps if c.startswith('V')]
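As a standalone illustration, the capability values handled here are strings like `'V0'` and `'V1'`; anything not starting with `'V'` is ignored. The helper below is a hypothetical copy of the extraction logic:

```python
def sketch_obsmarkersversion(caps):
    # same extraction as above: keep 'V<n>' entries, return the ints
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = sketch_obsmarkersversion({'obsmarkers': ('V0', 'V1', 'other')})
```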
1298 1305
1299 1306 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1300 1307 compopts=None):
1301 1308 """Write a bundle file and return its filename.
1302 1309
1303 1310 Existing files will not be overwritten.
1304 1311 If no filename is specified, a temporary file is created.
1305 1312 bz2 compression can be turned off.
1306 1313 The bundle file will be deleted in case of errors.
1307 1314 """
1308 1315
1309 1316 if bundletype == "HG20":
1310 1317 bundle = bundle20(ui)
1311 1318 bundle.setcompression(compression, compopts)
1312 1319 part = bundle.newpart('changegroup', data=cg.getchunks())
1313 1320 part.addparam('version', cg.version)
1314 1321 if 'clcount' in cg.extras:
1315 1322 part.addparam('nbchanges', str(cg.extras['clcount']),
1316 1323 mandatory=False)
1317 1324 chunkiter = bundle.getchunks()
1318 1325 else:
1319 1326 # compression argument is only for the bundle2 case
1320 1327 assert compression is None
1321 1328 if cg.version != '01':
1322 1329 raise error.Abort(_('old bundle types only support v1 '
1323 1330 'changegroups'))
1324 1331 header, comp = bundletypes[bundletype]
1325 1332 if comp not in util.compengines.supportedbundletypes:
1326 1333 raise error.Abort(_('unknown stream compression type: %s')
1327 1334 % comp)
1328 1335 compengine = util.compengines.forbundletype(comp)
1329 1336 def chunkiter():
1330 1337 yield header
1331 1338 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1332 1339 yield chunk
1333 1340 chunkiter = chunkiter()
1334 1341
1335 1342 # parse the changegroup data, otherwise we will block
1336 1343 # in case of sshrepo because we don't know the end of the stream
1337 1344 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
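The legacy (non-HG20) branch above writes a short ASCII header followed by the compressed changegroup stream. A rough sketch of that layout, using plain `bz2` as a stand-in for the `util.compengines` machinery (the `'HG10BZ'` header and raw bz2 framing here are illustrative assumptions, not the exact on-disk format):

```python
import bz2

def sketch_legacybundle(chunks, header=b'HG10BZ'):
    # header first, then the compressed changegroup chunks
    comp = bz2.BZ2Compressor()
    out = [header]
    for chunk in chunks:
        out.append(comp.compress(chunk))
    out.append(comp.flush())
    return b''.join(out)

bundle = sketch_legacybundle([b'changegroup-bytes'])
```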
1338 1345
1339 1346 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1340 1347 def handlechangegroup(op, inpart):
1341 1348 """apply a changegroup part on the repo
1342 1349
1343 1350 This is a very early implementation that will see massive rework before
1344 1351 being inflicted on any end-user.
1345 1352 """
1346 1353 # Make sure we trigger a transaction creation
1347 1354 #
1348 1355 # The addchangegroup function will get a transaction object by itself, but
1349 1356 # we need to make sure we trigger the creation of a transaction object used
1350 1357 # for the whole processing scope.
1351 1358 op.gettransaction()
1352 1359 unpackerversion = inpart.params.get('version', '01')
1353 1360 # We should raise an appropriate exception here
1354 1361 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1355 1362 # the source and url passed here are overwritten by the one contained in
1356 1363 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1357 1364 nbchangesets = None
1358 1365 if 'nbchanges' in inpart.params:
1359 1366 nbchangesets = int(inpart.params.get('nbchanges'))
1360 1367 if ('treemanifest' in inpart.params and
1361 1368 'treemanifest' not in op.repo.requirements):
1362 1369 if len(op.repo.changelog) != 0:
1363 1370 raise error.Abort(_(
1364 1371 "bundle contains tree manifests, but local repo is "
1365 1372 "non-empty and does not use tree manifests"))
1366 1373 op.repo.requirements.add('treemanifest')
1367 1374 op.repo._applyopenerreqs()
1368 1375 op.repo._writerequirements()
1369 1376 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1370 1377 op.records.add('changegroup', {'return': ret})
1371 1378 if op.reply is not None:
1372 1379 # This is definitely not the final form of this
1373 1380 # return. But one needs to start somewhere.
1374 1381 part = op.reply.newpart('reply:changegroup', mandatory=False)
1375 1382 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1376 1383 part.addparam('return', '%i' % ret, mandatory=False)
1377 1384 assert not inpart.read()
1378 1385
1379 1386 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1380 1387 ['digest:%s' % k for k in util.DIGESTS.keys()])
1381 1388 @parthandler('remote-changegroup', _remotechangegroupparams)
1382 1389 def handleremotechangegroup(op, inpart):
1383 1390 """apply a bundle10 on the repo, given an url and validation information
1384 1391
1385 1392 All the information about the remote bundle to import is given as
1386 1393 parameters. The parameters include:
1387 1394 - url: the url to the bundle10.
1388 1395 - size: the bundle10 file size. It is used to validate what was
1389 1396 retrieved by the client matches the server knowledge about the bundle.
1390 1397 - digests: a space separated list of the digest types provided as
1391 1398 parameters.
1392 1399 - digest:<digest-type>: the hexadecimal representation of the digest with
1393 1400 that name. Like the size, it is used to validate what was retrieved by
1394 1401 the client matches what the server knows about the bundle.
1395 1402
1396 1403 When multiple digest types are given, all of them are checked.
1397 1404 """
1398 1405 try:
1399 1406 raw_url = inpart.params['url']
1400 1407 except KeyError:
1401 1408 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1402 1409 parsed_url = util.url(raw_url)
1403 1410 if parsed_url.scheme not in capabilities['remote-changegroup']:
1404 1411 raise error.Abort(_('remote-changegroup does not support %s urls') %
1405 1412 parsed_url.scheme)
1406 1413
1407 1414 try:
1408 1415 size = int(inpart.params['size'])
1409 1416 except ValueError:
1410 1417 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1411 1418 % 'size')
1412 1419 except KeyError:
1413 1420 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1414 1421
1415 1422 digests = {}
1416 1423 for typ in inpart.params.get('digests', '').split():
1417 1424 param = 'digest:%s' % typ
1418 1425 try:
1419 1426 value = inpart.params[param]
1420 1427 except KeyError:
1421 1428 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1422 1429 param)
1423 1430 digests[typ] = value
1424 1431
1425 1432 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1426 1433
1427 1434 # Make sure we trigger a transaction creation
1428 1435 #
1429 1436 # The addchangegroup function will get a transaction object by itself, but
1430 1437 # we need to make sure we trigger the creation of a transaction object used
1431 1438 # for the whole processing scope.
1432 1439 op.gettransaction()
1433 1440 from . import exchange
1434 1441 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1435 1442 if not isinstance(cg, changegroup.cg1unpacker):
1436 1443 raise error.Abort(_('%s: not a bundle version 1.0') %
1437 1444 util.hidepassword(raw_url))
1438 1445 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1439 1446 op.records.add('changegroup', {'return': ret})
1440 1447 if op.reply is not None:
1441 1448 # This is definitely not the final form of this
1442 1449 # return. But one needs to start somewhere.
1443 1450 part = op.reply.newpart('reply:changegroup')
1444 1451 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1445 1452 part.addparam('return', '%i' % ret, mandatory=False)
1446 1453 try:
1447 1454 real_part.validate()
1448 1455 except error.Abort as e:
1449 1456 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1450 1457 (util.hidepassword(raw_url), str(e)))
1451 1458 assert not inpart.read()
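The validation performed by `util.digestchecker` above can be approximated in isolation: the downloaded bytes must match the advertised size and every advertised digest. `sketch_validate` below is a hypothetical stand-in built on `hashlib`:

```python
import hashlib

def sketch_validate(data, size, digests):
    # size check first, then every advertised digest
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    for typ, expected in digests.items():
        if hashlib.new(typ, data).hexdigest() != expected:
            raise ValueError('%s digest mismatch' % typ)

payload = b'bundle10-bytes'
sketch_validate(payload, len(payload),
                {'sha1': hashlib.sha1(payload).hexdigest()})
```

When several digest types are advertised, every one of them is checked, matching the docstring above.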
1452 1459
1453 1460 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1454 1461 def handlereplychangegroup(op, inpart):
1455 1462 ret = int(inpart.params['return'])
1456 1463 replyto = int(inpart.params['in-reply-to'])
1457 1464 op.records.add('changegroup', {'return': ret}, replyto)
1458 1465
1459 1466 @parthandler('check:heads')
1460 1467 def handlecheckheads(op, inpart):
1461 1468 """check that head of the repo did not change
1462 1469
1463 1470 This is used to detect a push race when using unbundle.
1464 1471 This replaces the "heads" argument of unbundle."""
1465 1472 h = inpart.read(20)
1466 1473 heads = []
1467 1474 while len(h) == 20:
1468 1475 heads.append(h)
1469 1476 h = inpart.read(20)
1470 1477 assert not h
1471 1478 # Trigger a transaction so that we are guaranteed to have the lock now.
1472 1479 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1473 1480 op.gettransaction()
1474 1481 if sorted(heads) != sorted(op.repo.heads()):
1475 1482 raise error.PushRaced('repository changed while pushing - '
1476 1483 'please try again')
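The payload convention used here, a flat concatenation of 20-byte binary node ids with no separators, can be read back with the same loop in isolation (hypothetical helper, mirroring the code above):

```python
import io

def sketch_readheads(fp):
    # consume 20-byte node ids until the stream is exhausted
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # a trailing partial node means a corrupt payload
    return heads

heads = sketch_readheads(io.BytesIO(b'\x11' * 20 + b'\x22' * 20))
```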
1477 1484
1478 1485 @parthandler('output')
1479 1486 def handleoutput(op, inpart):
1480 1487 """forward output captured on the server to the client"""
1481 1488 for line in inpart.read().splitlines():
1482 1489 op.ui.status(_('remote: %s\n') % line)
1483 1490
1484 1491 @parthandler('replycaps')
1485 1492 def handlereplycaps(op, inpart):
1486 1493 """Notify that a reply bundle should be created
1487 1494
1488 1495 The payload contains the capabilities information for the reply"""
1489 1496 caps = decodecaps(inpart.read())
1490 1497 if op.reply is None:
1491 1498 op.reply = bundle20(op.ui, caps)
1492 1499
1493 1500 class AbortFromPart(error.Abort):
1494 1501 """Sub-class of Abort that denotes an error from a bundle2 part."""
1495 1502
1496 1503 @parthandler('error:abort', ('message', 'hint'))
1497 1504 def handleerrorabort(op, inpart):
1498 1505 """Used to transmit abort error over the wire"""
1499 1506 raise AbortFromPart(inpart.params['message'],
1500 1507 hint=inpart.params.get('hint'))
1501 1508
1502 1509 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1503 1510 'in-reply-to'))
1504 1511 def handleerrorpushkey(op, inpart):
1505 1512 """Used to transmit failure of a mandatory pushkey over the wire"""
1506 1513 kwargs = {}
1507 1514 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1508 1515 value = inpart.params.get(name)
1509 1516 if value is not None:
1510 1517 kwargs[name] = value
1511 1518 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1512 1519
1513 1520 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1514 1521 def handleerrorunsupportedcontent(op, inpart):
1515 1522 """Used to transmit unknown content error over the wire"""
1516 1523 kwargs = {}
1517 1524 parttype = inpart.params.get('parttype')
1518 1525 if parttype is not None:
1519 1526 kwargs['parttype'] = parttype
1520 1527 params = inpart.params.get('params')
1521 1528 if params is not None:
1522 1529 kwargs['params'] = params.split('\0')
1523 1530
1524 1531 raise error.BundleUnknownFeatureError(**kwargs)
1525 1532
1526 1533 @parthandler('error:pushraced', ('message',))
1527 1534 def handleerrorpushraced(op, inpart):
1528 1535 """Used to transmit push race error over the wire"""
1529 1536 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1530 1537
1531 1538 @parthandler('listkeys', ('namespace',))
1532 1539 def handlelistkeys(op, inpart):
1533 1540 """retrieve pushkey namespace content stored in a bundle2"""
1534 1541 namespace = inpart.params['namespace']
1535 1542 r = pushkey.decodekeys(inpart.read())
1536 1543 op.records.add('listkeys', (namespace, r))
1537 1544
1538 1545 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1539 1546 def handlepushkey(op, inpart):
1540 1547 """process a pushkey request"""
1541 1548 dec = pushkey.decode
1542 1549 namespace = dec(inpart.params['namespace'])
1543 1550 key = dec(inpart.params['key'])
1544 1551 old = dec(inpart.params['old'])
1545 1552 new = dec(inpart.params['new'])
1546 1553 # Grab the transaction to ensure that we have the lock before performing the
1547 1554 # pushkey.
1548 1555 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1549 1556 op.gettransaction()
1550 1557 ret = op.repo.pushkey(namespace, key, old, new)
1551 1558 record = {'namespace': namespace,
1552 1559 'key': key,
1553 1560 'old': old,
1554 1561 'new': new}
1555 1562 op.records.add('pushkey', record)
1556 1563 if op.reply is not None:
1557 1564 rpart = op.reply.newpart('reply:pushkey')
1558 1565 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1559 1566 rpart.addparam('return', '%i' % ret, mandatory=False)
1560 1567 if inpart.mandatory and not ret:
1561 1568 kwargs = {}
1562 1569 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1563 1570 if key in inpart.params:
1564 1571 kwargs[key] = inpart.params[key]
1565 1572 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1566 1573
1567 1574 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1568 1575 def handlepushkeyreply(op, inpart):
1569 1576 """retrieve the result of a pushkey request"""
1570 1577 ret = int(inpart.params['return'])
1571 1578 partid = int(inpart.params['in-reply-to'])
1572 1579 op.records.add('pushkey', {'return': ret}, partid)
1573 1580
1574 1581 @parthandler('obsmarkers')
1575 1582 def handleobsmarker(op, inpart):
1576 1583 """add a stream of obsmarkers to the repo"""
1577 1584 tr = op.gettransaction()
1578 1585 markerdata = inpart.read()
1579 1586 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1580 1587 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1581 1588 % len(markerdata))
1582 1589 # The mergemarkers call will crash if marker creation is not enabled.
1583 1590 # we want to avoid this if the part is advisory.
1584 1591 if not inpart.mandatory and op.repo.obsstore.readonly:
1585 1592 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1586 1593 return
1587 1594 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1588 1595 if new:
1589 1596 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1590 1597 op.records.add('obsmarkers', {'new': new})
1591 1598 if op.reply is not None:
1592 1599 rpart = op.reply.newpart('reply:obsmarkers')
1593 1600 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1594 1601 rpart.addparam('new', '%i' % new, mandatory=False)
1595 1602
1596 1603
1597 1604 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1598 1605 def handleobsmarkerreply(op, inpart):
1599 1606 """retrieve the result of a pushkey request"""
1600 1607 ret = int(inpart.params['new'])
1601 1608 partid = int(inpart.params['in-reply-to'])
1602 1609 op.records.add('obsmarkers', {'new': ret}, partid)
1603 1610
1604 1611 @parthandler('hgtagsfnodes')
1605 1612 def handlehgtagsfnodes(op, inpart):
1606 1613 """Applies .hgtags fnodes cache entries to the local repo.
1607 1614
1608 1615 Payload is pairs of 20 byte changeset nodes and filenodes.
1609 1616 """
1610 1617 # Grab the transaction so we ensure that we have the lock at this point.
1611 1618 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1612 1619 op.gettransaction()
1613 1620 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1614 1621
1615 1622 count = 0
1616 1623 while True:
1617 1624 node = inpart.read(20)
1618 1625 fnode = inpart.read(20)
1619 1626 if len(node) < 20 or len(fnode) < 20:
1620 1627 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1621 1628 break
1622 1629 cache.setfnode(node, fnode)
1623 1630 count += 1
1624 1631
1625 1632 cache.write()
1626 1633 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
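The payload format described in the docstring, consecutive (changeset node, filenode) pairs of 20 bytes each, can be decoded with a loop like the one above; `sketch_readfnodes` is a hypothetical standalone version:

```python
import io

def sketch_readfnodes(fp):
    # read (node, fnode) pairs; incomplete trailing data is ignored,
    # matching the handler above
    entries = []
    while True:
        node = fp.read(20)
        fnode = fp.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        entries.append((node, fnode))
    return entries

entries = sketch_readfnodes(io.BytesIO(b'a' * 20 + b'b' * 20))
```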