bundle2: fix assertion that 'compression' hasn't been set...
Siddharth Agarwal
r30915:aa25989b stable
@@ -1,1622 +1,1622 @@
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic packet to transmit a set of
payloads in an application-agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage any
    crazy usage.
  - Textual data allows easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part's parameters may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters. Each
            couple contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read from an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""
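As a quick illustration of the stream-level parameter encoding described above, here is a standalone sketch (Python 3, not Mercurial's own API; the function names are made up for the example). It mirrors the rules: space-separated, urlquoted `<name>=<value>` pairs, with a leading capital letter marking a parameter as mandatory.

```python
# Standalone sketch of the stream-level parameter blob format.
# Hypothetical helper names; Mercurial's own implementation lives in
# bundle20._paramchunk / unbundle20._processallparams below.
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Encode (name, value) pairs into a stream-parameter blob."""
    blocks = []
    for name, value in params:
        block = quote(name, safe='')
        if value is not None:
            block = '%s=%s' % (block, quote(value, safe=''))
        blocks.append(block)
    return ' '.join(blocks)

def decodestreamparams(blob):
    """Decode a blob into a dict; also report which names are mandatory."""
    params = {}
    mandatory = []
    for block in blob.split(' '):
        if not block:
            continue
        name, sep, value = block.partition('=')
        name = unquote(name)
        params[name] = unquote(value) if sep else None
        if name[0].isupper():  # capital first letter => mandatory
            mandatory.append(name)
    return params, mandatory
```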

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    pycompat,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep a record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iteration happens in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying its effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    Unknown mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions raised by
        # bundle2 processing from those raised while processing the old
        # format. This is mostly needed to handle different return codes to
        # unbundle according to the type of bundle. We should probably clean
        # up or drop this return code craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    # If exiting or interrupted, do not attempt to seek the stream in the
    # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)


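The routing rule `_processpart` applies (case-insensitive handler lookup; a part type containing any uppercase character is mandatory, and an unknown mandatory part aborts) can be sketched in isolation like this. All names here are hypothetical, for illustration only.

```python
# Standalone sketch of bundle2 part routing (not Mercurial's API).
handlers = {'output': lambda part: 'handled'}  # toy handler table

def routepart(parttype):
    """Return the handler for a part type, or None for an ignorable part.

    Lookup is case-insensitive; an uppercase char anywhere in the type
    makes the part mandatory, so an unknown mandatory part must abort.
    """
    handler = handlers.get(parttype.lower())
    mandatory = parttype != parttype.lower()
    if handler is None:
        if mandatory:
            raise LookupError('unknown mandatory part: %s' % parttype)
        return None  # advisory part with no handler: silently ignored
    return handler
```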
def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

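A standalone round-trip sketch of the caps blob format handled by `decodecaps`/`encodecaps` above (Python 3, hypothetical `*demo` names; Mercurial's versions use `urlreq` and return an empty tuple for value-less capabilities):

```python
# Standalone sketch of the bundle2 caps blob: one capability per line,
# optional urlquoted, comma-separated values.
from urllib.parse import quote, unquote

def encodecapsdemo(caps):
    """Encode {name: [values]} into a caps blob, sorted by name."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecapsdemo(blob):
    """Decode a caps blob back into {name: [values]}."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        key, sep, vals = line.partition('=')
        caps[unquote(key)] = [unquote(v) for v in vals.split(',')] if sep else []
    return caps
```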
bundletypes = {
    "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", 'UN'),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add a stream level parameter and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compengine = util.compengines.forbundletype('UN')
        self._compopts = None

    def setcompression(self, alg, compopts=None):
        """setup core part compression to <alg>"""
        if alg in (None, 'UN'):
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compengine = util.compengines.forbundletype(alg)
        self._compopts = compopts

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means
        that any failure to properly initialize the part after calling
        ``newpart`` should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        for chunk in self._compengine.compressstream(self._getcorechunk(),
                                                     self._compopts):
            yield chunk

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._compengine = util.compengines.forbundletype('UN')
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process and apply all parameters in a stream-level blob"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to find its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._compengine.bundletype == 'UN'
        # From there, payload might need to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._compengine.decompressorreader(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.compengines.supportedbundletypes:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._compengine = util.compengines.forbundletype(value)
    if value is not None:
        unbundler._compressed = True

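The payload chunk framing that `_forwardchunks` and the part readers rely on (an int32 big-endian size, that many bytes of data, terminated by a zero-size chunk) can be sketched on its own. This is a standalone illustration with hypothetical names, not Mercurial's implementation, and it deliberately leaves negative "special" sizes unhandled, as the docstring says no such processing exists yet.

```python
# Standalone sketch of the <chunksize><chunkdata> payload framing.
import io
import struct

def writepayload(chunks):
    """Frame byte chunks as int32-size-prefixed records plus a 0 terminator."""
    out = io.BytesIO()
    for chunk in chunks:
        out.write(struct.pack('>i', len(chunk)))
        out.write(chunk)
    out.write(struct.pack('>i', 0))  # zero-size chunk ends the payload
    return out.getvalue()

def readpayload(fp):
    """Yield chunks back from a framed payload until the zero-size marker."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('special chunk sizes not handled in this sketch')
        yield fp.read(size)
```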
831 831 class bundlepart(object):
832 832 """A bundle2 part contains application level payload
833 833
834 834 The part `type` is used to route the part to the application level
835 835 handler.
836 836
837 837 The part payload is contained in ``part.data``. It could be raw bytes or a
838 838 generator of byte chunks.
839 839
840 840 You can add parameters to the part using the ``addparam`` method.
841 841 Parameters can be either mandatory (default) or advisory. Remote side
842 842 should be able to safely ignore the advisory ones.
843 843
844 844 Both data and parameters cannot be modified after the generation has begun.
845 845 """
846 846
847 847 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
848 848 data='', mandatory=True):
849 849 validateparttype(parttype)
850 850 self.id = None
851 851 self.type = parttype
852 852 self._data = data
853 853 self._mandatoryparams = list(mandatoryparams)
854 854 self._advisoryparams = list(advisoryparams)
855 855 # checking for duplicated entries
856 856 self._seenparams = set()
857 857 for pname, __ in self._mandatoryparams + self._advisoryparams:
858 858 if pname in self._seenparams:
859 859 raise RuntimeError('duplicated params: %s' % pname)
860 860 self._seenparams.add(pname)
861 861 # status of the part's generation:
862 862 # - None: not started,
863 863 # - False: currently generated,
864 864 # - True: generation done.
865 865 self._generated = None
866 866 self.mandatory = mandatory
867 867
868 868 def copy(self):
869 869 """return a copy of the part
870 870
871 871 The new part have the very same content but no partid assigned yet.
872 872 Parts with generated data cannot be copied."""
873 873 assert not util.safehasattr(self.data, 'next')
874 874 return self.__class__(self.type, self._mandatoryparams,
875 875 self._advisoryparams, self._data, self.mandatory)
876 876
877 877     # methods used to define the part content
878 878 @property
879 879 def data(self):
880 880 return self._data
881 881
882 882 @data.setter
883 883 def data(self, data):
884 884 if self._generated is not None:
885 885 raise error.ReadOnlyPartError('part is being generated')
886 886 self._data = data
887 887
888 888 @property
889 889 def mandatoryparams(self):
890 890 # make it an immutable tuple to force people through ``addparam``
891 891 return tuple(self._mandatoryparams)
892 892
893 893 @property
894 894 def advisoryparams(self):
895 895 # make it an immutable tuple to force people through ``addparam``
896 896 return tuple(self._advisoryparams)
897 897
898 898 def addparam(self, name, value='', mandatory=True):
899 899 if self._generated is not None:
900 900 raise error.ReadOnlyPartError('part is being generated')
901 901 if name in self._seenparams:
902 902 raise ValueError('duplicated params: %s' % name)
903 903 self._seenparams.add(name)
904 904 params = self._advisoryparams
905 905 if mandatory:
906 906 params = self._mandatoryparams
907 907 params.append((name, value))
908 908
909 909     # methods used to generate the bundle2 stream
910 910 def getchunks(self, ui):
911 911 if self._generated is not None:
912 912 raise RuntimeError('part can only be consumed once')
913 913 self._generated = False
914 914
915 915 if ui.debugflag:
916 916 msg = ['bundle2-output-part: "%s"' % self.type]
917 917 if not self.mandatory:
918 918 msg.append(' (advisory)')
919 919 nbmp = len(self.mandatoryparams)
920 920 nbap = len(self.advisoryparams)
921 921 if nbmp or nbap:
922 922 msg.append(' (params:')
923 923 if nbmp:
924 924 msg.append(' %i mandatory' % nbmp)
925 925 if nbap:
926 926                     msg.append(' %i advisory' % nbap)
927 927 msg.append(')')
928 928 if not self.data:
929 929 msg.append(' empty payload')
930 930 elif util.safehasattr(self.data, 'next'):
931 931 msg.append(' streamed payload')
932 932 else:
933 933 msg.append(' %i bytes payload' % len(self.data))
934 934 msg.append('\n')
935 935 ui.debug(''.join(msg))
936 936
937 937 #### header
938 938 if self.mandatory:
939 939 parttype = self.type.upper()
940 940 else:
941 941 parttype = self.type.lower()
942 942 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
943 943 ## parttype
944 944 header = [_pack(_fparttypesize, len(parttype)),
945 945 parttype, _pack(_fpartid, self.id),
946 946 ]
947 947 ## parameters
948 948 # count
949 949 manpar = self.mandatoryparams
950 950 advpar = self.advisoryparams
951 951 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
952 952 # size
953 953 parsizes = []
954 954 for key, value in manpar:
955 955 parsizes.append(len(key))
956 956 parsizes.append(len(value))
957 957 for key, value in advpar:
958 958 parsizes.append(len(key))
959 959 parsizes.append(len(value))
960 960         paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
961 961 header.append(paramsizes)
962 962 # key, value
963 963 for key, value in manpar:
964 964 header.append(key)
965 965 header.append(value)
966 966 for key, value in advpar:
967 967 header.append(key)
968 968 header.append(value)
969 969 ## finalize header
970 970 headerchunk = ''.join(header)
971 971 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
972 972 yield _pack(_fpartheadersize, len(headerchunk))
973 973 yield headerchunk
974 974 ## payload
975 975 try:
976 976 for chunk in self._payloadchunks():
977 977 outdebug(ui, 'payload chunk size: %i' % len(chunk))
978 978 yield _pack(_fpayloadsize, len(chunk))
979 979 yield chunk
980 980 except GeneratorExit:
981 981 # GeneratorExit means that nobody is listening for our
982 982 # results anyway, so just bail quickly rather than trying
983 983 # to produce an error part.
984 984 ui.debug('bundle2-generatorexit\n')
985 985 raise
986 986 except BaseException as exc:
987 987 # backup exception data for later
988 988 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
989 989 % exc)
990 990 exc_info = sys.exc_info()
991 991 msg = 'unexpected error: %s' % exc
992 992 interpart = bundlepart('error:abort', [('message', msg)],
993 993 mandatory=False)
994 994 interpart.id = 0
995 995 yield _pack(_fpayloadsize, -1)
996 996 for chunk in interpart.getchunks(ui=ui):
997 997 yield chunk
998 998 outdebug(ui, 'closing payload chunk')
999 999 # abort current part payload
1000 1000 yield _pack(_fpayloadsize, 0)
1001 1001 if pycompat.ispy3:
1002 1002 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1003 1003 else:
1004 1004 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1005 1005 # end of payload
1006 1006 outdebug(ui, 'closing payload chunk')
1007 1007 yield _pack(_fpayloadsize, 0)
1008 1008 self._generated = True
1009 1009
1010 1010 def _payloadchunks(self):
1011 1011         """yield chunks of the part payload
1012 1012
1013 1013 Exists to handle the different methods to provide data to a part."""
1014 1014 # we only support fixed size data now.
1015 1015 # This will be improved in the future.
1016 1016 if util.safehasattr(self.data, 'next'):
1017 1017 buff = util.chunkbuffer(self.data)
1018 1018 chunk = buff.read(preferedchunksize)
1019 1019 while chunk:
1020 1020 yield chunk
1021 1021 chunk = buff.read(preferedchunksize)
1022 1022 elif len(self.data):
1023 1023 yield self.data
1024 1024
1025 1025
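The framing produced by ``getchunks`` above (a 32-bit header size, the header blob, then length-prefixed payload chunks closed by a zero-size chunk) can be sketched standalone. This is an illustrative sketch, not the module's code; the ``'>i'`` format strings are assumptions mirroring the module's ``_fpartheadersize``/``_fpayloadsize`` constants:

```python
import struct

# assumed to match the module's format strings for header and payload sizes
_fpartheadersize = '>i'
_fpayloadsize = '>i'

def frameparts(header, payloadchunks):
    """Yield the wire framing for one part: a 32-bit header size,
    the header blob, then length-prefixed payload chunks ending
    with a zero-size chunk (the end-of-payload marker)."""
    yield struct.pack(_fpartheadersize, len(header))
    yield header
    for chunk in payloadchunks:
        yield struct.pack(_fpayloadsize, len(chunk))
        yield chunk
    # a zero-length chunk closes the payload stream
    yield struct.pack(_fpayloadsize, 0)

frame = b''.join(frameparts(b'HDR', [b'abc', b'de']))
```

A negative chunk size (``flaginterrupt`` below) is reserved for out-of-band interruption parts, which is why the size field is signed.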
1026 1026 flaginterrupt = -1
1027 1027
1028 1028 class interrupthandler(unpackermixin):
1029 1029 """read one part and process it with restricted capability
1030 1030
1031 1031     This allows transmitting exceptions raised on the producer side during part
1032 1032     iteration while the consumer is reading a part.
1033 1033
1034 1034     Parts processed in this manner only have access to a ui object."""
1035 1035
1036 1036 def __init__(self, ui, fp):
1037 1037 super(interrupthandler, self).__init__(fp)
1038 1038 self.ui = ui
1039 1039
1040 1040 def _readpartheader(self):
1041 1041         """read a part header size and return the bytes blob
1042 1042
1043 1043 returns None if empty"""
1044 1044 headersize = self._unpack(_fpartheadersize)[0]
1045 1045 if headersize < 0:
1046 1046 raise error.BundleValueError('negative part header size: %i'
1047 1047 % headersize)
1048 1048         indebug(self.ui, 'part header size: %i' % headersize)
1049 1049 if headersize:
1050 1050 return self._readexact(headersize)
1051 1051 return None
1052 1052
1053 1053 def __call__(self):
1054 1054
1055 1055 self.ui.debug('bundle2-input-stream-interrupt:'
1056 1056 ' opening out of band context\n')
1057 1057 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1058 1058 headerblock = self._readpartheader()
1059 1059 if headerblock is None:
1060 1060 indebug(self.ui, 'no part found during interruption.')
1061 1061 return
1062 1062 part = unbundlepart(self.ui, headerblock, self._fp)
1063 1063 op = interruptoperation(self.ui)
1064 1064 _processpart(op, part)
1065 1065 self.ui.debug('bundle2-input-stream-interrupt:'
1066 1066 ' closing out of band context\n')
1067 1067
1068 1068 class interruptoperation(object):
1069 1069     """A limited operation to be used by part handlers during interruption
1070 1070
1071 1071     It only has access to a ui object.
1072 1072     """
1073 1073
1074 1074 def __init__(self, ui):
1075 1075 self.ui = ui
1076 1076 self.reply = None
1077 1077 self.captureoutput = False
1078 1078
1079 1079 @property
1080 1080 def repo(self):
1081 1081 raise RuntimeError('no repo access from stream interruption')
1082 1082
1083 1083 def gettransaction(self):
1084 1084 raise TransactionUnavailable('no repo access from stream interruption')
1085 1085
1086 1086 class unbundlepart(unpackermixin):
1087 1087 """a bundle part read from a bundle"""
1088 1088
1089 1089 def __init__(self, ui, header, fp):
1090 1090 super(unbundlepart, self).__init__(fp)
1091 1091 self.ui = ui
1092 1092 # unbundle state attr
1093 1093 self._headerdata = header
1094 1094 self._headeroffset = 0
1095 1095 self._initialized = False
1096 1096 self.consumed = False
1097 1097 # part data
1098 1098 self.id = None
1099 1099 self.type = None
1100 1100 self.mandatoryparams = None
1101 1101 self.advisoryparams = None
1102 1102 self.params = None
1103 1103 self.mandatorykeys = ()
1104 1104 self._payloadstream = None
1105 1105 self._readheader()
1106 1106 self._mandatory = None
1107 1107 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1108 1108 self._pos = 0
1109 1109
1110 1110 def _fromheader(self, size):
1111 1111 """return the next <size> byte from the header"""
1112 1112 offset = self._headeroffset
1113 1113 data = self._headerdata[offset:(offset + size)]
1114 1114 self._headeroffset = offset + size
1115 1115 return data
1116 1116
1117 1117 def _unpackheader(self, format):
1118 1118 """read given format from header
1119 1119
1120 1120         This automatically computes the size of the format to read."""
1121 1121 data = self._fromheader(struct.calcsize(format))
1122 1122 return _unpack(format, data)
1123 1123
1124 1124 def _initparams(self, mandatoryparams, advisoryparams):
1125 1125         """internal function to set up all logic-related parameters"""
1126 1126 # make it read only to prevent people touching it by mistake.
1127 1127 self.mandatoryparams = tuple(mandatoryparams)
1128 1128 self.advisoryparams = tuple(advisoryparams)
1129 1129 # user friendly UI
1130 1130 self.params = util.sortdict(self.mandatoryparams)
1131 1131 self.params.update(self.advisoryparams)
1132 1132 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1133 1133
1134 1134 def _payloadchunks(self, chunknum=0):
1135 1135 '''seek to specified chunk and start yielding data'''
1136 1136 if len(self._chunkindex) == 0:
1137 1137 assert chunknum == 0, 'Must start with chunk 0'
1138 1138 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1139 1139 else:
1140 1140 assert chunknum < len(self._chunkindex), \
1141 1141 'Unknown chunk %d' % chunknum
1142 1142 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1143 1143
1144 1144 pos = self._chunkindex[chunknum][0]
1145 1145 payloadsize = self._unpack(_fpayloadsize)[0]
1146 1146 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1147 1147 while payloadsize:
1148 1148 if payloadsize == flaginterrupt:
1149 1149 # interruption detection, the handler will now read a
1150 1150 # single part and process it.
1151 1151 interrupthandler(self.ui, self._fp)()
1152 1152 elif payloadsize < 0:
1153 1153 msg = 'negative payload chunk size: %i' % payloadsize
1154 1154 raise error.BundleValueError(msg)
1155 1155 else:
1156 1156 result = self._readexact(payloadsize)
1157 1157 chunknum += 1
1158 1158 pos += payloadsize
1159 1159 if chunknum == len(self._chunkindex):
1160 1160 self._chunkindex.append((pos,
1161 1161 super(unbundlepart, self).tell()))
1162 1162 yield result
1163 1163 payloadsize = self._unpack(_fpayloadsize)[0]
1164 1164 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1165 1165
1166 1166 def _findchunk(self, pos):
1167 1167 '''for a given payload position, return a chunk number and offset'''
1168 1168 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1169 1169 if ppos == pos:
1170 1170 return chunk, 0
1171 1171 elif ppos > pos:
1172 1172 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1173 1173 raise ValueError('Unknown chunk')
1174 1174
1175 1175 def _readheader(self):
1176 1176 """read the header and setup the object"""
1177 1177 typesize = self._unpackheader(_fparttypesize)[0]
1178 1178 self.type = self._fromheader(typesize)
1179 1179 indebug(self.ui, 'part type: "%s"' % self.type)
1180 1180 self.id = self._unpackheader(_fpartid)[0]
1181 1181 indebug(self.ui, 'part id: "%s"' % self.id)
1182 1182 # extract mandatory bit from type
1183 1183 self.mandatory = (self.type != self.type.lower())
1184 1184 self.type = self.type.lower()
1185 1185 ## reading parameters
1186 1186 # param count
1187 1187 mancount, advcount = self._unpackheader(_fpartparamcount)
1188 1188 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1189 1189 # param size
1190 1190 fparamsizes = _makefpartparamsizes(mancount + advcount)
1191 1191 paramsizes = self._unpackheader(fparamsizes)
1192 1192         # make it a list of couples again
1193 1193 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1194 1194 # split mandatory from advisory
1195 1195 mansizes = paramsizes[:mancount]
1196 1196 advsizes = paramsizes[mancount:]
1197 1197 # retrieve param value
1198 1198 manparams = []
1199 1199 for key, value in mansizes:
1200 1200 manparams.append((self._fromheader(key), self._fromheader(value)))
1201 1201 advparams = []
1202 1202 for key, value in advsizes:
1203 1203 advparams.append((self._fromheader(key), self._fromheader(value)))
1204 1204 self._initparams(manparams, advparams)
1205 1205 ## part payload
1206 1206 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1207 1207 # we read the data, tell it
1208 1208 self._initialized = True
1209 1209
1210 1210 def read(self, size=None):
1211 1211 """read payload data"""
1212 1212 if not self._initialized:
1213 1213 self._readheader()
1214 1214 if size is None:
1215 1215 data = self._payloadstream.read()
1216 1216 else:
1217 1217 data = self._payloadstream.read(size)
1218 1218 self._pos += len(data)
1219 1219 if size is None or len(data) < size:
1220 1220 if not self.consumed and self._pos:
1221 1221 self.ui.debug('bundle2-input-part: total payload size %i\n'
1222 1222 % self._pos)
1223 1223 self.consumed = True
1224 1224 return data
1225 1225
1226 1226 def tell(self):
1227 1227 return self._pos
1228 1228
1229 1229 def seek(self, offset, whence=0):
1230 1230 if whence == 0:
1231 1231 newpos = offset
1232 1232 elif whence == 1:
1233 1233 newpos = self._pos + offset
1234 1234 elif whence == 2:
1235 1235 if not self.consumed:
1236 1236 self.read()
1237 1237 newpos = self._chunkindex[-1][0] - offset
1238 1238 else:
1239 1239 raise ValueError('Unknown whence value: %r' % (whence,))
1240 1240
1241 1241 if newpos > self._chunkindex[-1][0] and not self.consumed:
1242 1242 self.read()
1243 1243 if not 0 <= newpos <= self._chunkindex[-1][0]:
1244 1244 raise ValueError('Offset out of range')
1245 1245
1246 1246 if self._pos != newpos:
1247 1247 chunk, internaloffset = self._findchunk(newpos)
1248 1248 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1249 1249 adjust = self.read(internaloffset)
1250 1250 if len(adjust) != internaloffset:
1251 1251 raise error.Abort(_('Seek failed\n'))
1252 1252 self._pos = newpos
1253 1253
1254 1254 # These are only the static capabilities.
1255 1255 # Check the 'getrepocaps' function for the rest.
1256 1256 capabilities = {'HG20': (),
1257 1257 'error': ('abort', 'unsupportedcontent', 'pushraced',
1258 1258 'pushkey'),
1259 1259 'listkeys': (),
1260 1260 'pushkey': (),
1261 1261 'digests': tuple(sorted(util.DIGESTS.keys())),
1262 1262 'remote-changegroup': ('http', 'https'),
1263 1263 'hgtagsfnodes': (),
1264 1264 }
1265 1265
1266 1266 def getrepocaps(repo, allowpushback=False):
1267 1267 """return the bundle2 capabilities for a given repo
1268 1268
1269 1269 Exists to allow extensions (like evolution) to mutate the capabilities.
1270 1270 """
1271 1271 caps = capabilities.copy()
1272 1272 caps['changegroup'] = tuple(sorted(
1273 1273 changegroup.supportedincomingversions(repo)))
1274 1274 if obsolete.isenabled(repo, obsolete.exchangeopt):
1275 1275 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1276 1276 caps['obsmarkers'] = supportedformat
1277 1277 if allowpushback:
1278 1278 caps['pushback'] = ()
1279 1279 return caps
1280 1280
1281 1281 def bundle2caps(remote):
1282 1282 """return the bundle capabilities of a peer as dict"""
1283 1283 raw = remote.capable('bundle2')
1284 1284 if not raw and raw != '':
1285 1285 return {}
1286 1286 capsblob = urlreq.unquote(remote.capable('bundle2'))
1287 1287 return decodecaps(capsblob)
1288 1288
1289 1289 def obsmarkersversion(caps):
1290 1290 """extract the list of supported obsmarkers versions from a bundle2caps dict
1291 1291 """
1292 1292 obscaps = caps.get('obsmarkers', ())
1293 1293 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1294 1294
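The ``'V%i'`` capability parsing in ``obsmarkersversion`` can be exercised in isolation. The snippet below is a minimal standalone copy for illustration only, not the module's own function:

```python
def obsmarkersversion(caps):
    # mirror of the helper above: pull the integer versions out of
    # capability strings such as 'V0' or 'V1', ignoring anything else
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1'), 'pushkey': ()})
```

A missing ``obsmarkers`` key simply yields an empty list, which callers treat as "no obsmarker exchange supported".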
1295 1295 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1296 1296 compopts=None):
1297 1297 """Write a bundle file and return its filename.
1298 1298
1299 1299 Existing files will not be overwritten.
1300 1300 If no filename is specified, a temporary file is created.
1301 1301 bz2 compression can be turned off.
1302 1302 The bundle file will be deleted in case of errors.
1303 1303 """
1304 1304
1305 1305 if bundletype == "HG20":
1306 1306 bundle = bundle20(ui)
1307 1307 bundle.setcompression(compression, compopts)
1308 1308 part = bundle.newpart('changegroup', data=cg.getchunks())
1309 1309 part.addparam('version', cg.version)
1310 1310 if 'clcount' in cg.extras:
1311 1311 part.addparam('nbchanges', str(cg.extras['clcount']),
1312 1312 mandatory=False)
1313 1313 chunkiter = bundle.getchunks()
1314 1314 else:
1315 1315 # compression argument is only for the bundle2 case
1316 1316 assert compression is None
1317 1317 if cg.version != '01':
1318 1318             raise error.Abort(_('old bundle types only support v1 '
1319 1319                                 'changegroups'))
1320 1320 header, comp = bundletypes[bundletype]
1321 1321 if comp not in util.compengines.supportedbundletypes:
1322 1322 raise error.Abort(_('unknown stream compression type: %s')
1323 1323 % comp)
1324 1324 compengine = util.compengines.forbundletype(comp)
1325 1325 def chunkiter():
1326 1326 yield header
1327 1327 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1328 1328 yield chunk
1329 1329 chunkiter = chunkiter()
1330 1330
1331 1331 # parse the changegroup data, otherwise we will block
1332 1332 # in case of sshrepo because we don't know the end of the stream
1333 1333 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1334 1334
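For the legacy (non-HG20) branch of ``writebundle`` above, the layout is an uncompressed magic header followed by a compressed stream of changegroup chunks. The sketch below illustrates that shape with ``zlib`` standing in for whichever engine ``util.compengines.forbundletype`` would select; the engine lookup itself is elided:

```python
import zlib

def compresschunks(header, chunks):
    # legacy-bundle pattern: emit the magic header uncompressed, then a
    # single compressed stream covering all of the changegroup chunks
    yield header
    comp = zlib.compressobj()
    for chunk in chunks:
        data = comp.compress(chunk)
        if data:
            yield data
    yield comp.flush()

out = b''.join(compresschunks(b'HG10GZ', [b'chunk1', b'chunk2']))
```

Because only the payload is compressed, a reader can dispatch on the fixed-size header before choosing a decompressor.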
1335 1335 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1336 1336 def handlechangegroup(op, inpart):
1337 1337 """apply a changegroup part on the repo
1338 1338
1339 1339     This is a very early implementation that will see massive rework before
1340 1340     being inflicted on any end-user.
1341 1341 """
1342 1342 # Make sure we trigger a transaction creation
1343 1343 #
1344 1344 # The addchangegroup function will get a transaction object by itself, but
1345 1345 # we need to make sure we trigger the creation of a transaction object used
1346 1346 # for the whole processing scope.
1347 1347 op.gettransaction()
1348 1348 unpackerversion = inpart.params.get('version', '01')
1349 1349 # We should raise an appropriate exception here
1350 1350 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1351 1351 # the source and url passed here are overwritten by the one contained in
1352 1352 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1353 1353 nbchangesets = None
1354 1354 if 'nbchanges' in inpart.params:
1355 1355 nbchangesets = int(inpart.params.get('nbchanges'))
1356 1356 if ('treemanifest' in inpart.params and
1357 1357 'treemanifest' not in op.repo.requirements):
1358 1358 if len(op.repo.changelog) != 0:
1359 1359 raise error.Abort(_(
1360 1360 "bundle contains tree manifests, but local repo is "
1361 1361 "non-empty and does not use tree manifests"))
1362 1362 op.repo.requirements.add('treemanifest')
1363 1363 op.repo._applyopenerreqs()
1364 1364 op.repo._writerequirements()
1365 1365 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1366 1366 op.records.add('changegroup', {'return': ret})
1367 1367 if op.reply is not None:
1368 1368 # This is definitely not the final form of this
1369 1369         # return. But one needs to start somewhere.
1370 1370 part = op.reply.newpart('reply:changegroup', mandatory=False)
1371 1371 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1372 1372 part.addparam('return', '%i' % ret, mandatory=False)
1373 1373 assert not inpart.read()
1374 1374
1375 1375 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1376 1376 ['digest:%s' % k for k in util.DIGESTS.keys()])
1377 1377 @parthandler('remote-changegroup', _remotechangegroupparams)
1378 1378 def handleremotechangegroup(op, inpart):
1379 1379 """apply a bundle10 on the repo, given an url and validation information
1380 1380     """apply a bundle10 on the repo, given a url and validation information
1381 1381
1382 1382     All the information about the remote bundle to import is given as
1383 1383 - url: the url to the bundle10.
1384 1384 - size: the bundle10 file size. It is used to validate what was
1385 1385 retrieved by the client matches the server knowledge about the bundle.
1386 1386 - digests: a space separated list of the digest types provided as
1387 1387 parameters.
1388 1388 - digest:<digest-type>: the hexadecimal representation of the digest with
1389 1389 that name. Like the size, it is used to validate what was retrieved by
1390 1390 the client matches what the server knows about the bundle.
1391 1391
1392 1392 When multiple digest types are given, all of them are checked.
1393 1393 """
1394 1394 try:
1395 1395 raw_url = inpart.params['url']
1396 1396 except KeyError:
1397 1397 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1398 1398 parsed_url = util.url(raw_url)
1399 1399 if parsed_url.scheme not in capabilities['remote-changegroup']:
1400 1400 raise error.Abort(_('remote-changegroup does not support %s urls') %
1401 1401 parsed_url.scheme)
1402 1402
1403 1403 try:
1404 1404 size = int(inpart.params['size'])
1405 1405 except ValueError:
1406 1406 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1407 1407 % 'size')
1408 1408 except KeyError:
1409 1409 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1410 1410
1411 1411 digests = {}
1412 1412 for typ in inpart.params.get('digests', '').split():
1413 1413 param = 'digest:%s' % typ
1414 1414 try:
1415 1415 value = inpart.params[param]
1416 1416 except KeyError:
1417 1417 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1418 1418 param)
1419 1419 digests[typ] = value
1420 1420
1421 1421 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1422 1422
1423 1423 # Make sure we trigger a transaction creation
1424 1424 #
1425 1425 # The addchangegroup function will get a transaction object by itself, but
1426 1426 # we need to make sure we trigger the creation of a transaction object used
1427 1427 # for the whole processing scope.
1428 1428 op.gettransaction()
1429 1429 from . import exchange
1430 1430 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1431 1431 if not isinstance(cg, changegroup.cg1unpacker):
1432 1432 raise error.Abort(_('%s: not a bundle version 1.0') %
1433 1433 util.hidepassword(raw_url))
1434 1434 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1435 1435 op.records.add('changegroup', {'return': ret})
1436 1436 if op.reply is not None:
1437 1437 # This is definitely not the final form of this
1438 1438         # return. But one needs to start somewhere.
1439 1439 part = op.reply.newpart('reply:changegroup')
1440 1440 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1441 1441 part.addparam('return', '%i' % ret, mandatory=False)
1442 1442 try:
1443 1443 real_part.validate()
1444 1444 except error.Abort as e:
1445 1445 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1446 1446 (util.hidepassword(raw_url), str(e)))
1447 1447 assert not inpart.read()
1448 1448
1449 1449 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1450 1450 def handlereplychangegroup(op, inpart):
1451 1451 ret = int(inpart.params['return'])
1452 1452 replyto = int(inpart.params['in-reply-to'])
1453 1453 op.records.add('changegroup', {'return': ret}, replyto)
1454 1454
1455 1455 @parthandler('check:heads')
1456 1456 def handlecheckheads(op, inpart):
1457 1457 """check that head of the repo did not change
1458 1458
1459 1459 This is used to detect a push race when using unbundle.
1460 1460 This replaces the "heads" argument of unbundle."""
1461 1461 h = inpart.read(20)
1462 1462 heads = []
1463 1463 while len(h) == 20:
1464 1464 heads.append(h)
1465 1465 h = inpart.read(20)
1466 1466 assert not h
1467 1467 # Trigger a transaction so that we are guaranteed to have the lock now.
1468 1468 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1469 1469 op.gettransaction()
1470 1470 if sorted(heads) != sorted(op.repo.heads()):
1471 1471 raise error.PushRaced('repository changed while pushing - '
1472 1472 'please try again')
1473 1473
1474 1474 @parthandler('output')
1475 1475 def handleoutput(op, inpart):
1476 1476 """forward output captured on the server to the client"""
1477 1477 for line in inpart.read().splitlines():
1478 1478 op.ui.status(_('remote: %s\n') % line)
1479 1479
1480 1480 @parthandler('replycaps')
1481 1481 def handlereplycaps(op, inpart):
1482 1482 """Notify that a reply bundle should be created
1483 1483
1484 1484 The payload contains the capabilities information for the reply"""
1485 1485 caps = decodecaps(inpart.read())
1486 1486 if op.reply is None:
1487 1487 op.reply = bundle20(op.ui, caps)
1488 1488
1489 1489 class AbortFromPart(error.Abort):
1490 1490 """Sub-class of Abort that denotes an error from a bundle2 part."""
1491 1491
1492 1492 @parthandler('error:abort', ('message', 'hint'))
1493 1493 def handleerrorabort(op, inpart):
1494 1494 """Used to transmit abort error over the wire"""
1495 1495 raise AbortFromPart(inpart.params['message'],
1496 1496 hint=inpart.params.get('hint'))
1497 1497
1498 1498 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1499 1499 'in-reply-to'))
1500 1500 def handleerrorpushkey(op, inpart):
1501 1501 """Used to transmit failure of a mandatory pushkey over the wire"""
1502 1502 kwargs = {}
1503 1503 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1504 1504 value = inpart.params.get(name)
1505 1505 if value is not None:
1506 1506 kwargs[name] = value
1507 1507 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1508 1508
1509 1509 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1510 1510 def handleerrorunsupportedcontent(op, inpart):
1511 1511 """Used to transmit unknown content error over the wire"""
1512 1512 kwargs = {}
1513 1513 parttype = inpart.params.get('parttype')
1514 1514 if parttype is not None:
1515 1515 kwargs['parttype'] = parttype
1516 1516 params = inpart.params.get('params')
1517 1517 if params is not None:
1518 1518 kwargs['params'] = params.split('\0')
1519 1519
1520 1520 raise error.BundleUnknownFeatureError(**kwargs)
1521 1521
1522 1522 @parthandler('error:pushraced', ('message',))
1523 1523 def handleerrorpushraced(op, inpart):
1524 1524 """Used to transmit push race error over the wire"""
1525 1525 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1526 1526
1527 1527 @parthandler('listkeys', ('namespace',))
1528 1528 def handlelistkeys(op, inpart):
1529 1529 """retrieve pushkey namespace content stored in a bundle2"""
1530 1530 namespace = inpart.params['namespace']
1531 1531 r = pushkey.decodekeys(inpart.read())
1532 1532 op.records.add('listkeys', (namespace, r))
1533 1533
1534 1534 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1535 1535 def handlepushkey(op, inpart):
1536 1536 """process a pushkey request"""
1537 1537 dec = pushkey.decode
1538 1538 namespace = dec(inpart.params['namespace'])
1539 1539 key = dec(inpart.params['key'])
1540 1540 old = dec(inpart.params['old'])
1541 1541 new = dec(inpart.params['new'])
1542 1542 # Grab the transaction to ensure that we have the lock before performing the
1543 1543 # pushkey.
1544 1544 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1545 1545 op.gettransaction()
1546 1546 ret = op.repo.pushkey(namespace, key, old, new)
1547 1547 record = {'namespace': namespace,
1548 1548 'key': key,
1549 1549 'old': old,
1550 1550 'new': new}
1551 1551 op.records.add('pushkey', record)
1552 1552 if op.reply is not None:
1553 1553 rpart = op.reply.newpart('reply:pushkey')
1554 1554 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1555 1555 rpart.addparam('return', '%i' % ret, mandatory=False)
1556 1556 if inpart.mandatory and not ret:
1557 1557 kwargs = {}
1558 1558 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1559 1559 if key in inpart.params:
1560 1560 kwargs[key] = inpart.params[key]
1561 1561 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1562 1562
1563 1563 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1564 1564 def handlepushkeyreply(op, inpart):
1565 1565 """retrieve the result of a pushkey request"""
1566 1566 ret = int(inpart.params['return'])
1567 1567 partid = int(inpart.params['in-reply-to'])
1568 1568 op.records.add('pushkey', {'return': ret}, partid)
1569 1569
1570 1570 @parthandler('obsmarkers')
1571 1571 def handleobsmarker(op, inpart):
1572 1572 """add a stream of obsmarkers to the repo"""
1573 1573 tr = op.gettransaction()
1574 1574 markerdata = inpart.read()
1575 1575 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1576 1576 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1577 1577 % len(markerdata))
1578 1578 # The mergemarkers call will crash if marker creation is not enabled.
1579 1579 # we want to avoid this if the part is advisory.
1580 1580 if not inpart.mandatory and op.repo.obsstore.readonly:
1581 1581         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1582 1582 return
1583 1583 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1584 1584 if new:
1585 1585 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1586 1586 op.records.add('obsmarkers', {'new': new})
1587 1587 if op.reply is not None:
1588 1588 rpart = op.reply.newpart('reply:obsmarkers')
1589 1589 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1590 1590 rpart.addparam('new', '%i' % new, mandatory=False)
1591 1591
1592 1592
1593 1593 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1594 1594 def handleobsmarkerreply(op, inpart):
1595 1595     """retrieve the result of an obsmarkers request"""
1596 1596 ret = int(inpart.params['new'])
1597 1597 partid = int(inpart.params['in-reply-to'])
1598 1598 op.records.add('obsmarkers', {'new': ret}, partid)
1599 1599
1600 1600 @parthandler('hgtagsfnodes')
1601 1601 def handlehgtagsfnodes(op, inpart):
1602 1602 """Applies .hgtags fnodes cache entries to the local repo.
1603 1603
1604 1604 Payload is pairs of 20 byte changeset nodes and filenodes.
1605 1605 """
1606 1606 # Grab the transaction so we ensure that we have the lock at this point.
1607 1607 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1608 1608 op.gettransaction()
1609 1609 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1610 1610
1611 1611 count = 0
1612 1612 while True:
1613 1613 node = inpart.read(20)
1614 1614 fnode = inpart.read(20)
1615 1615 if len(node) < 20 or len(fnode) < 20:
1616 1616 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1617 1617 break
1618 1618 cache.setfnode(node, fnode)
1619 1619 count += 1
1620 1620
1621 1621 cache.write()
1622 1622 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)