bundle2: use ProgrammingError
Jun Wu
r31647:4dbef666 default
@@ -1,1626 +1,1626 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If that first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
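The parameter encoding described above can be sketched in a few lines. This is an illustrative standalone helper with hypothetical names, using Python 3's `urllib.parse` where the real module goes through Mercurial's `urlreq` wrapper:

```python
from urllib.parse import quote

def encodestreamparams(params):
    """Encode stream level parameters: a space-separated list of
    urlquoted names, using `<name>=<value>` for valued parameters."""
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('parameter name must start with a letter: %r' % name)
        chunk = quote(name, safe='')
        if value is not None:
            chunk = '%s=%s' % (chunk, quote(value, safe=''))
        blocks.append(chunk)
    return ' '.join(blocks)

def ismandatory(name):
    # a capital first letter marks the parameter as mandatory
    return name[0].isupper()
```

The case-encoded mandatoriness keeps the header self-describing without any extra flag bytes.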
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application-level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application-level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
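The parameter layout above can be packed with a short helper. This is a sketch, not the module's API; it assumes (as the one-byte size fields require) that each key and value fits in 255 bytes:

```python
import struct

def packpartparams(mandatory, advisory):
    """Pack part parameters as
    <mandatory-count><advisory-count><param-sizes><param-data>."""
    allparams = list(mandatory) + list(advisory)  # mandatory comes first
    counts = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = b''.join(struct.pack('>BB', len(k), len(v))
                     for k, v in allparams)
    data = b''.join(k + v for k, v in allparams)
    return counts + sizes + data
```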
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
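Reading the payload framing described above could look like the following minimal sketch (hypothetical helper name; it rejects the negative-size special cases since no such processing exists yet):

```python
import io
import struct

def iterpayloadchunks(fp):
    """Yield each `chunkdata` from a `<chunksize><chunkdata>` series,
    stopping at the zero-size chunk that concludes the payload."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('special chunk processing not implemented')
        yield fp.read(size)

# a one-chunk payload followed by the zero-size terminator
payload = struct.pack('>i', 3) + b'abc' + struct.pack('>i', 0)
chunks = list(iterpayloadchunks(io.BytesIO(payload)))
```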
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case-insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character, it is considered mandatory. When no handler
140 140 is known for a mandatory part, the process is aborted and an exception is
141 141 raised. If the part is advisory and no handler is known, the part is ignored.
142 142 When the process is aborted, the full bundle is still read from the stream to
143 143 keep the channel usable. But none of the parts read after an abort are
144 144 processed. In the future, dropping the stream may become an option for
145 145 channels we do not care to preserve.
146 146 """
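The dispatch rules in the last paragraph of the docstring (case-insensitive matching, case-encoded mandatoriness) boil down to a few lines. A standalone sketch with a hypothetical handler table:

```python
handlers = {}  # lowercased part type -> handler function

def findhandler(parttype):
    """Return the handler for a part, None to ignore an unknown advisory
    part, or raise for an unsupported mandatory part."""
    handler = handlers.get(parttype.lower())  # matching is case-insensitive
    if handler is None:
        if parttype != parttype.lower():  # any uppercase => mandatory
            raise KeyError('unsupported mandatory part: %s' % parttype)
        return None  # unknown advisory parts are ignored
    return handler

handlers['changegroup'] = lambda part: 'applied'
```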
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
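The dynamically built format can then unpack all of a part's size pairs in a single `struct` call; an illustrative use of the same idea outside the module:

```python
import struct

def makefpartparamsizes(nbparams):
    # one (key-size, value-size) byte pair per parameter
    return '>' + ('BB' * nbparams)

# with two parameters, four size bytes come out as four integers
fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, b'\x01\x05\x03\x02')
```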
207 207
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that registers a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during an unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 __bool__ = __nonzero__
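A usage sketch of the records API documented above, re-implemented standalone so it runs outside Mercurial (the real class also tracks per-part replies):

```python
class Records:
    """Minimal stand-in for unbundlerecords: per-category lists plus a
    chronological sequence of (category, entry) pairs."""
    def __init__(self):
        self._categories = {}
        self._sequences = []
    def add(self, category, entry):
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))
    def __iter__(self):
        return iter(self._sequences)
    def __len__(self):
        return len(self._sequences)

records = Records()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'namespace': 'phases'})
```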
275 275
276 276 class bundleoperation(object):
277 277 """an object that represents a single bundling process
278 278
279 279 Its purpose is to carry unbundle-related objects and states.
280 280
281 281 A new object should be created at the beginning of each bundle processing.
282 282 The object is to be returned by the processing function.
283 283
284 284 The object has very little content now; it will ultimately contain:
285 285 * an access to the repo the bundle is applied to,
286 286 * a ui object,
287 287 * a way to retrieve a transaction to add changes to the repo,
288 288 * a way to record the result of processing each part,
289 289 * a way to construct a bundle response when applicable.
290 290 """
291 291
292 292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 293 self.repo = repo
294 294 self.ui = repo.ui
295 295 self.records = unbundlerecords()
296 296 self.gettransaction = transactiongetter
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299
300 300 class TransactionUnavailable(RuntimeError):
301 301 pass
302 302
303 303 def _notransaction():
304 304 """default method to get a transaction while processing a bundle
305 305
306 306 Raise an exception to highlight the fact that no transaction was expected
307 307 to be created"""
308 308 raise TransactionUnavailable()
309 309
310 310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 312 tr.hookargs['bundle2'] = '1'
313 313 if source is not None and 'source' not in tr.hookargs:
314 314 tr.hookargs['source'] = source
315 315 if url is not None and 'url' not in tr.hookargs:
316 316 tr.hookargs['url'] = url
317 317 return processbundle(repo, unbundler, lambda: tr, op=op)
318 318
319 319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 320 """This function processes a bundle, applying effects to/from a repo
321 321
322 322 It iterates over each part then searches for and uses the proper handling
323 323 code to process the part. Parts are processed in order.
324 324
325 325 An unknown mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 356 except Exception as exc:
357 357 for nbpart, part in iterparts:
358 358 # consume the bundle content
359 359 part.seek(0, 2)
360 360 # Small hack to let caller code distinguish exceptions from bundle2
361 361 # processing from processing the old format. This is mostly
362 362 # needed to handle different return codes to unbundle according to the
363 363 # type of bundle. We should probably clean up or drop this return code
364 364 # craziness in a future version.
365 365 exc.duringunbundle2 = True
366 366 salvaged = []
367 367 replycaps = None
368 368 if op.reply is not None:
369 369 salvaged = op.reply.salvageoutput()
370 370 replycaps = op.reply.capabilities
371 371 exc._replycaps = replycaps
372 372 exc._bundle2salvagedoutput = salvaged
373 373 raise
374 374 finally:
375 375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 376
377 377 return op
378 378
379 379 def _processpart(op, part):
380 380 """process a single part from a bundle
381 381
382 382 The part is guaranteed to have been fully consumed when the function exits
383 383 (even if an exception is raised)."""
384 384 status = 'unknown' # used by debug output
385 385 hardabort = False
386 386 try:
387 387 try:
388 388 handler = parthandlermapping.get(part.type)
389 389 if handler is None:
390 390 status = 'unsupported-type'
391 391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 393 unknownparams = part.mandatorykeys - handler.params
394 394 if unknownparams:
395 395 unknownparams = list(unknownparams)
396 396 unknownparams.sort()
397 397 status = 'unsupported-params (%s)' % unknownparams
398 398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 399 params=unknownparams)
400 400 status = 'supported'
401 401 except error.BundleUnknownFeatureError as exc:
402 402 if part.mandatory: # mandatory parts
403 403 raise
404 404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 405 return # skip to part processing
406 406 finally:
407 407 if op.ui.debugflag:
408 408 msg = ['bundle2-input-part: "%s"' % part.type]
409 409 if not part.mandatory:
410 410 msg.append(' (advisory)')
411 411 nbmp = len(part.mandatorykeys)
412 412 nbap = len(part.params) - nbmp
413 413 if nbmp or nbap:
414 414 msg.append(' (params:')
415 415 if nbmp:
416 416 msg.append(' %i mandatory' % nbmp)
417 417 if nbap:
418 418 msg.append(' %i advisory' % nbap)
419 419 msg.append(')')
420 420 msg.append(' %s\n' % status)
421 421 op.ui.debug(''.join(msg))
422 422
423 423 # handler is called outside the above try block so that we don't
424 424 # risk catching KeyErrors from anything other than the
425 425 # parthandlermapping lookup (any KeyError raised by handler()
426 426 # itself represents a defect of a different variety).
427 427 output = None
428 428 if op.captureoutput and op.reply is not None:
429 429 op.ui.pushbuffer(error=True, subproc=True)
430 430 output = ''
431 431 try:
432 432 handler(op, part)
433 433 finally:
434 434 if output is not None:
435 435 output = op.ui.popbuffer()
436 436 if output:
437 437 outpart = op.reply.newpart('output', data=output,
438 438 mandatory=False)
439 439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 440 # If exiting or interrupted, do not attempt to seek the stream in the
441 441 # finally block below. This makes abort faster.
442 442 except (SystemExit, KeyboardInterrupt):
443 443 hardabort = True
444 444 raise
445 445 finally:
446 446 # consume the part content to not corrupt the stream.
447 447 if not hardabort:
448 448 part.seek(0, 2)
449 449
450 450
451 451 def decodecaps(blob):
452 452 """decode a bundle2 caps bytes blob into a dictionary
453 453
454 454 The blob is a list of capabilities (one per line)
455 455 Capabilities may have values using a line of the form::
456 456
457 457 capability=value1,value2,value3
458 458
459 459 The values are always a list."""
460 460 caps = {}
461 461 for line in blob.splitlines():
462 462 if not line:
463 463 continue
464 464 if '=' not in line:
465 465 key, vals = line, ()
466 466 else:
467 467 key, vals = line.split('=', 1)
468 468 vals = vals.split(',')
469 469 key = urlreq.unquote(key)
470 470 vals = [urlreq.unquote(v) for v in vals]
471 471 caps[key] = vals
472 472 return caps
473 473
474 474 def encodecaps(caps):
475 475 """encode a bundle2 caps dictionary into a bytes blob"""
476 476 chunks = []
477 477 for ca in sorted(caps):
478 478 vals = caps[ca]
479 479 ca = urlreq.quote(ca)
480 480 vals = [urlreq.quote(v) for v in vals]
481 481 if vals:
482 482 ca = "%s=%s" % (ca, ','.join(vals))
483 483 chunks.append(ca)
484 484 return '\n'.join(chunks)
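A round-trip through the two capability helpers might look like the following. This is a Python 3 re-rendering for illustration; the real functions operate through Mercurial's `urlreq` wrappers:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """encode a caps dictionary into a newline-separated blob (sorted)"""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode the blob back into a dictionary of value lists"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
```

Note that, as the docstring says, the decoded values are always a list, even for capabilities written without an `=`.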
485 485
486 486 bundletypes = {
487 487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 488 # since the unification ssh accepts a header but there
489 489 # is no capability signaling it.
490 490 "HG20": (), # special-cased below
491 491 "HG10UN": ("HG10UN", 'UN'),
492 492 "HG10BZ": ("HG10", 'BZ'),
493 493 "HG10GZ": ("HG10GZ", 'GZ'),
494 494 }
495 495
496 496 # hgweb uses this list to communicate its preferred type
497 497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498 498
499 499 class bundle20(object):
500 500 """represent an outgoing bundle2 container
501 501
502 502 Use the `addparam` method to add stream level parameters, and `newpart` to
503 503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 504 data that compose the bundle2 container."""
505 505
506 506 _magicstring = 'HG20'
507 507
508 508 def __init__(self, ui, capabilities=()):
509 509 self.ui = ui
510 510 self._params = []
511 511 self._parts = []
512 512 self.capabilities = dict(capabilities)
513 513 self._compengine = util.compengines.forbundletype('UN')
514 514 self._compopts = None
515 515
516 516 def setcompression(self, alg, compopts=None):
517 517 """setup core part compression to <alg>"""
518 518 if alg in (None, 'UN'):
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compengine = util.compengines.forbundletype(alg)
523 523 self._compopts = compopts
524 524
525 525 @property
526 526 def nbparts(self):
527 527 """total number of parts added to the bundler"""
528 528 return len(self._parts)
529 529
530 530 # methods used to define the bundle2 content
531 531 def addparam(self, name, value=None):
532 532 """add a stream level parameter"""
533 533 if not name:
534 534 raise ValueError('empty parameter name')
535 535 if name[0] not in string.letters:
536 536 raise ValueError('non letter first character: %r' % name)
537 537 self._params.append((name, value))
538 538
539 539 def addpart(self, part):
540 540 """add a new part to the bundle2 container
541 541
542 542 Parts contains the actual applicative payload."""
543 543 assert part.id is None
544 544 part.id = len(self._parts) # very cheap counter
545 545 self._parts.append(part)
546 546
547 547 def newpart(self, typeid, *args, **kwargs):
548 548 """create a new part and add it to the containers
549 549
550 550 The part is directly added to the container. For now, this means
551 551 that any failure to properly initialize the part after calling
552 552 ``newpart`` should result in a failure of the whole bundling process.
553 553
554 554 You can still fall back to manually creating and adding the part if you
555 555 need better control.
556 556 part = bundlepart(typeid, *args, **kwargs)
557 557 self.addpart(part)
558 558 return part
559 559
560 560 # methods used to generate the bundle2 stream
561 561 def getchunks(self):
562 562 if self.ui.debugflag:
563 563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 564 if self._params:
565 565 msg.append(' (%i params)' % len(self._params))
566 566 msg.append(' %i parts total\n' % len(self._parts))
567 567 self.ui.debug(''.join(msg))
568 568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 569 yield self._magicstring
570 570 param = self._paramchunk()
571 571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 572 yield _pack(_fstreamparamsize, len(param))
573 573 if param:
574 574 yield param
575 575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 576 self._compopts):
577 577 yield chunk
578 578
579 579 def _paramchunk(self):
580 580 """return an encoded version of all stream parameters"""
581 581 blocks = []
582 582 for par, value in self._params:
583 583 par = urlreq.quote(par)
584 584 if value is not None:
585 585 value = urlreq.quote(value)
586 586 par = '%s=%s' % (par, value)
587 587 blocks.append(par)
588 588 return ' '.join(blocks)
589 589
590 590 def _getcorechunk(self):
591 591 """yield chunks for the core part of the bundle
592 592
593 593 (all but headers and parameters)"""
594 594 outdebug(self.ui, 'start of parts')
595 595 for part in self._parts:
596 596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 597 for chunk in part.getchunks(ui=self.ui):
598 598 yield chunk
599 599 outdebug(self.ui, 'end of bundle')
600 600 yield _pack(_fpartheadersize, 0)
601 601
602 602
603 603 def salvageoutput(self):
604 604 """return a list with a copy of all output parts in the bundle
605 605
606 606 This is meant to be used during error handling to make sure we preserve
607 607 server output"""
608 608 salvaged = []
609 609 for part in self._parts:
610 610 if part.type.startswith('output'):
611 611 salvaged.append(part.copy())
612 612 return salvaged
613 613
614 614
615 615 class unpackermixin(object):
616 616 """A mixin to extract bytes and struct data from a stream"""
617 617
618 618 def __init__(self, fp):
619 619 self._fp = fp
620 620 self._seekable = (util.safehasattr(fp, 'seek') and
621 621 util.safehasattr(fp, 'tell'))
622 622
623 623 def _unpack(self, format):
624 624 """unpack this struct format from the stream"""
625 625 data = self._readexact(struct.calcsize(format))
626 626 return _unpack(format, data)
627 627
628 628 def _readexact(self, size):
629 629 """read exactly <size> bytes from the stream"""
630 630 return changegroup.readexactly(self._fp, size)
631 631
632 632 def seek(self, offset, whence=0):
633 633 """move the underlying file pointer"""
634 634 if self._seekable:
635 635 return self._fp.seek(offset, whence)
636 636 else:
637 637 raise NotImplementedError(_('File pointer is not seekable'))
638 638
639 639 def tell(self):
640 640 """return the file offset, or None if file is not seekable"""
641 641 if self._seekable:
642 642 try:
643 643 return self._fp.tell()
644 644 except IOError as e:
645 645 if e.errno == errno.ESPIPE:
646 646 self._seekable = False
647 647 else:
648 648 raise
649 649 return None
650 650
651 651 def close(self):
652 652 """close underlying file"""
653 653 if util.safehasattr(self._fp, 'close'):
654 654 return self._fp.close()
655 655
656 656 def getunbundler(ui, fp, magicstring=None):
657 657 """return a valid unbundler object for a given magicstring"""
658 658 if magicstring is None:
659 659 magicstring = changegroup.readexactly(fp, 4)
660 660 magic, version = magicstring[0:2], magicstring[2:4]
661 661 if magic != 'HG':
662 662 raise error.Abort(_('not a Mercurial bundle'))
663 663 unbundlerclass = formatmap.get(version)
664 664 if unbundlerclass is None:
665 665 raise error.Abort(_('unknown bundle version %s') % version)
666 666 unbundler = unbundlerclass(ui, fp)
667 667 indebug(ui, 'start processing of %s stream' % magicstring)
668 668 return unbundler
669 669
670 670 class unbundle20(unpackermixin):
671 671 """interpret a bundle2 stream
672 672
673 673 This class is fed with a binary stream and yields parts through its
674 674 `iterparts` method."""
675 675
676 676 _magicstring = 'HG20'
677 677
678 678 def __init__(self, ui, fp):
679 679 """If header is specified, we do not read it out of the stream."""
680 680 self.ui = ui
681 681 self._compengine = util.compengines.forbundletype('UN')
682 682 self._compressed = None
683 683 super(unbundle20, self).__init__(fp)
684 684
685 685 @util.propertycache
686 686 def params(self):
687 687 """dictionary of stream level parameters"""
688 688 indebug(self.ui, 'reading bundle2 stream parameters')
689 689 params = {}
690 690 paramssize = self._unpack(_fstreamparamsize)[0]
691 691 if paramssize < 0:
692 692 raise error.BundleValueError('negative bundle param size: %i'
693 693 % paramssize)
694 694 if paramssize:
695 695 params = self._readexact(paramssize)
696 696 params = self._processallparams(params)
697 697 return params
698 698
699 699 def _processallparams(self, paramsblock):
700 700 """process all stream level parameters and return them as a dict"""
701 701 params = util.sortdict()
702 702 for p in paramsblock.split(' '):
703 703 p = p.split('=', 1)
704 704 p = [urlreq.unquote(i) for i in p]
705 705 if len(p) < 2:
706 706 p.append(None)
707 707 self._processparam(*p)
708 708 params[p[0]] = p[1]
709 709 return params
710 710
711 711
712 712 def _processparam(self, name, value):
713 713 """process a parameter, applying its effect if needed
714 714
715 715 Parameters starting with a lower case letter are advisory and will be
716 716 ignored when unknown. Those starting with an upper case letter are
717 717 mandatory, and this function will raise when they are unknown.
718 718
719 719 Note: no options are currently supported. Any input will either be
720 720 ignored or fail.
721 721 """
722 722 if not name:
723 723 raise ValueError('empty parameter name')
724 724 if name[0] not in string.letters:
725 725 raise ValueError('non letter first character: %r' % name)
726 726 try:
727 727 handler = b2streamparamsmap[name.lower()]
728 728 except KeyError:
729 729 if name[0].islower():
730 730 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 731 else:
732 732 raise error.BundleUnknownFeatureError(params=(name,))
733 733 else:
734 734 handler(self, name, value)
735 735
736 736 def _forwardchunks(self):
737 737 """utility to transfer a bundle2 as binary
738 738
739 739 This is made necessary by the fact that the 'getbundle' command over 'ssh'
740 740 has no way to know when the reply ends, relying on the bundle being
741 741 interpreted to know its end. This is terrible and we are sorry, but we
742 742 needed to move forward to get general delta enabled.
743 743 """
744 744 yield self._magicstring
745 745 assert 'params' not in vars(self)
746 746 paramssize = self._unpack(_fstreamparamsize)[0]
747 747 if paramssize < 0:
748 748 raise error.BundleValueError('negative bundle param size: %i'
749 749 % paramssize)
750 750 yield _pack(_fstreamparamsize, paramssize)
751 751 if paramssize:
752 752 params = self._readexact(paramssize)
753 753 self._processallparams(params)
754 754 yield params
755 755 assert self._compengine.bundletype == 'UN'
756 756 # From there, payload might need to be decompressed
757 757 self._fp = self._compengine.decompressorreader(self._fp)
758 758 emptycount = 0
759 759 while emptycount < 2:
760 760 # so we can brainlessly loop
761 761 assert _fpartheadersize == _fpayloadsize
762 762 size = self._unpack(_fpartheadersize)[0]
763 763 yield _pack(_fpartheadersize, size)
764 764 if size:
765 765 emptycount = 0
766 766 else:
767 767 emptycount += 1
768 768 continue
769 769 if size == flaginterrupt:
770 770 continue
771 771 elif size < 0:
772 772 raise error.BundleValueError('negative chunk size: %i' % size)
773 773 yield self._readexact(size)
774 774
775 775
776 776 def iterparts(self):
777 777 """yield all parts contained in the stream"""
778 778 # make sure params have been loaded
779 779 self.params
780 780 # From there, payload needs to be decompressed
781 781 self._fp = self._compengine.decompressorreader(self._fp)
782 782 indebug(self.ui, 'start extraction of bundle2 parts')
783 783 headerblock = self._readpartheader()
784 784 while headerblock is not None:
785 785 part = unbundlepart(self.ui, headerblock, self._fp)
786 786 yield part
787 787 part.seek(0, 2)
788 788 headerblock = self._readpartheader()
789 789 indebug(self.ui, 'end of bundle2 stream')
790 790
791 791 def _readpartheader(self):
792 792 """reads a part header size and return the bytes blob
793 793
794 794 returns None if empty"""
795 795 headersize = self._unpack(_fpartheadersize)[0]
796 796 if headersize < 0:
797 797 raise error.BundleValueError('negative part header size: %i'
798 798 % headersize)
799 799 indebug(self.ui, 'part header size: %i' % headersize)
800 800 if headersize:
801 801 return self._readexact(headersize)
802 802 return None
803 803
804 804 def compressed(self):
805 805 self.params # load params
806 806 return self._compressed
807 807
808 808 formatmap = {'20': unbundle20}
809 809
810 810 b2streamparamsmap = {}
811 811
812 812 def b2streamparamhandler(name):
813 813 """register a handler for a stream level parameter"""
814 814 def decorator(func):
815 815 assert name not in formatmap
816 816 b2streamparamsmap[name] = func
817 817 return func
818 818 return decorator
819 819
820 820 @b2streamparamhandler('compression')
821 821 def processcompression(unbundler, param, value):
822 822 """read compression parameter and install payload decompression"""
823 823 if value not in util.compengines.supportedbundletypes:
824 824 raise error.BundleUnknownFeatureError(params=(param,),
825 825 values=(value,))
826 826 unbundler._compengine = util.compengines.forbundletype(value)
827 827 if value is not None:
828 828 unbundler._compressed = True
829 829
830 830 class bundlepart(object):
831 831 """A bundle2 part contains application level payload
832 832
833 833 The part `type` is used to route the part to the application level
834 834 handler.
835 835
836 836 The part payload is contained in ``part.data``. It could be raw bytes or a
837 837 generator of byte chunks.
838 838
839 839 You can add parameters to the part using the ``addparam`` method.
840 840 Parameters can be either mandatory (default) or advisory. Remote side
841 841 should be able to safely ignore the advisory ones.
842 842
843 843 Neither data nor parameters can be modified after generation has begun.
844 844 """
845 845
846 846 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 847 data='', mandatory=True):
848 848 validateparttype(parttype)
849 849 self.id = None
850 850 self.type = parttype
851 851 self._data = data
852 852 self._mandatoryparams = list(mandatoryparams)
853 853 self._advisoryparams = list(advisoryparams)
854 854 # checking for duplicated entries
855 855 self._seenparams = set()
856 856 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 857 if pname in self._seenparams:
858 raise RuntimeError('duplicated params: %s' % pname)
858 raise error.ProgrammingError('duplicated params: %s' % pname)
859 859 self._seenparams.add(pname)
860 860 # status of the part's generation:
861 861 # - None: not started,
862 862 # - False: currently generated,
863 863 # - True: generation done.
864 864 self._generated = None
865 865 self.mandatory = mandatory
866 866
867 867 def __repr__(self):
868 868 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
869 869 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
870 870 % (cls, id(self), self.id, self.type, self.mandatory))
871 871
872 872 def copy(self):
873 873 """return a copy of the part
874 874
875 875 The new part has the very same content but no partid assigned yet.
876 876 Parts with generated data cannot be copied."""
877 877 assert not util.safehasattr(self.data, 'next')
878 878 return self.__class__(self.type, self._mandatoryparams,
879 879 self._advisoryparams, self._data, self.mandatory)
880 880
881 881 # methods used to define the part content
882 882 @property
883 883 def data(self):
884 884 return self._data
885 885
886 886 @data.setter
887 887 def data(self, data):
888 888 if self._generated is not None:
889 889 raise error.ReadOnlyPartError('part is being generated')
890 890 self._data = data
891 891
892 892 @property
893 893 def mandatoryparams(self):
894 894 # make it an immutable tuple to force people through ``addparam``
895 895 return tuple(self._mandatoryparams)
896 896
897 897 @property
898 898 def advisoryparams(self):
899 899 # make it an immutable tuple to force people through ``addparam``
900 900 return tuple(self._advisoryparams)
901 901
902 902 def addparam(self, name, value='', mandatory=True):
903 903 if self._generated is not None:
904 904 raise error.ReadOnlyPartError('part is being generated')
905 905 if name in self._seenparams:
906 906 raise ValueError('duplicated params: %s' % name)
907 907 self._seenparams.add(name)
908 908 params = self._advisoryparams
909 909 if mandatory:
910 910 params = self._mandatoryparams
911 911 params.append((name, value))
912 912
913 913 # methods used to generate the bundle2 stream
914 914 def getchunks(self, ui):
915 915 if self._generated is not None:
916 raise RuntimeError('part can only be consumed once')
916 raise error.ProgrammingError('part can only be consumed once')
917 917 self._generated = False
918 918
919 919 if ui.debugflag:
920 920 msg = ['bundle2-output-part: "%s"' % self.type]
921 921 if not self.mandatory:
922 922 msg.append(' (advisory)')
923 923 nbmp = len(self.mandatoryparams)
924 924 nbap = len(self.advisoryparams)
925 925 if nbmp or nbap:
926 926 msg.append(' (params:')
927 927 if nbmp:
928 928 msg.append(' %i mandatory' % nbmp)
929 929 if nbap:
930 930 msg.append(' %i advisory' % nbap)
931 931 msg.append(')')
932 932 if not self.data:
933 933 msg.append(' empty payload')
934 934 elif util.safehasattr(self.data, 'next'):
935 935 msg.append(' streamed payload')
936 936 else:
937 937 msg.append(' %i bytes payload' % len(self.data))
938 938 msg.append('\n')
939 939 ui.debug(''.join(msg))
940 940
941 941 #### header
942 942 if self.mandatory:
943 943 parttype = self.type.upper()
944 944 else:
945 945 parttype = self.type.lower()
946 946 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
947 947 ## parttype
948 948 header = [_pack(_fparttypesize, len(parttype)),
949 949 parttype, _pack(_fpartid, self.id),
950 950 ]
951 951 ## parameters
952 952 # count
953 953 manpar = self.mandatoryparams
954 954 advpar = self.advisoryparams
955 955 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
956 956 # size
957 957 parsizes = []
958 958 for key, value in manpar:
959 959 parsizes.append(len(key))
960 960 parsizes.append(len(value))
961 961 for key, value in advpar:
962 962 parsizes.append(len(key))
963 963 parsizes.append(len(value))
964 964 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
965 965 header.append(paramsizes)
966 966 # key, value
967 967 for key, value in manpar:
968 968 header.append(key)
969 969 header.append(value)
970 970 for key, value in advpar:
971 971 header.append(key)
972 972 header.append(value)
973 973 ## finalize header
974 974 headerchunk = ''.join(header)
975 975 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
976 976 yield _pack(_fpartheadersize, len(headerchunk))
977 977 yield headerchunk
978 978 ## payload
979 979 try:
980 980 for chunk in self._payloadchunks():
981 981 outdebug(ui, 'payload chunk size: %i' % len(chunk))
982 982 yield _pack(_fpayloadsize, len(chunk))
983 983 yield chunk
984 984 except GeneratorExit:
985 985 # GeneratorExit means that nobody is listening for our
986 986 # results anyway, so just bail quickly rather than trying
987 987 # to produce an error part.
988 988 ui.debug('bundle2-generatorexit\n')
989 989 raise
990 990 except BaseException as exc:
991 991 # backup exception data for later
992 992 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
993 993 % exc)
994 994 exc_info = sys.exc_info()
995 995 msg = 'unexpected error: %s' % exc
996 996 interpart = bundlepart('error:abort', [('message', msg)],
997 997 mandatory=False)
998 998 interpart.id = 0
999 999 yield _pack(_fpayloadsize, -1)
1000 1000 for chunk in interpart.getchunks(ui=ui):
1001 1001 yield chunk
1002 1002 outdebug(ui, 'closing payload chunk')
1003 1003 # abort current part payload
1004 1004 yield _pack(_fpayloadsize, 0)
1005 1005 if pycompat.ispy3:
1006 1006 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1007 1007 else:
1008 1008 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1009 1009 # end of payload
1010 1010 outdebug(ui, 'closing payload chunk')
1011 1011 yield _pack(_fpayloadsize, 0)
1012 1012 self._generated = True
1013 1013
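
The payload framing emitted by ``getchunks`` above (length-prefixed chunks, terminated by a zero-size chunk, with -1 reserved as the interrupt flag) can be sketched with plain ``struct``. This is a simplified illustration, assuming the int32 big-endian size format used for payload chunk sizes; it omits the header framing:

```python
import struct

def frame_payload(chunks):
    """Frame payload chunks as length-prefixed blocks, 0-terminated (sketch)."""
    out = []
    for chunk in chunks:
        out.append(struct.pack('>i', len(chunk)))  # chunk size, signed int32
        out.append(chunk)
    out.append(struct.pack('>i', 0))               # end-of-payload marker
    return b''.join(out)

# layout: size 3, b'abc', size 2, b'de', size 0
framed = frame_payload([b'abc', b'de'])
```

The size field is signed so that a negative value (-1) can signal an out-of-band interrupt part, as handled by ``interrupthandler`` below in this file.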
1014 1014 def _payloadchunks(self):
1015 1015 """yield chunks of the part payload
1016 1016
1017 1017 Exists to handle the different methods to provide data to a part."""
1018 1018 # we only support fixed size data now.
1019 1019 # This will be improved in the future.
1020 1020 if util.safehasattr(self.data, 'next'):
1021 1021 buff = util.chunkbuffer(self.data)
1022 1022 chunk = buff.read(preferedchunksize)
1023 1023 while chunk:
1024 1024 yield chunk
1025 1025 chunk = buff.read(preferedchunksize)
1026 1026 elif len(self.data):
1027 1027 yield self.data
1028 1028
1029 1029
1030 1030 flaginterrupt = -1
1031 1031
1032 1032 class interrupthandler(unpackermixin):
1033 1033 """read one part and process it with restricted capability
1034 1034
1035 1035 This allows transmitting exceptions raised on the producer side during
1036 1036 part iteration while the consumer is reading a part.
1037 1037
1038 1038 Parts processed in this manner only have access to a ui object."""
1039 1039
1040 1040 def __init__(self, ui, fp):
1041 1041 super(interrupthandler, self).__init__(fp)
1042 1042 self.ui = ui
1043 1043
1044 1044 def _readpartheader(self):
1045 1045 """reads a part header size and return the bytes blob
1046 1046
1047 1047 returns None if empty"""
1048 1048 headersize = self._unpack(_fpartheadersize)[0]
1049 1049 if headersize < 0:
1050 1050 raise error.BundleValueError('negative part header size: %i'
1051 1051 % headersize)
1052 1052 indebug(self.ui, 'part header size: %i\n' % headersize)
1053 1053 if headersize:
1054 1054 return self._readexact(headersize)
1055 1055 return None
1056 1056
1057 1057 def __call__(self):
1058 1058
1059 1059 self.ui.debug('bundle2-input-stream-interrupt:'
1060 1060 ' opening out of band context\n')
1061 1061 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1062 1062 headerblock = self._readpartheader()
1063 1063 if headerblock is None:
1064 1064 indebug(self.ui, 'no part found during interruption.')
1065 1065 return
1066 1066 part = unbundlepart(self.ui, headerblock, self._fp)
1067 1067 op = interruptoperation(self.ui)
1068 1068 _processpart(op, part)
1069 1069 self.ui.debug('bundle2-input-stream-interrupt:'
1070 1070 ' closing out of band context\n')
1071 1071
1072 1072 class interruptoperation(object):
1073 1073 """A limited operation to be used by part handlers during interruption
1074 1074
1075 1075 It only has access to a ui object.
1076 1076 """
1077 1077
1078 1078 def __init__(self, ui):
1079 1079 self.ui = ui
1080 1080 self.reply = None
1081 1081 self.captureoutput = False
1082 1082
1083 1083 @property
1084 1084 def repo(self):
1085 raise RuntimeError('no repo access from stream interruption')
1085 raise error.ProgrammingError('no repo access from stream interruption')
1086 1086
1087 1087 def gettransaction(self):
1088 1088 raise TransactionUnavailable('no repo access from stream interruption')
1089 1089
1090 1090 class unbundlepart(unpackermixin):
1091 1091 """a bundle part read from a bundle"""
1092 1092
1093 1093 def __init__(self, ui, header, fp):
1094 1094 super(unbundlepart, self).__init__(fp)
1095 1095 self.ui = ui
1096 1096 # unbundle state attr
1097 1097 self._headerdata = header
1098 1098 self._headeroffset = 0
1099 1099 self._initialized = False
1100 1100 self.consumed = False
1101 1101 # part data
1102 1102 self.id = None
1103 1103 self.type = None
1104 1104 self.mandatoryparams = None
1105 1105 self.advisoryparams = None
1106 1106 self.params = None
1107 1107 self.mandatorykeys = ()
1108 1108 self._payloadstream = None
1109 1109 self._readheader()
1110 1110 self._mandatory = None
1111 1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1112 1112 self._pos = 0
1113 1113
1114 1114 def _fromheader(self, size):
1115 1115 """return the next <size> byte from the header"""
1116 1116 offset = self._headeroffset
1117 1117 data = self._headerdata[offset:(offset + size)]
1118 1118 self._headeroffset = offset + size
1119 1119 return data
1120 1120
1121 1121 def _unpackheader(self, format):
1122 1122 """read given format from header
1123 1123
1124 1124 This automatically computes the size of the format to read."""
1125 1125 data = self._fromheader(struct.calcsize(format))
1126 1126 return _unpack(format, data)
1127 1127
1128 1128 def _initparams(self, mandatoryparams, advisoryparams):
1129 1129 """internal function to set up all logic-related parameters"""
1130 1130 # make it read only to prevent people touching it by mistake.
1131 1131 self.mandatoryparams = tuple(mandatoryparams)
1132 1132 self.advisoryparams = tuple(advisoryparams)
1133 1133 # user friendly UI
1134 1134 self.params = util.sortdict(self.mandatoryparams)
1135 1135 self.params.update(self.advisoryparams)
1136 1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1137 1137
1138 1138 def _payloadchunks(self, chunknum=0):
1139 1139 '''seek to specified chunk and start yielding data'''
1140 1140 if len(self._chunkindex) == 0:
1141 1141 assert chunknum == 0, 'Must start with chunk 0'
1142 1142 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1143 1143 else:
1144 1144 assert chunknum < len(self._chunkindex), \
1145 1145 'Unknown chunk %d' % chunknum
1146 1146 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1147 1147
1148 1148 pos = self._chunkindex[chunknum][0]
1149 1149 payloadsize = self._unpack(_fpayloadsize)[0]
1150 1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151 1151 while payloadsize:
1152 1152 if payloadsize == flaginterrupt:
1153 1153 # interruption detection, the handler will now read a
1154 1154 # single part and process it.
1155 1155 interrupthandler(self.ui, self._fp)()
1156 1156 elif payloadsize < 0:
1157 1157 msg = 'negative payload chunk size: %i' % payloadsize
1158 1158 raise error.BundleValueError(msg)
1159 1159 else:
1160 1160 result = self._readexact(payloadsize)
1161 1161 chunknum += 1
1162 1162 pos += payloadsize
1163 1163 if chunknum == len(self._chunkindex):
1164 1164 self._chunkindex.append((pos,
1165 1165 super(unbundlepart, self).tell()))
1166 1166 yield result
1167 1167 payloadsize = self._unpack(_fpayloadsize)[0]
1168 1168 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1169 1169
1170 1170 def _findchunk(self, pos):
1171 1171 '''for a given payload position, return a chunk number and offset'''
1172 1172 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1173 1173 if ppos == pos:
1174 1174 return chunk, 0
1175 1175 elif ppos > pos:
1176 1176 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1177 1177 raise ValueError('Unknown chunk')
1178 1178
1179 1179 def _readheader(self):
1180 1180 """read the header and setup the object"""
1181 1181 typesize = self._unpackheader(_fparttypesize)[0]
1182 1182 self.type = self._fromheader(typesize)
1183 1183 indebug(self.ui, 'part type: "%s"' % self.type)
1184 1184 self.id = self._unpackheader(_fpartid)[0]
1185 1185 indebug(self.ui, 'part id: "%s"' % self.id)
1186 1186 # extract mandatory bit from type
1187 1187 self.mandatory = (self.type != self.type.lower())
1188 1188 self.type = self.type.lower()
1189 1189 ## reading parameters
1190 1190 # param count
1191 1191 mancount, advcount = self._unpackheader(_fpartparamcount)
1192 1192 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1193 1193 # param size
1194 1194 fparamsizes = _makefpartparamsizes(mancount + advcount)
1195 1195 paramsizes = self._unpackheader(fparamsizes)
1196 1196 # make it a list of couple again
1197 1197 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1198 1198 # split mandatory from advisory
1199 1199 mansizes = paramsizes[:mancount]
1200 1200 advsizes = paramsizes[mancount:]
1201 1201 # retrieve param value
1202 1202 manparams = []
1203 1203 for key, value in mansizes:
1204 1204 manparams.append((self._fromheader(key), self._fromheader(value)))
1205 1205 advparams = []
1206 1206 for key, value in advsizes:
1207 1207 advparams.append((self._fromheader(key), self._fromheader(value)))
1208 1208 self._initparams(manparams, advparams)
1209 1209 ## part payload
1210 1210 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1211 1211 # we read the data, tell it
1212 1212 self._initialized = True
1213 1213
1214 1214 def read(self, size=None):
1215 1215 """read payload data"""
1216 1216 if not self._initialized:
1217 1217 self._readheader()
1218 1218 if size is None:
1219 1219 data = self._payloadstream.read()
1220 1220 else:
1221 1221 data = self._payloadstream.read(size)
1222 1222 self._pos += len(data)
1223 1223 if size is None or len(data) < size:
1224 1224 if not self.consumed and self._pos:
1225 1225 self.ui.debug('bundle2-input-part: total payload size %i\n'
1226 1226 % self._pos)
1227 1227 self.consumed = True
1228 1228 return data
1229 1229
1230 1230 def tell(self):
1231 1231 return self._pos
1232 1232
1233 1233 def seek(self, offset, whence=0):
1234 1234 if whence == 0:
1235 1235 newpos = offset
1236 1236 elif whence == 1:
1237 1237 newpos = self._pos + offset
1238 1238 elif whence == 2:
1239 1239 if not self.consumed:
1240 1240 self.read()
1241 1241 newpos = self._chunkindex[-1][0] - offset
1242 1242 else:
1243 1243 raise ValueError('Unknown whence value: %r' % (whence,))
1244 1244
1245 1245 if newpos > self._chunkindex[-1][0] and not self.consumed:
1246 1246 self.read()
1247 1247 if not 0 <= newpos <= self._chunkindex[-1][0]:
1248 1248 raise ValueError('Offset out of range')
1249 1249
1250 1250 if self._pos != newpos:
1251 1251 chunk, internaloffset = self._findchunk(newpos)
1252 1252 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1253 1253 adjust = self.read(internaloffset)
1254 1254 if len(adjust) != internaloffset:
1255 1255 raise error.Abort(_('Seek failed\n'))
1256 1256 self._pos = newpos
1257 1257
1258 1258 # These are only the static capabilities.
1259 1259 # Check the 'getrepocaps' function for the rest.
1260 1260 capabilities = {'HG20': (),
1261 1261 'error': ('abort', 'unsupportedcontent', 'pushraced',
1262 1262 'pushkey'),
1263 1263 'listkeys': (),
1264 1264 'pushkey': (),
1265 1265 'digests': tuple(sorted(util.DIGESTS.keys())),
1266 1266 'remote-changegroup': ('http', 'https'),
1267 1267 'hgtagsfnodes': (),
1268 1268 }
1269 1269
1270 1270 def getrepocaps(repo, allowpushback=False):
1271 1271 """return the bundle2 capabilities for a given repo
1272 1272
1273 1273 Exists to allow extensions (like evolution) to mutate the capabilities.
1274 1274 """
1275 1275 caps = capabilities.copy()
1276 1276 caps['changegroup'] = tuple(sorted(
1277 1277 changegroup.supportedincomingversions(repo)))
1278 1278 if obsolete.isenabled(repo, obsolete.exchangeopt):
1279 1279 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1280 1280 caps['obsmarkers'] = supportedformat
1281 1281 if allowpushback:
1282 1282 caps['pushback'] = ()
1283 1283 return caps
1284 1284
1285 1285 def bundle2caps(remote):
1286 1286 """return the bundle capabilities of a peer as dict"""
1287 1287 raw = remote.capable('bundle2')
1288 1288 if not raw and raw != '':
1289 1289 return {}
1290 1290 capsblob = urlreq.unquote(remote.capable('bundle2'))
1291 1291 return decodecaps(capsblob)
1292 1292
1293 1293 def obsmarkersversion(caps):
1294 1294 """extract the list of supported obsmarkers versions from a bundle2caps dict
1295 1295 """
1296 1296 obscaps = caps.get('obsmarkers', ())
1297 1297 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1298 1298
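
The version parsing in ``obsmarkersversion`` above is self-contained enough to run on its own; the sample caps dict below is made up for illustration:

```python
def obsmarkersversion(caps):
    """extract supported obsmarkers versions from a bundle2 caps dict"""
    obscaps = caps.get('obsmarkers', ())
    # capability entries look like 'V0', 'V1', ...; anything else is ignored
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1', 'other')})
```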
1299 1299 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1300 1300 compopts=None):
1301 1301 """Write a bundle file and return its filename.
1302 1302
1303 1303 Existing files will not be overwritten.
1304 1304 If no filename is specified, a temporary file is created.
1305 1305 bz2 compression can be turned off.
1306 1306 The bundle file will be deleted in case of errors.
1307 1307 """
1308 1308
1309 1309 if bundletype == "HG20":
1310 1310 bundle = bundle20(ui)
1311 1311 bundle.setcompression(compression, compopts)
1312 1312 part = bundle.newpart('changegroup', data=cg.getchunks())
1313 1313 part.addparam('version', cg.version)
1314 1314 if 'clcount' in cg.extras:
1315 1315 part.addparam('nbchanges', str(cg.extras['clcount']),
1316 1316 mandatory=False)
1317 1317 chunkiter = bundle.getchunks()
1318 1318 else:
1319 1319 # compression argument is only for the bundle2 case
1320 1320 assert compression is None
1321 1321 if cg.version != '01':
1322 1322 raise error.Abort(_('old bundle types only support v1 '
1323 1323 'changegroups'))
1324 1324 header, comp = bundletypes[bundletype]
1325 1325 if comp not in util.compengines.supportedbundletypes:
1326 1326 raise error.Abort(_('unknown stream compression type: %s')
1327 1327 % comp)
1328 1328 compengine = util.compengines.forbundletype(comp)
1329 1329 def chunkiter():
1330 1330 yield header
1331 1331 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1332 1332 yield chunk
1333 1333 chunkiter = chunkiter()
1334 1334
1335 1335 # parse the changegroup data, otherwise we will block
1336 1336 # in case of sshrepo because we don't know the end of the stream
1337 1337 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1338 1338
1339 1339 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1340 1340 def handlechangegroup(op, inpart):
1341 1341 """apply a changegroup part on the repo
1342 1342
1343 1343 This is a very early implementation that will see massive rework before
1344 1344 being inflicted on any end-user.
1345 1345 """
1346 1346 # Make sure we trigger a transaction creation
1347 1347 #
1348 1348 # The addchangegroup function will get a transaction object by itself, but
1349 1349 # we need to make sure we trigger the creation of a transaction object used
1350 1350 # for the whole processing scope.
1351 1351 op.gettransaction()
1352 1352 unpackerversion = inpart.params.get('version', '01')
1353 1353 # We should raise an appropriate exception here
1354 1354 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1355 1355 # the source and url passed here are overwritten by the one contained in
1356 1356 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1357 1357 nbchangesets = None
1358 1358 if 'nbchanges' in inpart.params:
1359 1359 nbchangesets = int(inpart.params.get('nbchanges'))
1360 1360 if ('treemanifest' in inpart.params and
1361 1361 'treemanifest' not in op.repo.requirements):
1362 1362 if len(op.repo.changelog) != 0:
1363 1363 raise error.Abort(_(
1364 1364 "bundle contains tree manifests, but local repo is "
1365 1365 "non-empty and does not use tree manifests"))
1366 1366 op.repo.requirements.add('treemanifest')
1367 1367 op.repo._applyopenerreqs()
1368 1368 op.repo._writerequirements()
1369 1369 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1370 1370 op.records.add('changegroup', {'return': ret})
1371 1371 if op.reply is not None:
1372 1372 # This is definitely not the final form of this
1373 1373 # return. But one needs to start somewhere.
1374 1374 part = op.reply.newpart('reply:changegroup', mandatory=False)
1375 1375 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1376 1376 part.addparam('return', '%i' % ret, mandatory=False)
1377 1377 assert not inpart.read()
1378 1378
1379 1379 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1380 1380 ['digest:%s' % k for k in util.DIGESTS.keys()])
1381 1381 @parthandler('remote-changegroup', _remotechangegroupparams)
1382 1382 def handleremotechangegroup(op, inpart):
1383 1383 """apply a bundle10 on the repo, given a url and validation information
1384 1384
1385 1385 All the information about the remote bundle to import is given as
1386 1386 parameters. The parameters include:
1387 1387 - url: the url to the bundle10.
1388 1388 - size: the bundle10 file size. It is used to validate what was
1389 1389 retrieved by the client matches the server knowledge about the bundle.
1390 1390 - digests: a space separated list of the digest types provided as
1391 1391 parameters.
1392 1392 - digest:<digest-type>: the hexadecimal representation of the digest with
1393 1393 that name. Like the size, it is used to validate what was retrieved by
1394 1394 the client matches what the server knows about the bundle.
1395 1395
1396 1396 When multiple digest types are given, all of them are checked.
1397 1397 """
1398 1398 try:
1399 1399 raw_url = inpart.params['url']
1400 1400 except KeyError:
1401 1401 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1402 1402 parsed_url = util.url(raw_url)
1403 1403 if parsed_url.scheme not in capabilities['remote-changegroup']:
1404 1404 raise error.Abort(_('remote-changegroup does not support %s urls') %
1405 1405 parsed_url.scheme)
1406 1406
1407 1407 try:
1408 1408 size = int(inpart.params['size'])
1409 1409 except ValueError:
1410 1410 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1411 1411 % 'size')
1412 1412 except KeyError:
1413 1413 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1414 1414
1415 1415 digests = {}
1416 1416 for typ in inpart.params.get('digests', '').split():
1417 1417 param = 'digest:%s' % typ
1418 1418 try:
1419 1419 value = inpart.params[param]
1420 1420 except KeyError:
1421 1421 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1422 1422 param)
1423 1423 digests[typ] = value
1424 1424
1425 1425 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1426 1426
1427 1427 # Make sure we trigger a transaction creation
1428 1428 #
1429 1429 # The addchangegroup function will get a transaction object by itself, but
1430 1430 # we need to make sure we trigger the creation of a transaction object used
1431 1431 # for the whole processing scope.
1432 1432 op.gettransaction()
1433 1433 from . import exchange
1434 1434 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1435 1435 if not isinstance(cg, changegroup.cg1unpacker):
1436 1436 raise error.Abort(_('%s: not a bundle version 1.0') %
1437 1437 util.hidepassword(raw_url))
1438 1438 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1439 1439 op.records.add('changegroup', {'return': ret})
1440 1440 if op.reply is not None:
1441 1441 # This is definitely not the final form of this
1442 1442 # return. But one needs to start somewhere.
1443 1443 part = op.reply.newpart('reply:changegroup')
1444 1444 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1445 1445 part.addparam('return', '%i' % ret, mandatory=False)
1446 1446 try:
1447 1447 real_part.validate()
1448 1448 except error.Abort as e:
1449 1449 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1450 1450 (util.hidepassword(raw_url), str(e)))
1451 1451 assert not inpart.read()
1452 1452
1453 1453 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1454 1454 def handlereplychangegroup(op, inpart):
1455 1455 ret = int(inpart.params['return'])
1456 1456 replyto = int(inpart.params['in-reply-to'])
1457 1457 op.records.add('changegroup', {'return': ret}, replyto)
1458 1458
1459 1459 @parthandler('check:heads')
1460 1460 def handlecheckheads(op, inpart):
1461 1461 """check that head of the repo did not change
1462 1462
1463 1463 This is used to detect a push race when using unbundle.
1464 1464 This replaces the "heads" argument of unbundle."""
1465 1465 h = inpart.read(20)
1466 1466 heads = []
1467 1467 while len(h) == 20:
1468 1468 heads.append(h)
1469 1469 h = inpart.read(20)
1470 1470 assert not h
1471 1471 # Trigger a transaction so that we are guaranteed to have the lock now.
1472 1472 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1473 1473 op.gettransaction()
1474 1474 if sorted(heads) != sorted(op.repo.heads()):
1475 1475 raise error.PushRaced('repository changed while pushing - '
1476 1476 'please try again')
1477 1477
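
The fixed-width read loop in ``handlecheckheads`` above amounts to slicing the payload into 20-byte node records. A minimal stand-alone sketch (the helper name is hypothetical; real code reads incrementally from the part stream):

```python
def readheads(payload):
    """Split a check:heads payload into 20-byte node records (sketch)."""
    heads = []
    offset = 0
    while offset + 20 <= len(payload):
        heads.append(payload[offset:offset + 20])
        offset += 20
    # mirrors `assert not h`: a partial trailing record is malformed
    assert offset == len(payload), 'trailing garbage in heads payload'
    return heads

nodes = readheads(b'a' * 20 + b'b' * 20)
```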
1478 1478 @parthandler('output')
1479 1479 def handleoutput(op, inpart):
1480 1480 """forward output captured on the server to the client"""
1481 1481 for line in inpart.read().splitlines():
1482 1482 op.ui.status(_('remote: %s\n') % line)
1483 1483
1484 1484 @parthandler('replycaps')
1485 1485 def handlereplycaps(op, inpart):
1486 1486 """Notify that a reply bundle should be created
1487 1487
1488 1488 The payload contains the capabilities information for the reply"""
1489 1489 caps = decodecaps(inpart.read())
1490 1490 if op.reply is None:
1491 1491 op.reply = bundle20(op.ui, caps)
1492 1492
1493 1493 class AbortFromPart(error.Abort):
1494 1494 """Sub-class of Abort that denotes an error from a bundle2 part."""
1495 1495
1496 1496 @parthandler('error:abort', ('message', 'hint'))
1497 1497 def handleerrorabort(op, inpart):
1498 1498 """Used to transmit abort error over the wire"""
1499 1499 raise AbortFromPart(inpart.params['message'],
1500 1500 hint=inpart.params.get('hint'))
1501 1501
1502 1502 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1503 1503 'in-reply-to'))
1504 1504 def handleerrorpushkey(op, inpart):
1505 1505 """Used to transmit failure of a mandatory pushkey over the wire"""
1506 1506 kwargs = {}
1507 1507 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1508 1508 value = inpart.params.get(name)
1509 1509 if value is not None:
1510 1510 kwargs[name] = value
1511 1511 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1512 1512
1513 1513 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1514 1514 def handleerrorunsupportedcontent(op, inpart):
1515 1515 """Used to transmit unknown content error over the wire"""
1516 1516 kwargs = {}
1517 1517 parttype = inpart.params.get('parttype')
1518 1518 if parttype is not None:
1519 1519 kwargs['parttype'] = parttype
1520 1520 params = inpart.params.get('params')
1521 1521 if params is not None:
1522 1522 kwargs['params'] = params.split('\0')
1523 1523
1524 1524 raise error.BundleUnknownFeatureError(**kwargs)
1525 1525
1526 1526 @parthandler('error:pushraced', ('message',))
1527 1527 def handleerrorpushraced(op, inpart):
1528 1528 """Used to transmit push race error over the wire"""
1529 1529 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1530 1530
1531 1531 @parthandler('listkeys', ('namespace',))
1532 1532 def handlelistkeys(op, inpart):
1533 1533 """retrieve pushkey namespace content stored in a bundle2"""
1534 1534 namespace = inpart.params['namespace']
1535 1535 r = pushkey.decodekeys(inpart.read())
1536 1536 op.records.add('listkeys', (namespace, r))
1537 1537
1538 1538 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1539 1539 def handlepushkey(op, inpart):
1540 1540 """process a pushkey request"""
1541 1541 dec = pushkey.decode
1542 1542 namespace = dec(inpart.params['namespace'])
1543 1543 key = dec(inpart.params['key'])
1544 1544 old = dec(inpart.params['old'])
1545 1545 new = dec(inpart.params['new'])
1546 1546 # Grab the transaction to ensure that we have the lock before performing the
1547 1547 # pushkey.
1548 1548 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1549 1549 op.gettransaction()
1550 1550 ret = op.repo.pushkey(namespace, key, old, new)
1551 1551 record = {'namespace': namespace,
1552 1552 'key': key,
1553 1553 'old': old,
1554 1554 'new': new}
1555 1555 op.records.add('pushkey', record)
1556 1556 if op.reply is not None:
1557 1557 rpart = op.reply.newpart('reply:pushkey')
1558 1558 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1559 1559 rpart.addparam('return', '%i' % ret, mandatory=False)
1560 1560 if inpart.mandatory and not ret:
1561 1561 kwargs = {}
1562 1562 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1563 1563 if key in inpart.params:
1564 1564 kwargs[key] = inpart.params[key]
1565 1565 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1566 1566
1567 1567 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1568 1568 def handlepushkeyreply(op, inpart):
1569 1569 """retrieve the result of a pushkey request"""
1570 1570 ret = int(inpart.params['return'])
1571 1571 partid = int(inpart.params['in-reply-to'])
1572 1572 op.records.add('pushkey', {'return': ret}, partid)
1573 1573
1574 1574 @parthandler('obsmarkers')
1575 1575 def handleobsmarker(op, inpart):
1576 1576 """add a stream of obsmarkers to the repo"""
1577 1577 tr = op.gettransaction()
1578 1578 markerdata = inpart.read()
1579 1579 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1580 1580 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1581 1581 % len(markerdata))
1582 1582 # The mergemarkers call will crash if marker creation is not enabled.
1583 1583 # we want to avoid this if the part is advisory.
1584 1584 if not inpart.mandatory and op.repo.obsstore.readonly:
1585 1585 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1586 1586 return
1587 1587 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1588 1588 if new:
1589 1589 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1590 1590 op.records.add('obsmarkers', {'new': new})
1591 1591 if op.reply is not None:
1592 1592 rpart = op.reply.newpart('reply:obsmarkers')
1593 1593 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1594 1594 rpart.addparam('new', '%i' % new, mandatory=False)
1595 1595
1596 1596
1597 1597 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1598 1598 def handleobsmarkerreply(op, inpart):
1599 1599 """retrieve the result of an obsmarkers request"""
1600 1600 ret = int(inpart.params['new'])
1601 1601 partid = int(inpart.params['in-reply-to'])
1602 1602 op.records.add('obsmarkers', {'new': ret}, partid)
1603 1603
1604 1604 @parthandler('hgtagsfnodes')
1605 1605 def handlehgtagsfnodes(op, inpart):
1606 1606 """Applies .hgtags fnodes cache entries to the local repo.
1607 1607
1608 1608 Payload is pairs of 20 byte changeset nodes and filenodes.
1609 1609 """
1610 1610 # Grab the transaction so we ensure that we have the lock at this point.
1611 1611 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1612 1612 op.gettransaction()
1613 1613 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1614 1614
1615 1615 count = 0
1616 1616 while True:
1617 1617 node = inpart.read(20)
1618 1618 fnode = inpart.read(20)
1619 1619 if len(node) < 20 or len(fnode) < 20:
1620 1620 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1621 1621 break
1622 1622 cache.setfnode(node, fnode)
1623 1623 count += 1
1624 1624
1625 1625 cache.write()
1626 1626 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)