merge with stable
Augie Fackler -
r30965:f01df5d2 merge default
@@ -1,1624 +1,1624 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable, but none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
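As a rough illustration of the layout described in the module docstring, the following sketch assembles the smallest possible stream by hand: the magic string, an empty stream parameter block, and the end of stream marker. It is written for Python 3 (bytes literals), whereas this module targets Python 2, and the helper name `emptybundle2` is hypothetical rather than part of Mercurial.

```python
import struct

def emptybundle2():
    # hypothetical helper, for illustration only
    magic = b'HG20'
    # stream level parameters: an int32 size followed by the blob (empty here)
    params = b''
    paramsize = struct.pack('>i', len(params))
    # a part header size of 0 is interpreted as the end of stream marker
    endmarker = struct.pack('>i', 0)
    return magic + paramsize + params + endmarker

raw = emptybundle2()
```

The result is twelve bytes long: four for the magic string, four for the parameter size, and four for the end marker.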
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
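To make the part header layout from the module docstring concrete, here is a minimal sketch that builds and parses a header using the same struct formats as the constants above (`'>B'`, `'>I'`, `'>BB'` and the dynamic `'>' + 'BB' * n`). It targets Python 3, and `packpartheader` and `unpackpartheader` are hypothetical names that do not exist in this module; the real code lives in the `bundlepart` and `unbundlepart` classes.

```python
import struct

def packpartheader(parttype, partid, mandatory, advisory):
    # hypothetical helper: mandatory/advisory are lists of (key, value) bytes
    params = list(mandatory) + list(advisory)
    pieces = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(mandatory), len(advisory))]
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    # one (key size, value size) pair of bytes per parameter
    pieces.append(struct.pack('>' + 'BB' * len(params), *sizes))
    for key, value in params:
        pieces.append(key)
        pieces.append(value)
    return b''.join(pieces)

def unpackpartheader(header):
    # hypothetical helper: inverse of packpartheader
    offset = 0
    typesize = struct.unpack_from('>B', header, offset)[0]
    offset += 1
    parttype = header[offset:offset + typesize]
    offset += typesize
    partid = struct.unpack_from('>I', header, offset)[0]
    offset += 4
    mancount, advcount = struct.unpack_from('>BB', header, offset)
    offset += 2
    fmt = '>' + 'BB' * (mancount + advcount)
    sizes = struct.unpack_from(fmt, header, offset)
    offset += struct.calcsize(fmt)
    params = []
    for i in range(mancount + advcount):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = header[offset:offset + ksize]
        offset += ksize
        value = header[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return parttype, partid, params[:mancount], params[mancount:]

header = packpartheader(b'changegroup', 0,
                        [(b'version', b'02')], [(b'nbchanges', b'3')])
parsed = unpackpartheader(header)
```

Because every size is stored in a single byte, this layout limits keys, values and the part type to 255 bytes each.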
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that registers a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
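The registration pattern used by `parthandler` can be tried in isolation. The sketch below is a standalone imitation, assuming a private `handlers` dictionary and a hypothetical `handlerfor` decorator; it shows that the registry key is lower-cased while the set of understood parameters is attached to the function itself.

```python
handlers = {}  # hypothetical registry, standing in for parthandlermapping

def handlerfor(parttype, params=()):
    # hypothetical decorator mirroring the structure of parthandler
    def decorator(func):
        key = parttype.lower()  # matching is case insensitive
        assert key not in handlers
        handlers[key] = func
        func.params = frozenset(params)
        return func
    return decorator

@handlerfor('Demo:Echo', ('text',))
def echo(op, part):
    # a real handler would read the part payload here
    return part
```

Looking up `handlers['demo:echo']` returns `echo`, and `echo.params` holds the parameter names the handler declares it understands.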
229 229 class unbundlerecords(object):
230 230 """keep a record of what happens during an unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 class bundleoperation(object):
275 275 """an object that represents a single bundling process
276 276
277 277 Its purpose is to carry unbundle-related objects and states.
278 278
279 279 A new object should be created at the beginning of each bundle processing.
280 280 The object is to be returned by the processing function.
281 281
282 282 The object has very little content now. It will ultimately contain:
283 283 * an access to the repo the bundle is applied to,
284 284 * a ui object,
285 285 * a way to retrieve a transaction to add changes to the repo,
286 286 * a way to record the result of processing each part,
287 287 * a way to construct a bundle response when applicable.
288 288 """
289 289
290 290 def __init__(self, repo, transactiongetter, captureoutput=True):
291 291 self.repo = repo
292 292 self.ui = repo.ui
293 293 self.records = unbundlerecords()
294 294 self.gettransaction = transactiongetter
295 295 self.reply = None
296 296 self.captureoutput = captureoutput
297 297
298 298 class TransactionUnavailable(RuntimeError):
299 299 pass
300 300
301 301 def _notransaction():
302 302 """default method to get a transaction while processing a bundle
303 303
304 304 Raise an exception to highlight the fact that no transaction was expected
305 305 to be created"""
306 306 raise TransactionUnavailable()
307 307
308 308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
309 309 # transform me into unbundler.apply() as soon as the freeze is lifted
310 310 tr.hookargs['bundle2'] = '1'
311 311 if source is not None and 'source' not in tr.hookargs:
312 312 tr.hookargs['source'] = source
313 313 if url is not None and 'url' not in tr.hookargs:
314 314 tr.hookargs['url'] = url
315 315 return processbundle(repo, unbundler, lambda: tr, op=op)
316 316
317 317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
318 318 """This function processes a bundle, applying its effects to/from a repo
319 319
320 320 It iterates over each part then searches for and uses the proper handling
321 321 code to process the part. Parts are processed in order.
322 322
323 323 An unknown mandatory part will abort the process.
324 324
325 325 It is temporarily possible to provide a prebuilt bundleoperation to the
326 326 function. This is used to ensure output is properly propagated in case of
327 327 an error during the unbundling. This output capturing part will likely be
328 328 reworked and this ability will probably go away in the process.
329 329 """
330 330 if op is None:
331 331 if transactiongetter is None:
332 332 transactiongetter = _notransaction
333 333 op = bundleoperation(repo, transactiongetter)
334 334 # todo:
335 335 # - replace this with an init function soon.
336 336 # - exception catching
337 337 unbundler.params
338 338 if repo.ui.debugflag:
339 339 msg = ['bundle2-input-bundle:']
340 340 if unbundler.params:
341 341 msg.append(' %i params' % len(unbundler.params))
342 342 if op.gettransaction is None:
343 343 msg.append(' no-transaction')
344 344 else:
345 345 msg.append(' with-transaction')
346 346 msg.append('\n')
347 347 repo.ui.debug(''.join(msg))
348 348 iterparts = enumerate(unbundler.iterparts())
349 349 part = None
350 350 nbpart = 0
351 351 try:
352 352 for nbpart, part in iterparts:
353 353 _processpart(op, part)
354 354 except Exception as exc:
355 355 for nbpart, part in iterparts:
356 356 # consume the bundle content
357 357 part.seek(0, 2)
358 358 # Small hack to let caller code distinguish exceptions from bundle2
359 359 # processing from processing the old format. This is mostly
360 360 # needed to handle different return codes to unbundle according to the
361 361 # type of bundle. We should probably clean up or drop this return code
362 362 # craziness in a future version.
363 363 exc.duringunbundle2 = True
364 364 salvaged = []
365 365 replycaps = None
366 366 if op.reply is not None:
367 367 salvaged = op.reply.salvageoutput()
368 368 replycaps = op.reply.capabilities
369 369 exc._replycaps = replycaps
370 370 exc._bundle2salvagedoutput = salvaged
371 371 raise
372 372 finally:
373 373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
374 374
375 375 return op
376 376
377 377 def _processpart(op, part):
378 378 """process a single part from a bundle
379 379
380 380 The part is guaranteed to have been fully consumed when the function exits
381 381 (even if an exception is raised)."""
382 382 status = 'unknown' # used by debug output
383 383 hardabort = False
384 384 try:
385 385 try:
386 386 handler = parthandlermapping.get(part.type)
387 387 if handler is None:
388 388 status = 'unsupported-type'
389 389 raise error.BundleUnknownFeatureError(parttype=part.type)
390 390 indebug(op.ui, 'found a handler for part %r' % part.type)
391 391 unknownparams = part.mandatorykeys - handler.params
392 392 if unknownparams:
393 393 unknownparams = list(unknownparams)
394 394 unknownparams.sort()
395 395 status = 'unsupported-params (%s)' % unknownparams
396 396 raise error.BundleUnknownFeatureError(parttype=part.type,
397 397 params=unknownparams)
398 398 status = 'supported'
399 399 except error.BundleUnknownFeatureError as exc:
400 400 if part.mandatory: # mandatory parts
401 401 raise
402 402 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
403 403 return # skip to part processing
404 404 finally:
405 405 if op.ui.debugflag:
406 406 msg = ['bundle2-input-part: "%s"' % part.type]
407 407 if not part.mandatory:
408 408 msg.append(' (advisory)')
409 409 nbmp = len(part.mandatorykeys)
410 410 nbap = len(part.params) - nbmp
411 411 if nbmp or nbap:
412 412 msg.append(' (params:')
413 413 if nbmp:
414 414 msg.append(' %i mandatory' % nbmp)
415 415 if nbap:
416 416 msg.append(' %i advisory' % nbap)
417 417 msg.append(')')
418 418 msg.append(' %s\n' % status)
419 419 op.ui.debug(''.join(msg))
420 420
421 421 # handler is called outside the above try block so that we don't
422 422 # risk catching KeyErrors from anything other than the
423 423 # parthandlermapping lookup (any KeyError raised by handler()
424 424 # itself represents a defect of a different variety).
425 425 output = None
426 426 if op.captureoutput and op.reply is not None:
427 427 op.ui.pushbuffer(error=True, subproc=True)
428 428 output = ''
429 429 try:
430 430 handler(op, part)
431 431 finally:
432 432 if output is not None:
433 433 output = op.ui.popbuffer()
434 434 if output:
435 435 outpart = op.reply.newpart('output', data=output,
436 436 mandatory=False)
437 437 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
438 438 # If exiting or interrupted, do not attempt to seek the stream in the
439 439 # finally block below. This makes abort faster.
440 440 except (SystemExit, KeyboardInterrupt):
441 441 hardabort = True
442 442 raise
443 443 finally:
444 444 # consume the part content to not corrupt the stream.
445 445 if not hardabort:
446 446 part.seek(0, 2)
447 447
448 448
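The decision made at the top of `_processpart` (unknown type or unknown mandatory parameter, then abort or skip depending on whether the part is mandatory) can be summarised in a few lines. This is a simplified sketch under stated assumptions: `UnknownFeature` stands in for `error.BundleUnknownFeatureError`, the registry maps lower-cased types to `(handler, known params)` tuples, and the mandatory flag is derived from the case of the type as described in the module docstring.

```python
class UnknownFeature(Exception):
    # hypothetical stand-in for error.BundleUnknownFeatureError
    pass

def selecthandler(handlers, parttype, mandatorykeys=()):
    # hypothetical helper: return a handler, None (skip), or raise
    mandatory = parttype != parttype.lower()  # any uppercase char
    entry = handlers.get(parttype.lower())
    problem = None
    handler = None
    if entry is None:
        problem = 'unsupported-type'
    else:
        handler, knownparams = entry
        unknown = sorted(set(mandatorykeys) - set(knownparams))
        if unknown:
            problem = 'unsupported-params (%s)' % unknown
    if problem is None:
        return handler
    if mandatory:
        raise UnknownFeature(problem)
    return None  # advisory part: silently ignored

registry = {'output': (print, ('in-reply-to',))}
found = selecthandler(registry, 'output', ['in-reply-to'])
skipped = selecthandler(registry, 'unknown:part')
```

In the real function the part is also seeked to its end in a `finally` block, so the stream stays aligned whichever branch is taken; the sketch leaves that out.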
449 449 def decodecaps(blob):
450 450 """decode a bundle2 caps bytes blob into a dictionary
451 451
452 452 The blob is a list of capabilities (one per line)
453 453 Capabilities may have values using a line of the form::
454 454
455 455 capability=value1,value2,value3
456 456
457 457 The values are always a list."""
458 458 caps = {}
459 459 for line in blob.splitlines():
460 460 if not line:
461 461 continue
462 462 if '=' not in line:
463 463 key, vals = line, ()
464 464 else:
465 465 key, vals = line.split('=', 1)
466 466 vals = vals.split(',')
467 467 key = urlreq.unquote(key)
468 468 vals = [urlreq.unquote(v) for v in vals]
469 469 caps[key] = vals
470 470 return caps
471 471
472 472 def encodecaps(caps):
473 473 """encode a bundle2 caps dictionary into a bytes blob"""
474 474 chunks = []
475 475 for ca in sorted(caps):
476 476 vals = caps[ca]
477 477 ca = urlreq.quote(ca)
478 478 vals = [urlreq.quote(v) for v in vals]
479 479 if vals:
480 480 ca = "%s=%s" % (ca, ','.join(vals))
481 481 chunks.append(ca)
482 482 return '\n'.join(chunks)
483 483
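A round trip through the capabilities format may help when reading `decodecaps` and `encodecaps`. The sketch below re-implements both for Python 3 text strings using `urllib.parse` directly, since `urlreq` is a Mercurial compatibility wrapper; the names `encodecaps3` and `decodecaps3` are hypothetical.

```python
from urllib.parse import quote, unquote

def encodecaps3(caps):
    # hypothetical Python 3 port of encodecaps
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        name = quote(name)
        if vals:
            name = '%s=%s' % (name, ','.join(vals))
        chunks.append(name)
    return '\n'.join(chunks)

def decodecaps3(blob):
    # hypothetical Python 3 port of decodecaps
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02'],
        'error': ['abort', 'push race']}
blob = encodecaps3(caps)
```

Quoting each value separately is what lets `,` and `=` serve as unambiguous separators: a value containing a space, such as `push race`, is transmitted as `push%20race`.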
484 484 bundletypes = {
485 485 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
486 486 # since the unification ssh accepts a header but there
487 487 # is no capability signaling it.
488 488 "HG20": (), # special-cased below
489 489 "HG10UN": ("HG10UN", 'UN'),
490 490 "HG10BZ": ("HG10", 'BZ'),
491 491 "HG10GZ": ("HG10GZ", 'GZ'),
492 492 }
493 493
494 494 # hgweb uses this list to communicate its preferred type
495 495 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
496 496
497 497 class bundle20(object):
498 498 """represent an outgoing bundle2 container
499 499
500 500 Use the `addparam` method to add stream level parameters and `newpart` to
501 501 populate it. Then call `getchunks` to retrieve all the binary chunks of
502 502 data that compose the bundle2 container."""
503 503
504 504 _magicstring = 'HG20'
505 505
506 506 def __init__(self, ui, capabilities=()):
507 507 self.ui = ui
508 508 self._params = []
509 509 self._parts = []
510 510 self.capabilities = dict(capabilities)
511 511 self._compengine = util.compengines.forbundletype('UN')
512 512 self._compopts = None
513 513
514 514 def setcompression(self, alg, compopts=None):
515 515 """setup core part compression to <alg>"""
516 516 if alg in (None, 'UN'):
517 517 return
518 - assert not any(n.lower() == 'Compression' for n, v in self._params)
518 + assert not any(n.lower() == 'compression' for n, v in self._params)
519 519 self.addparam('Compression', alg)
520 520 self._compengine = util.compengines.forbundletype(alg)
521 521 self._compopts = compopts
522 522
523 523 @property
524 524 def nbparts(self):
525 525 """total number of parts added to the bundler"""
526 526 return len(self._parts)
527 527
528 528 # methods used to define the bundle2 content
529 529 def addparam(self, name, value=None):
530 530 """add a stream level parameter"""
531 531 if not name:
532 532 raise ValueError('empty parameter name')
533 533 if name[0] not in string.letters:
534 534 raise ValueError('non letter first character: %r' % name)
535 535 self._params.append((name, value))
536 536
537 537 def addpart(self, part):
538 538 """add a new part to the bundle2 container
539 539
540 540 Parts contain the actual application payload."""
541 541 assert part.id is None
542 542 part.id = len(self._parts) # very cheap counter
543 543 self._parts.append(part)
544 544
545 545 def newpart(self, typeid, *args, **kwargs):
546 546 """create a new part and add it to the container
547 547
548 548 The part is directly added to the container. For now, this means
549 549 that any failure to properly initialize the part after calling
550 550 ``newpart`` should result in a failure of the whole bundling process.
551 551
552 552 You can still fall back to manually creating and adding a part if you
553 553 need better control."""
554 554 part = bundlepart(typeid, *args, **kwargs)
555 555 self.addpart(part)
556 556 return part
557 557
558 558 # methods used to generate the bundle2 stream
559 559 def getchunks(self):
560 560 if self.ui.debugflag:
561 561 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
562 562 if self._params:
563 563 msg.append(' (%i params)' % len(self._params))
564 564 msg.append(' %i parts total\n' % len(self._parts))
565 565 self.ui.debug(''.join(msg))
566 566 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
567 567 yield self._magicstring
568 568 param = self._paramchunk()
569 569 outdebug(self.ui, 'bundle parameter: %s' % param)
570 570 yield _pack(_fstreamparamsize, len(param))
571 571 if param:
572 572 yield param
573 573 for chunk in self._compengine.compressstream(self._getcorechunk(),
574 574 self._compopts):
575 575 yield chunk
576 576
577 577 def _paramchunk(self):
578 578 """return an encoded version of all stream parameters"""
579 579 blocks = []
580 580 for par, value in self._params:
581 581 par = urlreq.quote(par)
582 582 if value is not None:
583 583 value = urlreq.quote(value)
584 584 par = '%s=%s' % (par, value)
585 585 blocks.append(par)
586 586 return ' '.join(blocks)
587 587
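The textual stream parameter block produced by `_paramchunk` is easy to experiment with outside Mercurial. The following sketch encodes a list of `(name, value)` pairs and decodes it again, flagging each parameter as mandatory or advisory from the case of its first letter. It uses Python 3 strings and `urllib.parse` instead of `urlreq`, and both function names are hypothetical.

```python
import string
from urllib.parse import quote, unquote

def encodestreamparams(params):
    # hypothetical helper: params is a list of (name, value-or-None)
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decodestreamparams(blob):
    # hypothetical helper: returns (name, value, mandatory) tuples
    result = []
    for block in blob.split(' '):
        pieces = [unquote(p) for p in block.split('=', 1)]
        name = pieces[0]
        value = pieces[1] if len(pieces) > 1 else None
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.ascii_letters:
            raise ValueError('non letter first character: %r' % name)
        # upper case first letter: mandatory; lower case: advisory
        result.append((name, value, name[0].isupper()))
    return result

blob = encodestreamparams([('Compression', 'BZ'), ('e m', None)])
decoded = decodestreamparams(blob)
```

Since names and values are urlquoted, a literal space inside a name becomes `%20` and cannot be confused with the separator between parameters.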
588 588 def _getcorechunk(self):
589 589 """yield chunk for the core part of the bundle
590 590
591 591 (all but headers and parameters)"""
592 592 outdebug(self.ui, 'start of parts')
593 593 for part in self._parts:
594 594 outdebug(self.ui, 'bundle part: "%s"' % part.type)
595 595 for chunk in part.getchunks(ui=self.ui):
596 596 yield chunk
597 597 outdebug(self.ui, 'end of bundle')
598 598 yield _pack(_fpartheadersize, 0)
599 599
600 600
601 601 def salvageoutput(self):
602 602 """return a list with a copy of all output parts in the bundle
603 603
604 604 This is meant to be used during error handling to make sure we preserve
605 605 server output"""
606 606 salvaged = []
607 607 for part in self._parts:
608 608 if part.type.startswith('output'):
609 609 salvaged.append(part.copy())
610 610 return salvaged
611 611
612 612
613 613 class unpackermixin(object):
614 614 """A mixin to extract bytes and struct data from a stream"""
615 615
616 616 def __init__(self, fp):
617 617 self._fp = fp
618 618 self._seekable = (util.safehasattr(fp, 'seek') and
619 619 util.safehasattr(fp, 'tell'))
620 620
621 621 def _unpack(self, format):
622 622 """unpack this struct format from the stream"""
623 623 data = self._readexact(struct.calcsize(format))
624 624 return _unpack(format, data)
625 625
626 626 def _readexact(self, size):
627 627 """read exactly <size> bytes from the stream"""
628 628 return changegroup.readexactly(self._fp, size)
629 629
630 630 def seek(self, offset, whence=0):
631 631 """move the underlying file pointer"""
632 632 if self._seekable:
633 633 return self._fp.seek(offset, whence)
634 634 else:
635 635 raise NotImplementedError(_('File pointer is not seekable'))
636 636
637 637 def tell(self):
638 638 """return the file offset, or None if file is not seekable"""
639 639 if self._seekable:
640 640 try:
641 641 return self._fp.tell()
642 642 except IOError as e:
643 643 if e.errno == errno.ESPIPE:
644 644 self._seekable = False
645 645 else:
646 646 raise
647 647 return None
648 648
649 649 def close(self):
650 650 """close underlying file"""
651 651 if util.safehasattr(self._fp, 'close'):
652 652 return self._fp.close()
653 653
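The two reading primitives of `unpackermixin` can be demonstrated on an in-memory stream. This sketch assumes a simple `readexactly` that raises `EOFError` on a short read; Mercurial's own `changegroup.readexactly` reports the problem differently, and the function names here are illustrative only.

```python
import io
import struct

def readexactly(fp, size):
    # hypothetical stand-in for changegroup.readexactly
    data = fp.read(size)
    if len(data) < size:
        raise EOFError('stream ended unexpectedly (got %d bytes, expected %d)'
                       % (len(data), size))
    return data

def unpackfrom(fp, fmt):
    # read exactly as many bytes as the struct format needs, then decode
    return struct.unpack(fmt, readexactly(fp, struct.calcsize(fmt)))

fp = io.BytesIO(b'\x00\x00\x00\x05hello')
size = unpackfrom(fp, '>i')[0]
body = readexactly(fp, size)
```

Sizing the read with `struct.calcsize` keeps the format string as the single source of truth for how many bytes a field occupies.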
654 654 def getunbundler(ui, fp, magicstring=None):
655 655 """return a valid unbundler object for a given magicstring"""
656 656 if magicstring is None:
657 657 magicstring = changegroup.readexactly(fp, 4)
658 658 magic, version = magicstring[0:2], magicstring[2:4]
659 659 if magic != 'HG':
660 660 raise error.Abort(_('not a Mercurial bundle'))
661 661 unbundlerclass = formatmap.get(version)
662 662 if unbundlerclass is None:
663 663 raise error.Abort(_('unknown bundle version %s') % version)
664 664 unbundler = unbundlerclass(ui, fp)
665 665 indebug(ui, 'start processing of %s stream' % magicstring)
666 666 return unbundler
667 667
668 668 class unbundle20(unpackermixin):
669 669 """interpret a bundle2 stream
670 670
671 671 This class is fed with a binary stream and yields parts through its
672 672 `iterparts` methods."""
673 673
674 674 _magicstring = 'HG20'
675 675
676 676 def __init__(self, ui, fp):
677 677 """If header is specified, we do not read it out of the stream."""
678 678 self.ui = ui
679 679 self._compengine = util.compengines.forbundletype('UN')
680 680 self._compressed = None
681 681 super(unbundle20, self).__init__(fp)
682 682
683 683 @util.propertycache
684 684 def params(self):
685 685 """dictionary of stream level parameters"""
686 686 indebug(self.ui, 'reading bundle2 stream parameters')
687 687 params = {}
688 688 paramssize = self._unpack(_fstreamparamsize)[0]
689 689 if paramssize < 0:
690 690 raise error.BundleValueError('negative bundle param size: %i'
691 691 % paramssize)
692 692 if paramssize:
693 693 params = self._readexact(paramssize)
694 694 params = self._processallparams(params)
695 695 return params
696 696
697 697 def _processallparams(self, paramsblock):
698 698 """"""
699 699 params = util.sortdict()
700 700 for p in paramsblock.split(' '):
701 701 p = p.split('=', 1)
702 702 p = [urlreq.unquote(i) for i in p]
703 703 if len(p) < 2:
704 704 p.append(None)
705 705 self._processparam(*p)
706 706 params[p[0]] = p[1]
707 707 return params
708 708
709 709
710 710 def _processparam(self, name, value):
711 711 """process a parameter, applying its effect if needed
712 712
713 713 Parameters starting with a lower case letter are advisory and will be
714 714 ignored when unknown. Those starting with an upper case letter are mandatory
715 715 and this function will raise a BundleUnknownFeatureError when unknown.
716 716
717 717 Note: no options are currently supported. Any input will either be
718 718 ignored or fail.
719 719 """
720 720 if not name:
721 721 raise ValueError('empty parameter name')
722 722 if name[0] not in string.letters:
723 723 raise ValueError('non letter first character: %r' % name)
724 724 try:
725 725 handler = b2streamparamsmap[name.lower()]
726 726 except KeyError:
727 727 if name[0].islower():
728 728 indebug(self.ui, "ignoring unknown parameter %r" % name)
729 729 else:
730 730 raise error.BundleUnknownFeatureError(params=(name,))
731 731 else:
732 732 handler(self, name, value)
733 733
734 734 def _forwardchunks(self):
735 735 """utility to transfer a bundle2 as binary
736 736
737 737 This is made necessary by the fact that the 'getbundle' command over 'ssh'
738 738 has no way to know when the reply ends, relying on the bundle to be
739 739 interpreted to know its end. This is terrible and we are sorry, but we
740 740 needed to move forward to get general delta enabled.
741 741 """
742 742 yield self._magicstring
743 743 assert 'params' not in vars(self)
744 744 paramssize = self._unpack(_fstreamparamsize)[0]
745 745 if paramssize < 0:
746 746 raise error.BundleValueError('negative bundle param size: %i'
747 747 % paramssize)
748 748 yield _pack(_fstreamparamsize, paramssize)
749 749 if paramssize:
750 750 params = self._readexact(paramssize)
751 751 self._processallparams(params)
752 752 yield params
753 753 assert self._compengine.bundletype == 'UN'
754 754 # From there, payload might need to be decompressed
755 755 self._fp = self._compengine.decompressorreader(self._fp)
756 756 emptycount = 0
757 757 while emptycount < 2:
758 758 # so we can brainlessly loop
759 759 assert _fpartheadersize == _fpayloadsize
760 760 size = self._unpack(_fpartheadersize)[0]
761 761 yield _pack(_fpartheadersize, size)
762 762 if size:
763 763 emptycount = 0
764 764 else:
765 765 emptycount += 1
766 766 continue
767 767 if size == flaginterrupt:
768 768 continue
769 769 elif size < 0:
770 770 raise error.BundleValueError('negative chunk size: %i' % size)
771 771 yield self._readexact(size)
772 772
773 773
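`_forwardchunks` can loop "brainlessly" because part headers and payload chunks share the same int32 size prefix, and a zero size ends both a payload and the stream. The payload side of that framing is sketched here for Python 3. The names `framepayload` and `readpayload` are hypothetical, and negative sizes are simply rejected, whereas the real code reserves them for special processing such as `flaginterrupt`.

```python
import io
import struct

def framepayload(data, chunksize=4096):
    # hypothetical helper: emit <chunksize><chunkdata> pairs
    for start in range(0, len(data), chunksize):
        chunk = data[start:start + chunksize]
        yield struct.pack('>i', len(chunk))
        yield chunk
    # the payload is concluded by a zero size chunk
    yield struct.pack('>i', 0)

def readpayload(fp):
    # hypothetical helper: read chunks until the zero size terminator
    chunks = []
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            break
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        chunks.append(fp.read(size))
    return b''.join(chunks)

framed = b''.join(framepayload(b'abcdefghij', chunksize=4))
restored = readpayload(io.BytesIO(framed))
```

An empty payload is framed as the four zero bytes of the terminator alone.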
774 774 def iterparts(self):
775 775 """yield all parts contained in the stream"""
776 776 # make sure params have been loaded
777 777 self.params
778 778 # From there, payload needs to be decompressed
779 779 self._fp = self._compengine.decompressorreader(self._fp)
780 780 indebug(self.ui, 'start extraction of bundle2 parts')
781 781 headerblock = self._readpartheader()
782 782 while headerblock is not None:
783 783 part = unbundlepart(self.ui, headerblock, self._fp)
784 784 yield part
785 785 part.seek(0, 2)
786 786 headerblock = self._readpartheader()
787 787 indebug(self.ui, 'end of bundle2 stream')
788 788
789 789 def _readpartheader(self):
790 790 """reads a part header size and returns the header bytes blob
791 791
792 792 returns None if empty"""
793 793 headersize = self._unpack(_fpartheadersize)[0]
794 794 if headersize < 0:
795 795 raise error.BundleValueError('negative part header size: %i'
796 796 % headersize)
797 797 indebug(self.ui, 'part header size: %i' % headersize)
798 798 if headersize:
799 799 return self._readexact(headersize)
800 800 return None
801 801
802 802 def compressed(self):
803 803 self.params # load params
804 804 return self._compressed
805 805
806 806 formatmap = {'20': unbundle20}
807 807
808 808 b2streamparamsmap = {}
809 809
810 810 def b2streamparamhandler(name):
811 811 """register a handler for a stream level parameter"""
812 812 def decorator(func):
813 813 assert name not in b2streamparamsmap
814 814 b2streamparamsmap[name] = func
815 815 return func
816 816 return decorator
817 817
818 818 @b2streamparamhandler('compression')
819 819 def processcompression(unbundler, param, value):
820 820 """read compression parameter and install payload decompression"""
821 821 if value not in util.compengines.supportedbundletypes:
822 822 raise error.BundleUnknownFeatureError(params=(param,),
823 823 values=(value,))
824 824 unbundler._compengine = util.compengines.forbundletype(value)
825 825 if value is not None:
826 826 unbundler._compressed = True
827 827
828 828 class bundlepart(object):
829 829 """A bundle2 part contains application level payload
830 830
831 831 The part `type` is used to route the part to the application level
832 832 handler.
833 833
834 834 The part payload is contained in ``part.data``. It could be raw bytes or a
835 835 generator of byte chunks.
836 836
837 837 You can add parameters to the part using the ``addparam`` method.
838 838 Parameters can be either mandatory (default) or advisory. Remote side
839 839 should be able to safely ignore the advisory ones.
840 840
841 841 Both data and parameters cannot be modified after the generation has begun.
842 842 """
843 843
844 844 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
845 845 data='', mandatory=True):
846 846 validateparttype(parttype)
847 847 self.id = None
848 848 self.type = parttype
849 849 self._data = data
850 850 self._mandatoryparams = list(mandatoryparams)
851 851 self._advisoryparams = list(advisoryparams)
852 852 # checking for duplicated entries
853 853 self._seenparams = set()
854 854 for pname, __ in self._mandatoryparams + self._advisoryparams:
855 855 if pname in self._seenparams:
856 856 raise RuntimeError('duplicated params: %s' % pname)
857 857 self._seenparams.add(pname)
858 858 # status of the part's generation:
859 859 # - None: not started,
860 860 # - False: currently generated,
861 861 # - True: generation done.
862 862 self._generated = None
863 863 self.mandatory = mandatory
864 864
865 865 def __repr__(self):
866 866 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
867 867 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
868 868 % (cls, id(self), self.id, self.type, self.mandatory))
869 869
870 870 def copy(self):
871 871 """return a copy of the part
872 872
873 873 The new part has the very same content but no partid assigned yet.
874 874 Parts with generated data cannot be copied."""
875 875 assert not util.safehasattr(self.data, 'next')
876 876 return self.__class__(self.type, self._mandatoryparams,
877 877 self._advisoryparams, self._data, self.mandatory)
878 878
879 879 # methods used to define the part content
880 880 @property
881 881 def data(self):
882 882 return self._data
883 883
884 884 @data.setter
885 885 def data(self, data):
886 886 if self._generated is not None:
887 887 raise error.ReadOnlyPartError('part is being generated')
888 888 self._data = data
889 889
890 890 @property
891 891 def mandatoryparams(self):
892 892 # make it an immutable tuple to force people through ``addparam``
893 893 return tuple(self._mandatoryparams)
894 894
895 895 @property
896 896 def advisoryparams(self):
897 897 # make it an immutable tuple to force people through ``addparam``
898 898 return tuple(self._advisoryparams)
899 899
900 900 def addparam(self, name, value='', mandatory=True):
901 901 if self._generated is not None:
902 902 raise error.ReadOnlyPartError('part is being generated')
903 903 if name in self._seenparams:
904 904 raise ValueError('duplicated params: %s' % name)
905 905 self._seenparams.add(name)
906 906 params = self._advisoryparams
907 907 if mandatory:
908 908 params = self._mandatoryparams
909 909 params.append((name, value))
910 910
911 911 # methods used to generate the bundle2 stream
912 912 def getchunks(self, ui):
913 913 if self._generated is not None:
914 914 raise RuntimeError('part can only be consumed once')
915 915 self._generated = False
916 916
917 917 if ui.debugflag:
918 918 msg = ['bundle2-output-part: "%s"' % self.type]
919 919 if not self.mandatory:
920 920 msg.append(' (advisory)')
921 921 nbmp = len(self.mandatoryparams)
922 922 nbap = len(self.advisoryparams)
923 923 if nbmp or nbap:
924 924 msg.append(' (params:')
925 925 if nbmp:
926 926 msg.append(' %i mandatory' % nbmp)
927 927 if nbap:
928 928 msg.append(' %i advisory' % nbap)
929 929 msg.append(')')
930 930 if not self.data:
931 931 msg.append(' empty payload')
932 932 elif util.safehasattr(self.data, 'next'):
933 933 msg.append(' streamed payload')
934 934 else:
935 935 msg.append(' %i bytes payload' % len(self.data))
936 936 msg.append('\n')
937 937 ui.debug(''.join(msg))
938 938
939 939 #### header
940 940 if self.mandatory:
941 941 parttype = self.type.upper()
942 942 else:
943 943 parttype = self.type.lower()
944 944 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
945 945 ## parttype
946 946 header = [_pack(_fparttypesize, len(parttype)),
947 947 parttype, _pack(_fpartid, self.id),
948 948 ]
949 949 ## parameters
950 950 # count
951 951 manpar = self.mandatoryparams
952 952 advpar = self.advisoryparams
953 953 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
954 954 # size
955 955 parsizes = []
956 956 for key, value in manpar:
957 957 parsizes.append(len(key))
958 958 parsizes.append(len(value))
959 959 for key, value in advpar:
960 960 parsizes.append(len(key))
961 961 parsizes.append(len(value))
962 962 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
963 963 header.append(paramsizes)
964 964 # key, value
965 965 for key, value in manpar:
966 966 header.append(key)
967 967 header.append(value)
968 968 for key, value in advpar:
969 969 header.append(key)
970 970 header.append(value)
971 971 ## finalize header
972 972 headerchunk = ''.join(header)
973 973 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
974 974 yield _pack(_fpartheadersize, len(headerchunk))
975 975 yield headerchunk
976 976 ## payload
977 977 try:
978 978 for chunk in self._payloadchunks():
979 979 outdebug(ui, 'payload chunk size: %i' % len(chunk))
980 980 yield _pack(_fpayloadsize, len(chunk))
981 981 yield chunk
982 982 except GeneratorExit:
983 983 # GeneratorExit means that nobody is listening for our
984 984 # results anyway, so just bail quickly rather than trying
985 985 # to produce an error part.
986 986 ui.debug('bundle2-generatorexit\n')
987 987 raise
988 988 except BaseException as exc:
989 989 # backup exception data for later
990 990 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
991 991 % exc)
992 992 exc_info = sys.exc_info()
993 993 msg = 'unexpected error: %s' % exc
994 994 interpart = bundlepart('error:abort', [('message', msg)],
995 995 mandatory=False)
996 996 interpart.id = 0
997 997 yield _pack(_fpayloadsize, -1)
998 998 for chunk in interpart.getchunks(ui=ui):
999 999 yield chunk
1000 1000 outdebug(ui, 'closing payload chunk')
1001 1001 # abort current part payload
1002 1002 yield _pack(_fpayloadsize, 0)
1003 1003 if pycompat.ispy3:
1004 1004 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1005 1005 else:
1006 1006 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1007 1007 # end of payload
1008 1008 outdebug(ui, 'closing payload chunk')
1009 1009 yield _pack(_fpayloadsize, 0)
1010 1010 self._generated = True
1011 1011
1012 1012 def _payloadchunks(self):
1013 1013 """yield chunks of the part payload
1014 1014
1015 1015 Exists to handle the different methods to provide data to a part."""
1016 1016 # we only support fixed size data now.
1017 1017 # This will be improved in the future.
1018 1018 if util.safehasattr(self.data, 'next'):
1019 1019 buff = util.chunkbuffer(self.data)
1020 1020 chunk = buff.read(preferedchunksize)
1021 1021 while chunk:
1022 1022 yield chunk
1023 1023 chunk = buff.read(preferedchunksize)
1024 1024 elif len(self.data):
1025 1025 yield self.data
1026 1026
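The framing that `getchunks` puts around a part can be sketched in isolation. The block below is an illustration rather than Mercurial's code: it is written for Python 3, the helper name `frame_part` is hypothetical, and the two format strings are assumptions (signed big-endian 32-bit integers, which packing `-1` for the interrupt flag requires), since `_fpartheadersize` and `_fpayloadsize` are defined outside this section.

```python
import struct

# Assumed format strings; the real constants are defined elsewhere in the file.
FPARTHEADERSIZE = '>i'   # size of the header chunk
FPAYLOADSIZE = '>i'      # size of each payload chunk (0 closes the payload)


def frame_part(headerchunk, payloadchunks):
    """Yield the framing for one part: sized header, sized chunks, then 0."""
    yield struct.pack(FPARTHEADERSIZE, len(headerchunk))
    yield headerchunk
    for chunk in payloadchunks:
        yield struct.pack(FPAYLOADSIZE, len(chunk))
        yield chunk
    # a zero-size chunk marks the end of the payload
    yield struct.pack(FPAYLOADSIZE, 0)


framed = b''.join(frame_part(b'HDR', [b'abc', b'de']))
```

A reader walks the same structure in reverse: read a size, read that many bytes, and stop at the first zero size.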
1027 1027
1028 1028 flaginterrupt = -1
1029 1029
1030 1030 class interrupthandler(unpackermixin):
1031 1031 """read one part and process it with restricted capability
1032 1032
1033 1033 This allows transmitting exceptions raised on the producer side during part
1034 1034 iteration while the consumer is reading a part.
1035 1035
1036 1036 Parts processed in this manner only have access to a ui object."""
1037 1037
1038 1038 def __init__(self, ui, fp):
1039 1039 super(interrupthandler, self).__init__(fp)
1040 1040 self.ui = ui
1041 1041
1042 1042 def _readpartheader(self):
1043 1043 """read a part header size and return the bytes blob
1044 1044
1045 1045 returns None if empty"""
1046 1046 headersize = self._unpack(_fpartheadersize)[0]
1047 1047 if headersize < 0:
1048 1048 raise error.BundleValueError('negative part header size: %i'
1049 1049 % headersize)
1050 1050 indebug(self.ui, 'part header size: %i\n' % headersize)
1051 1051 if headersize:
1052 1052 return self._readexact(headersize)
1053 1053 return None
1054 1054
1055 1055 def __call__(self):
1056 1056
1057 1057 self.ui.debug('bundle2-input-stream-interrupt:'
1058 1058 ' opening out of band context\n')
1059 1059 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1060 1060 headerblock = self._readpartheader()
1061 1061 if headerblock is None:
1062 1062 indebug(self.ui, 'no part found during interruption.')
1063 1063 return
1064 1064 part = unbundlepart(self.ui, headerblock, self._fp)
1065 1065 op = interruptoperation(self.ui)
1066 1066 _processpart(op, part)
1067 1067 self.ui.debug('bundle2-input-stream-interrupt:'
1068 1068 ' closing out of band context\n')
1069 1069
1070 1070 class interruptoperation(object):
1071 1071 """A limited operation to be used by part handlers during interruption
1072 1072
1073 1073 It only has access to a ui object.
1074 1074 """
1075 1075
1076 1076 def __init__(self, ui):
1077 1077 self.ui = ui
1078 1078 self.reply = None
1079 1079 self.captureoutput = False
1080 1080
1081 1081 @property
1082 1082 def repo(self):
1083 1083 raise RuntimeError('no repo access from stream interruption')
1084 1084
1085 1085 def gettransaction(self):
1086 1086 raise TransactionUnavailable('no repo access from stream interruption')
1087 1087
1088 1088 class unbundlepart(unpackermixin):
1089 1089 """a bundle part read from a bundle"""
1090 1090
1091 1091 def __init__(self, ui, header, fp):
1092 1092 super(unbundlepart, self).__init__(fp)
1093 1093 self.ui = ui
1094 1094 # unbundle state attr
1095 1095 self._headerdata = header
1096 1096 self._headeroffset = 0
1097 1097 self._initialized = False
1098 1098 self.consumed = False
1099 1099 # part data
1100 1100 self.id = None
1101 1101 self.type = None
1102 1102 self.mandatoryparams = None
1103 1103 self.advisoryparams = None
1104 1104 self.params = None
1105 1105 self.mandatorykeys = ()
1106 1106 self._payloadstream = None
1107 1107 self._readheader()
1108 1108 self._mandatory = None
1109 1109 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1110 1110 self._pos = 0
1111 1111
1112 1112 def _fromheader(self, size):
1113 1113 """return the next <size> bytes from the header"""
1114 1114 offset = self._headeroffset
1115 1115 data = self._headerdata[offset:(offset + size)]
1116 1116 self._headeroffset = offset + size
1117 1117 return data
1118 1118
1119 1119 def _unpackheader(self, format):
1120 1120 """read given format from header
1121 1121
1122 1122 This automatically computes the size of the format to read."""
1123 1123 data = self._fromheader(struct.calcsize(format))
1124 1124 return _unpack(format, data)
1125 1125
1126 1126 def _initparams(self, mandatoryparams, advisoryparams):
1127 1127 """internal function to setup all logic related parameters"""
1128 1128 # make it read only to prevent people touching it by mistake.
1129 1129 self.mandatoryparams = tuple(mandatoryparams)
1130 1130 self.advisoryparams = tuple(advisoryparams)
1131 1131 # user friendly UI
1132 1132 self.params = util.sortdict(self.mandatoryparams)
1133 1133 self.params.update(self.advisoryparams)
1134 1134 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1135 1135
1136 1136 def _payloadchunks(self, chunknum=0):
1137 1137 '''seek to specified chunk and start yielding data'''
1138 1138 if len(self._chunkindex) == 0:
1139 1139 assert chunknum == 0, 'Must start with chunk 0'
1140 1140 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1141 1141 else:
1142 1142 assert chunknum < len(self._chunkindex), \
1143 1143 'Unknown chunk %d' % chunknum
1144 1144 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1145 1145
1146 1146 pos = self._chunkindex[chunknum][0]
1147 1147 payloadsize = self._unpack(_fpayloadsize)[0]
1148 1148 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1149 1149 while payloadsize:
1150 1150 if payloadsize == flaginterrupt:
1151 1151 # interruption detection, the handler will now read a
1152 1152 # single part and process it.
1153 1153 interrupthandler(self.ui, self._fp)()
1154 1154 elif payloadsize < 0:
1155 1155 msg = 'negative payload chunk size: %i' % payloadsize
1156 1156 raise error.BundleValueError(msg)
1157 1157 else:
1158 1158 result = self._readexact(payloadsize)
1159 1159 chunknum += 1
1160 1160 pos += payloadsize
1161 1161 if chunknum == len(self._chunkindex):
1162 1162 self._chunkindex.append((pos,
1163 1163 super(unbundlepart, self).tell()))
1164 1164 yield result
1165 1165 payloadsize = self._unpack(_fpayloadsize)[0]
1166 1166 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1167 1167
1168 1168 def _findchunk(self, pos):
1169 1169 '''for a given payload position, return a chunk number and offset'''
1170 1170 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1171 1171 if ppos == pos:
1172 1172 return chunk, 0
1173 1173 elif ppos > pos:
1174 1174 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1175 1175 raise ValueError('Unknown chunk')
1176 1176
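The lookup done by `_findchunk` is easier to follow on concrete numbers. The sketch below restates it as a free function over a hand-written index; the name `findchunk` and the sample positions are hypothetical, chosen so that each chunk is 4096 payload bytes preceded by an assumed 4-byte size field in the file.

```python
def findchunk(chunkindex, pos):
    """Map a payload position to (chunk number, offset inside that chunk).

    chunkindex is a list of (payload position, file position) tuples, one
    per known chunk start, in increasing order.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')


index = [(0, 100), (4096, 4200), (8192, 8300)]
exact = findchunk(index, 4096)
inside = findchunk(index, 5000)
```

A position beyond the last indexed chunk start raises `ValueError` in this sketch, since no later entry exists to bound it.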
1177 1177 def _readheader(self):
1178 1178 """read the header and setup the object"""
1179 1179 typesize = self._unpackheader(_fparttypesize)[0]
1180 1180 self.type = self._fromheader(typesize)
1181 1181 indebug(self.ui, 'part type: "%s"' % self.type)
1182 1182 self.id = self._unpackheader(_fpartid)[0]
1183 1183 indebug(self.ui, 'part id: "%s"' % self.id)
1184 1184 # extract mandatory bit from type
1185 1185 self.mandatory = (self.type != self.type.lower())
1186 1186 self.type = self.type.lower()
1187 1187 ## reading parameters
1188 1188 # param count
1189 1189 mancount, advcount = self._unpackheader(_fpartparamcount)
1190 1190 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1191 1191 # param size
1192 1192 fparamsizes = _makefpartparamsizes(mancount + advcount)
1193 1193 paramsizes = self._unpackheader(fparamsizes)
1194 1194 # make it a list of (key size, value size) pairs again
1195 1195 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1196 1196 # split mandatory from advisory
1197 1197 mansizes = paramsizes[:mancount]
1198 1198 advsizes = paramsizes[mancount:]
1199 1199 # retrieve param value
1200 1200 manparams = []
1201 1201 for key, value in mansizes:
1202 1202 manparams.append((self._fromheader(key), self._fromheader(value)))
1203 1203 advparams = []
1204 1204 for key, value in advsizes:
1205 1205 advparams.append((self._fromheader(key), self._fromheader(value)))
1206 1206 self._initparams(manparams, advparams)
1207 1207 ## part payload
1208 1208 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1209 1209 # the header has been read; record it
1210 1210 self._initialized = True
1211 1211
1212 1212 def read(self, size=None):
1213 1213 """read payload data"""
1214 1214 if not self._initialized:
1215 1215 self._readheader()
1216 1216 if size is None:
1217 1217 data = self._payloadstream.read()
1218 1218 else:
1219 1219 data = self._payloadstream.read(size)
1220 1220 self._pos += len(data)
1221 1221 if size is None or len(data) < size:
1222 1222 if not self.consumed and self._pos:
1223 1223 self.ui.debug('bundle2-input-part: total payload size %i\n'
1224 1224 % self._pos)
1225 1225 self.consumed = True
1226 1226 return data
1227 1227
1228 1228 def tell(self):
1229 1229 return self._pos
1230 1230
1231 1231 def seek(self, offset, whence=0):
1232 1232 if whence == 0:
1233 1233 newpos = offset
1234 1234 elif whence == 1:
1235 1235 newpos = self._pos + offset
1236 1236 elif whence == 2:
1237 1237 if not self.consumed:
1238 1238 self.read()
1239 1239 newpos = self._chunkindex[-1][0] - offset
1240 1240 else:
1241 1241 raise ValueError('Unknown whence value: %r' % (whence,))
1242 1242
1243 1243 if newpos > self._chunkindex[-1][0] and not self.consumed:
1244 1244 self.read()
1245 1245 if not 0 <= newpos <= self._chunkindex[-1][0]:
1246 1246 raise ValueError('Offset out of range')
1247 1247
1248 1248 if self._pos != newpos:
1249 1249 chunk, internaloffset = self._findchunk(newpos)
1250 1250 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1251 1251 adjust = self.read(internaloffset)
1252 1252 if len(adjust) != internaloffset:
1253 1253 raise error.Abort(_('Seek failed\n'))
1254 1254 self._pos = newpos
1255 1255
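The header layout that `getchunks` writes and `_readheader` parses can be checked with a small round trip. This is a sketch under stated assumptions, not the module's implementation: it targets Python 3, the function names `packheader` and `unpackheader` are hypothetical, and the format strings (`'>B'` for the type size, `'>I'` for the part id, `'>BB'` for the parameter counts, one `'BB'` pair per parameter for the sizes) are assumed values for constants defined outside this section.

```python
import struct


def packheader(parttype, partid, manpar, advpar):
    """Build a header: type, id, param counts, param sizes, then params."""
    allparams = list(manpar) + list(advpar)
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    header.append(struct.pack('>' + 'BB' * len(allparams), *sizes))
    for key, value in allparams:
        header.extend((key, value))
    return b''.join(header)


def unpackheader(data):
    """Parse a header produced by packheader."""
    pos = [0]

    def take(size):
        chunk = data[pos[0]:pos[0] + size]
        pos[0] += size
        return chunk

    def unpack(fmt):
        return struct.unpack(fmt, take(struct.calcsize(fmt)))

    parttype = take(unpack('>B')[0])
    partid = unpack('>I')[0]
    mancount, advcount = unpack('>BB')
    flat = unpack('>' + 'BB' * (mancount + advcount))
    sizes = list(zip(flat[::2], flat[1::2]))
    params = [(take(ksize), take(vsize)) for ksize, vsize in sizes]
    # an upper-case type marks the part as mandatory
    mandatory = parttype != parttype.lower()
    return (parttype.lower(), partid, mandatory,
            params[:mancount], params[mancount:])


blob = packheader(b'CHANGEGROUP', 7,
                  [(b'version', b'02')], [(b'nbchanges', b'3')])
parsed = unpackheader(blob)
```

Wrapping `zip` in `list` keeps the size pairs sliceable on Python 3, where `zip` returns an iterator.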
1256 1256 # These are only the static capabilities.
1257 1257 # Check the 'getrepocaps' function for the rest.
1258 1258 capabilities = {'HG20': (),
1259 1259 'error': ('abort', 'unsupportedcontent', 'pushraced',
1260 1260 'pushkey'),
1261 1261 'listkeys': (),
1262 1262 'pushkey': (),
1263 1263 'digests': tuple(sorted(util.DIGESTS.keys())),
1264 1264 'remote-changegroup': ('http', 'https'),
1265 1265 'hgtagsfnodes': (),
1266 1266 }
1267 1267
1268 1268 def getrepocaps(repo, allowpushback=False):
1269 1269 """return the bundle2 capabilities for a given repo
1270 1270
1271 1271 Exists to allow extensions (like evolution) to mutate the capabilities.
1272 1272 """
1273 1273 caps = capabilities.copy()
1274 1274 caps['changegroup'] = tuple(sorted(
1275 1275 changegroup.supportedincomingversions(repo)))
1276 1276 if obsolete.isenabled(repo, obsolete.exchangeopt):
1277 1277 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1278 1278 caps['obsmarkers'] = supportedformat
1279 1279 if allowpushback:
1280 1280 caps['pushback'] = ()
1281 1281 return caps
1282 1282
1283 1283 def bundle2caps(remote):
1284 1284 """return the bundle capabilities of a peer as dict"""
1285 1285 raw = remote.capable('bundle2')
1286 1286 if not raw and raw != '':
1287 1287 return {}
1288 1288 capsblob = urlreq.unquote(remote.capable('bundle2'))
1289 1289 return decodecaps(capsblob)
1290 1290
1291 1291 def obsmarkersversion(caps):
1292 1292 """extract the list of supported obsmarkers versions from a bundle2caps dict
1293 1293 """
1294 1294 obscaps = caps.get('obsmarkers', ())
1295 1295 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1296 1296
1297 1297 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1298 1298 compopts=None):
1299 1299 """Write a bundle file and return its filename.
1300 1300
1301 1301 Existing files will not be overwritten.
1302 1302 If no filename is specified, a temporary file is created.
1303 1303 bz2 compression can be turned off.
1304 1304 The bundle file will be deleted in case of errors.
1305 1305 """
1306 1306
1307 1307 if bundletype == "HG20":
1308 1308 bundle = bundle20(ui)
1309 1309 bundle.setcompression(compression, compopts)
1310 1310 part = bundle.newpart('changegroup', data=cg.getchunks())
1311 1311 part.addparam('version', cg.version)
1312 1312 if 'clcount' in cg.extras:
1313 1313 part.addparam('nbchanges', str(cg.extras['clcount']),
1314 1314 mandatory=False)
1315 1315 chunkiter = bundle.getchunks()
1316 1316 else:
1317 1317 # compression argument is only for the bundle2 case
1318 1318 assert compression is None
1319 1319 if cg.version != '01':
1320 1320 raise error.Abort(_('old bundle types only support v1 '
1321 1321 'changegroups'))
1322 1322 header, comp = bundletypes[bundletype]
1323 1323 if comp not in util.compengines.supportedbundletypes:
1324 1324 raise error.Abort(_('unknown stream compression type: %s')
1325 1325 % comp)
1326 1326 compengine = util.compengines.forbundletype(comp)
1327 1327 def chunkiter():
1328 1328 yield header
1329 1329 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1330 1330 yield chunk
1331 1331 chunkiter = chunkiter()
1332 1332
1333 1333 # parse the changegroup data, otherwise we will block
1334 1334 # in case of sshrepo because we don't know the end of the stream
1335 1335 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1336 1336
1337 1337 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1338 1338 def handlechangegroup(op, inpart):
1339 1339 """apply a changegroup part on the repo
1340 1340
1341 1341 This is a very early implementation that will undergo massive rework before
1342 1342 being inflicted on any end user.
1343 1343 """
1344 1344 # Make sure we trigger a transaction creation
1345 1345 #
1346 1346 # The addchangegroup function will get a transaction object by itself, but
1347 1347 # we need to make sure we trigger the creation of a transaction object used
1348 1348 # for the whole processing scope.
1349 1349 op.gettransaction()
1350 1350 unpackerversion = inpart.params.get('version', '01')
1351 1351 # We should raise an appropriate exception here
1352 1352 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1353 1353 # the source and url passed here are overwritten by the one contained in
1354 1354 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1355 1355 nbchangesets = None
1356 1356 if 'nbchanges' in inpart.params:
1357 1357 nbchangesets = int(inpart.params.get('nbchanges'))
1358 1358 if ('treemanifest' in inpart.params and
1359 1359 'treemanifest' not in op.repo.requirements):
1360 1360 if len(op.repo.changelog) != 0:
1361 1361 raise error.Abort(_(
1362 1362 "bundle contains tree manifests, but local repo is "
1363 1363 "non-empty and does not use tree manifests"))
1364 1364 op.repo.requirements.add('treemanifest')
1365 1365 op.repo._applyopenerreqs()
1366 1366 op.repo._writerequirements()
1367 1367 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1368 1368 op.records.add('changegroup', {'return': ret})
1369 1369 if op.reply is not None:
1370 1370 # This is definitely not the final form of this
1371 1371 # return. But one need to start somewhere.
1372 1372 part = op.reply.newpart('reply:changegroup', mandatory=False)
1373 1373 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1374 1374 part.addparam('return', '%i' % ret, mandatory=False)
1375 1375 assert not inpart.read()
1376 1376
1377 1377 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1378 1378 ['digest:%s' % k for k in util.DIGESTS.keys()])
1379 1379 @parthandler('remote-changegroup', _remotechangegroupparams)
1380 1380 def handleremotechangegroup(op, inpart):
1381 1381 """apply a bundle10 on the repo, given a url and validation information
1382 1382
1383 1383 All the information about the remote bundle to import is given as
1384 1384 parameters. The parameters include:
1385 1385 - url: the url to the bundle10.
1386 1386 - size: the bundle10 file size. It is used to validate that what was
1387 1387 retrieved by the client matches the server's knowledge about the bundle.
1388 1388 - digests: a space separated list of the digest types provided as
1389 1389 parameters.
1390 1390 - digest:<digest-type>: the hexadecimal representation of the digest with
1391 1391 that name. Like the size, it is used to validate that what was retrieved
1392 1392 by the client matches what the server knows about the bundle.
1393 1393
1394 1394 When multiple digest types are given, all of them are checked.
1395 1395 """
1396 1396 try:
1397 1397 raw_url = inpart.params['url']
1398 1398 except KeyError:
1399 1399 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1400 1400 parsed_url = util.url(raw_url)
1401 1401 if parsed_url.scheme not in capabilities['remote-changegroup']:
1402 1402 raise error.Abort(_('remote-changegroup does not support %s urls') %
1403 1403 parsed_url.scheme)
1404 1404
1405 1405 try:
1406 1406 size = int(inpart.params['size'])
1407 1407 except ValueError:
1408 1408 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1409 1409 % 'size')
1410 1410 except KeyError:
1411 1411 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1412 1412
1413 1413 digests = {}
1414 1414 for typ in inpart.params.get('digests', '').split():
1415 1415 param = 'digest:%s' % typ
1416 1416 try:
1417 1417 value = inpart.params[param]
1418 1418 except KeyError:
1419 1419 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1420 1420 param)
1421 1421 digests[typ] = value
1422 1422
1423 1423 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1424 1424
1425 1425 # Make sure we trigger a transaction creation
1426 1426 #
1427 1427 # The addchangegroup function will get a transaction object by itself, but
1428 1428 # we need to make sure we trigger the creation of a transaction object used
1429 1429 # for the whole processing scope.
1430 1430 op.gettransaction()
1431 1431 from . import exchange
1432 1432 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1433 1433 if not isinstance(cg, changegroup.cg1unpacker):
1434 1434 raise error.Abort(_('%s: not a bundle version 1.0') %
1435 1435 util.hidepassword(raw_url))
1436 1436 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1437 1437 op.records.add('changegroup', {'return': ret})
1438 1438 if op.reply is not None:
1439 1439 # This is definitely not the final form of this
1440 1440 # return. But one needs to start somewhere.
1441 1441 part = op.reply.newpart('reply:changegroup')
1442 1442 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1443 1443 part.addparam('return', '%i' % ret, mandatory=False)
1444 1444 try:
1445 1445 real_part.validate()
1446 1446 except error.Abort as e:
1447 1447 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1448 1448 (util.hidepassword(raw_url), str(e)))
1449 1449 assert not inpart.read()
1450 1450
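The way the `digests` parameter drives the lookup of the `digest:<digest-type>` parameters can be shown on a plain dict. This is a hedged sketch: `collectdigests` is a hypothetical helper, the parameter values are made up, and it raises `KeyError` where the handler raises `error.Abort`.

```python
def collectdigests(params):
    """Return {digest type: hex digest} for every type listed in 'digests'."""
    digests = {}
    for typ in params.get('digests', '').split():
        param = 'digest:%s' % typ
        if param not in params:
            raise KeyError('missing "%s" param' % param)
        digests[typ] = params[param]
    return digests


# made-up parameter values, for illustration only
sample = {
    'url': 'https://example.com/bundle.hg',
    'size': '12',
    'digests': 'md5 sha1',
    'digest:md5': 'aa',
    'digest:sha1': 'bb',
}
found = collectdigests(sample)
```

Every type named in `digests` must have a matching parameter; a part that names no digest types yields an empty dict, so only the size is checked.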
1451 1451 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1452 1452 def handlereplychangegroup(op, inpart):
1453 1453 ret = int(inpart.params['return'])
1454 1454 replyto = int(inpart.params['in-reply-to'])
1455 1455 op.records.add('changegroup', {'return': ret}, replyto)
1456 1456
1457 1457 @parthandler('check:heads')
1458 1458 def handlecheckheads(op, inpart):
1459 1459 """check that the heads of the repo did not change
1460 1460
1461 1461 This is used to detect a push race when using unbundle.
1462 1462 This replaces the "heads" argument of unbundle."""
1463 1463 h = inpart.read(20)
1464 1464 heads = []
1465 1465 while len(h) == 20:
1466 1466 heads.append(h)
1467 1467 h = inpart.read(20)
1468 1468 assert not h
1469 1469 # Trigger a transaction so that we are guaranteed to have the lock now.
1470 1470 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1471 1471 op.gettransaction()
1472 1472 if sorted(heads) != sorted(op.repo.heads()):
1473 1473 raise error.PushRaced('repository changed while pushing - '
1474 1474 'please try again')
1475 1475
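The `check:heads` payload is a plain concatenation of 20-byte nodes, so reading it amounts to consuming fixed-size records until the stream runs dry. A minimal Python 3 sketch, with `io.BytesIO` standing in for the part object and a hypothetical `readheads` helper that raises `ValueError` on a trailing partial record instead of asserting:

```python
import io


def readheads(part, nodesize=20):
    """Read fixed-size node records from a file-like object."""
    heads = []
    h = part.read(nodesize)
    while len(h) == nodesize:
        heads.append(h)
        h = part.read(nodesize)
    if h:
        # leftover bytes mean the payload was not a whole number of nodes
        raise ValueError('truncated node: %d bytes' % len(h))
    return heads


payload = io.BytesIO(b'\x11' * 20 + b'\x22' * 20)
heads = readheads(payload)
```

Sorting both lists before comparing, as the handler does, makes the check independent of the order in which heads were sent.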
1476 1476 @parthandler('output')
1477 1477 def handleoutput(op, inpart):
1478 1478 """forward output captured on the server to the client"""
1479 1479 for line in inpart.read().splitlines():
1480 1480 op.ui.status(_('remote: %s\n') % line)
1481 1481
1482 1482 @parthandler('replycaps')
1483 1483 def handlereplycaps(op, inpart):
1484 1484 """Notify that a reply bundle should be created
1485 1485
1486 1486 The payload contains the capabilities information for the reply"""
1487 1487 caps = decodecaps(inpart.read())
1488 1488 if op.reply is None:
1489 1489 op.reply = bundle20(op.ui, caps)
1490 1490
1491 1491 class AbortFromPart(error.Abort):
1492 1492 """Sub-class of Abort that denotes an error from a bundle2 part."""
1493 1493
1494 1494 @parthandler('error:abort', ('message', 'hint'))
1495 1495 def handleerrorabort(op, inpart):
1496 1496 """Used to transmit abort error over the wire"""
1497 1497 raise AbortFromPart(inpart.params['message'],
1498 1498 hint=inpart.params.get('hint'))
1499 1499
1500 1500 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1501 1501 'in-reply-to'))
1502 1502 def handleerrorpushkey(op, inpart):
1503 1503 """Used to transmit failure of a mandatory pushkey over the wire"""
1504 1504 kwargs = {}
1505 1505 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1506 1506 value = inpart.params.get(name)
1507 1507 if value is not None:
1508 1508 kwargs[name] = value
1509 1509 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1510 1510
1511 1511 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1512 1512 def handleerrorunsupportedcontent(op, inpart):
1513 1513 """Used to transmit unknown content error over the wire"""
1514 1514 kwargs = {}
1515 1515 parttype = inpart.params.get('parttype')
1516 1516 if parttype is not None:
1517 1517 kwargs['parttype'] = parttype
1518 1518 params = inpart.params.get('params')
1519 1519 if params is not None:
1520 1520 kwargs['params'] = params.split('\0')
1521 1521
1522 1522 raise error.BundleUnknownFeatureError(**kwargs)
1523 1523
1524 1524 @parthandler('error:pushraced', ('message',))
1525 1525 def handleerrorpushraced(op, inpart):
1526 1526 """Used to transmit push race error over the wire"""
1527 1527 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1528 1528
1529 1529 @parthandler('listkeys', ('namespace',))
1530 1530 def handlelistkeys(op, inpart):
1531 1531 """retrieve pushkey namespace content stored in a bundle2"""
1532 1532 namespace = inpart.params['namespace']
1533 1533 r = pushkey.decodekeys(inpart.read())
1534 1534 op.records.add('listkeys', (namespace, r))
1535 1535
1536 1536 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1537 1537 def handlepushkey(op, inpart):
1538 1538 """process a pushkey request"""
1539 1539 dec = pushkey.decode
1540 1540 namespace = dec(inpart.params['namespace'])
1541 1541 key = dec(inpart.params['key'])
1542 1542 old = dec(inpart.params['old'])
1543 1543 new = dec(inpart.params['new'])
1544 1544 # Grab the transaction to ensure that we have the lock before performing the
1545 1545 # pushkey.
1546 1546 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1547 1547 op.gettransaction()
1548 1548 ret = op.repo.pushkey(namespace, key, old, new)
1549 1549 record = {'namespace': namespace,
1550 1550 'key': key,
1551 1551 'old': old,
1552 1552 'new': new}
1553 1553 op.records.add('pushkey', record)
1554 1554 if op.reply is not None:
1555 1555 rpart = op.reply.newpart('reply:pushkey')
1556 1556 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1557 1557 rpart.addparam('return', '%i' % ret, mandatory=False)
1558 1558 if inpart.mandatory and not ret:
1559 1559 kwargs = {}
1560 1560 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1561 1561 if key in inpart.params:
1562 1562 kwargs[key] = inpart.params[key]
1563 1563 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1564 1564
1565 1565 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1566 1566 def handlepushkeyreply(op, inpart):
1567 1567 """retrieve the result of a pushkey request"""
1568 1568 ret = int(inpart.params['return'])
1569 1569 partid = int(inpart.params['in-reply-to'])
1570 1570 op.records.add('pushkey', {'return': ret}, partid)
1571 1571
1572 1572 @parthandler('obsmarkers')
1573 1573 def handleobsmarker(op, inpart):
1574 1574 """add a stream of obsmarkers to the repo"""
1575 1575 tr = op.gettransaction()
1576 1576 markerdata = inpart.read()
1577 1577 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1578 1578 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1579 1579 % len(markerdata))
1580 1580 # The mergemarkers call will crash if marker creation is not enabled.
1581 1581 # we want to avoid this if the part is advisory.
1582 1582 if not inpart.mandatory and op.repo.obsstore.readonly:
1583 1583 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1584 1584 return
1585 1585 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1586 1586 if new:
1587 1587 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1588 1588 op.records.add('obsmarkers', {'new': new})
1589 1589 if op.reply is not None:
1590 1590 rpart = op.reply.newpart('reply:obsmarkers')
1591 1591 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1592 1592 rpart.addparam('new', '%i' % new, mandatory=False)
1593 1593
1594 1594
1595 1595 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1596 1596 def handleobsmarkerreply(op, inpart):
1597 1597 """retrieve the result of an obsmarkers request"""
1598 1598 ret = int(inpart.params['new'])
1599 1599 partid = int(inpart.params['in-reply-to'])
1600 1600 op.records.add('obsmarkers', {'new': ret}, partid)
1601 1601
1602 1602 @parthandler('hgtagsfnodes')
1603 1603 def handlehgtagsfnodes(op, inpart):
1604 1604 """Applies .hgtags fnodes cache entries to the local repo.
1605 1605
1606 1606 Payload is pairs of 20 byte changeset nodes and filenodes.
1607 1607 """
1608 1608 # Grab the transaction so we ensure that we have the lock at this point.
1609 1609 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1610 1610 op.gettransaction()
1611 1611 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1612 1612
1613 1613 count = 0
1614 1614 while True:
1615 1615 node = inpart.read(20)
1616 1616 fnode = inpart.read(20)
1617 1617 if len(node) < 20 or len(fnode) < 20:
1618 1618 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1619 1619 break
1620 1620 cache.setfnode(node, fnode)
1621 1621 count += 1
1622 1622
1623 1623 cache.write()
1624 1624 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)