bundle2: clarify the docstring of unpackermixin methods...
Pierre-Yves David
r31862:9bd9e9cb default
@@ -1,1633 +1,1645
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char, it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
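# A minimal sketch of the stream framing described in the module docstring
# (magic string, stream parameter blob, parts, end of stream marker). It is
# illustrative only and assumes no stream parameters and no parts; real
# callers should use the ``bundle20`` class instead of framing by hand.
def _examplestreamframing():
    """Sketch only: build the smallest possible HG20 stream."""
    chunks = []
    chunks.append('HG20')                       # magic string
    chunks.append(_pack(_fstreamparamsize, 0))  # empty stream parameter blob
    chunks.append(_pack(_fpartheadersize, 0))   # end of stream marker
    return ''.join(chunks)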
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that register a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during and unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iteration happens in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 __bool__ = __nonzero__
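# A short usage sketch of the records container above; the record data is
# made up and purely illustrative.
def _examplerecords():
    """Sketch only: exercise unbundlerecords with invented entries."""
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: added 3 changesets')
    assert records['changegroup'] == ({'return': 1},)
    # iterating yields (category, entry) pairs in chronological order
    return list(records)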
275 275
276 276 class bundleoperation(object):
277 277 """an object that represents a single bundling process
278 278
279 279 Its purpose is to carry unbundle-related objects and states.
280 280
281 281 A new object should be created at the beginning of each bundle processing.
282 282 The object is to be returned by the processing function.
283 283
284 284 The object has very little content now; it will ultimately contain:
285 285 * an access to the repo the bundle is applied to,
286 286 * a ui object,
287 287 * a way to retrieve a transaction to add changes to the repo,
288 288 * a way to record the result of processing each part,
289 289 * a way to construct a bundle response when applicable.
290 290 """
291 291
292 292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 293 self.repo = repo
294 294 self.ui = repo.ui
295 295 self.records = unbundlerecords()
296 296 self.gettransaction = transactiongetter
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299
300 300 class TransactionUnavailable(RuntimeError):
301 301 pass
302 302
303 303 def _notransaction():
304 304 """default method to get a transaction while processing a bundle
305 305
306 306 Raise an exception to highlight the fact that no transaction was expected
307 307 to be created"""
308 308 raise TransactionUnavailable()
309 309
310 310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 312 tr.hookargs['bundle2'] = '1'
313 313 if source is not None and 'source' not in tr.hookargs:
314 314 tr.hookargs['source'] = source
315 315 if url is not None and 'url' not in tr.hookargs:
316 316 tr.hookargs['url'] = url
317 317 return processbundle(repo, unbundler, lambda: tr, op=op)
318 318
319 319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 320 """This function process a bundle, apply effect to/from a repo
321 321
322 322 It iterates over each part then searches for and uses the proper handling
323 323 code to process the part. Parts are processed in order.
324 324
325 325 An unknown Mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 356 except Exception as exc:
357 357 for nbpart, part in iterparts:
358 358 # consume the bundle content
359 359 part.seek(0, 2)
360 360 # Small hack to let caller code distinguish exceptions from bundle2
361 361 # processing from processing the old format. This is mostly
362 362 # needed to handle different return codes to unbundle according to the
363 363 # type of bundle. We should probably clean up or drop this return code
364 364 # craziness in a future version.
365 365 exc.duringunbundle2 = True
366 366 salvaged = []
367 367 replycaps = None
368 368 if op.reply is not None:
369 369 salvaged = op.reply.salvageoutput()
370 370 replycaps = op.reply.capabilities
371 371 exc._replycaps = replycaps
372 372 exc._bundle2salvagedoutput = salvaged
373 373 raise
374 374 finally:
375 375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 376
377 377 return op
378 378
379 379 def _processpart(op, part):
380 380 """process a single part from a bundle
381 381
382 382 The part is guaranteed to have been fully consumed when the function exits
383 383 (even if an exception is raised)."""
384 384 status = 'unknown' # used by debug output
385 385 hardabort = False
386 386 try:
387 387 try:
388 388 handler = parthandlermapping.get(part.type)
389 389 if handler is None:
390 390 status = 'unsupported-type'
391 391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 393 unknownparams = part.mandatorykeys - handler.params
394 394 if unknownparams:
395 395 unknownparams = list(unknownparams)
396 396 unknownparams.sort()
397 397 status = 'unsupported-params (%s)' % unknownparams
398 398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 399 params=unknownparams)
400 400 status = 'supported'
401 401 except error.BundleUnknownFeatureError as exc:
402 402 if part.mandatory: # mandatory parts
403 403 raise
404 404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 405 return # skip to part processing
406 406 finally:
407 407 if op.ui.debugflag:
408 408 msg = ['bundle2-input-part: "%s"' % part.type]
409 409 if not part.mandatory:
410 410 msg.append(' (advisory)')
411 411 nbmp = len(part.mandatorykeys)
412 412 nbap = len(part.params) - nbmp
413 413 if nbmp or nbap:
414 414 msg.append(' (params:')
415 415 if nbmp:
416 416 msg.append(' %i mandatory' % nbmp)
417 417 if nbap:
418 418 msg.append(' %i advisory' % nbap)
419 419 msg.append(')')
420 420 msg.append(' %s\n' % status)
421 421 op.ui.debug(''.join(msg))
422 422
423 423 # handler is called outside the above try block so that we don't
424 424 # risk catching KeyErrors from anything other than the
425 425 # parthandlermapping lookup (any KeyError raised by handler()
426 426 # itself represents a defect of a different variety).
427 427 output = None
428 428 if op.captureoutput and op.reply is not None:
429 429 op.ui.pushbuffer(error=True, subproc=True)
430 430 output = ''
431 431 try:
432 432 handler(op, part)
433 433 finally:
434 434 if output is not None:
435 435 output = op.ui.popbuffer()
436 436 if output:
437 437 outpart = op.reply.newpart('output', data=output,
438 438 mandatory=False)
439 439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 440 # If exiting or interrupted, do not attempt to seek the stream in the
441 441 # finally block below. This makes abort faster.
442 442 except (SystemExit, KeyboardInterrupt):
443 443 hardabort = True
444 444 raise
445 445 finally:
446 446 # consume the part content to not corrupt the stream.
447 447 if not hardabort:
448 448 part.seek(0, 2)
449 449
450 450
451 451 def decodecaps(blob):
452 452 """decode a bundle2 caps bytes blob into a dictionary
453 453
454 454 The blob is a list of capabilities (one per line)
455 455 Capabilities may have values using a line of the form::
456 456
457 457 capability=value1,value2,value3
458 458
459 459 The values are always a list."""
460 460 caps = {}
461 461 for line in blob.splitlines():
462 462 if not line:
463 463 continue
464 464 if '=' not in line:
465 465 key, vals = line, ()
466 466 else:
467 467 key, vals = line.split('=', 1)
468 468 vals = vals.split(',')
469 469 key = urlreq.unquote(key)
470 470 vals = [urlreq.unquote(v) for v in vals]
471 471 caps[key] = vals
472 472 return caps
473 473
474 474 def encodecaps(caps):
475 475 """encode a bundle2 caps dictionary into a bytes blob"""
476 476 chunks = []
477 477 for ca in sorted(caps):
478 478 vals = caps[ca]
479 479 ca = urlreq.quote(ca)
480 480 vals = [urlreq.quote(v) for v in vals]
481 481 if vals:
482 482 ca = "%s=%s" % (ca, ','.join(vals))
483 483 chunks.append(ca)
484 484 return '\n'.join(chunks)
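# A quick round-trip sketch of the capabilities blob format handled by
# decodecaps/encodecaps; the capability values below are only examples.
def _examplecaps():
    """Sketch only: encode and decode a small capabilities dictionary."""
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob is "HG20\nchangegroup=01,02": keys sorted, values comma separated
    assert decodecaps(blob) == caps
    return blob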
485 485
486 486 bundletypes = {
487 487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 488 # since the unification ssh accepts a header but there
489 489 # is no capability signaling it.
490 490 "HG20": (), # special-cased below
491 491 "HG10UN": ("HG10UN", 'UN'),
492 492 "HG10BZ": ("HG10", 'BZ'),
493 493 "HG10GZ": ("HG10GZ", 'GZ'),
494 494 }
495 495
496 496 # hgweb uses this list to communicate its preferred type
497 497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498 498
499 499 class bundle20(object):
500 500 """represent an outgoing bundle2 container
501 501
502 502 Use the `addparam` method to add a stream level parameter and `newpart` to
503 503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 504 data that compose the bundle2 container."""
505 505
506 506 _magicstring = 'HG20'
507 507
508 508 def __init__(self, ui, capabilities=()):
509 509 self.ui = ui
510 510 self._params = []
511 511 self._parts = []
512 512 self.capabilities = dict(capabilities)
513 513 self._compengine = util.compengines.forbundletype('UN')
514 514 self._compopts = None
515 515
516 516 def setcompression(self, alg, compopts=None):
517 517 """setup core part compression to <alg>"""
518 518 if alg in (None, 'UN'):
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compengine = util.compengines.forbundletype(alg)
523 523 self._compopts = compopts
524 524
525 525 @property
526 526 def nbparts(self):
527 527 """total number of parts added to the bundler"""
528 528 return len(self._parts)
529 529
530 530 # methods used to define the bundle2 content
531 531 def addparam(self, name, value=None):
532 532 """add a stream level parameter"""
533 533 if not name:
534 534 raise ValueError('empty parameter name')
535 535 if name[0] not in string.letters:
536 536 raise ValueError('non letter first character: %r' % name)
537 537 self._params.append((name, value))
538 538
539 539 def addpart(self, part):
540 540 """add a new part to the bundle2 container
541 541
542 542 Parts contain the actual applicative payload."""
543 543 assert part.id is None
544 544 part.id = len(self._parts) # very cheap counter
545 545 self._parts.append(part)
546 546
547 547 def newpart(self, typeid, *args, **kwargs):
548 548 """create a new part and add it to the containers
549 549
550 550 The part is directly added to the container. For now, this means
551 551 that any failure to properly initialize the part after calling
552 552 ``newpart`` should result in a failure of the whole bundling process.
553 553
554 554 You can still fall back to manually creating and adding the part if you
555 555 need better control."""
556 556 part = bundlepart(typeid, *args, **kwargs)
557 557 self.addpart(part)
558 558 return part
559 559
560 560 # methods used to generate the bundle2 stream
561 561 def getchunks(self):
562 562 if self.ui.debugflag:
563 563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 564 if self._params:
565 565 msg.append(' (%i params)' % len(self._params))
566 566 msg.append(' %i parts total\n' % len(self._parts))
567 567 self.ui.debug(''.join(msg))
568 568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 569 yield self._magicstring
570 570 param = self._paramchunk()
571 571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 572 yield _pack(_fstreamparamsize, len(param))
573 573 if param:
574 574 yield param
575 575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 576 self._compopts):
577 577 yield chunk
578 578
579 579 def _paramchunk(self):
580 580 """return a encoded version of all stream parameters"""
581 581 blocks = []
582 582 for par, value in self._params:
583 583 par = urlreq.quote(par)
584 584 if value is not None:
585 585 value = urlreq.quote(value)
586 586 par = '%s=%s' % (par, value)
587 587 blocks.append(par)
588 588 return ' '.join(blocks)
589 589
590 590 def _getcorechunk(self):
591 591 """yield chunk for the core part of the bundle
592 592
593 593 (all but headers and parameters)"""
594 594 outdebug(self.ui, 'start of parts')
595 595 for part in self._parts:
596 596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 597 for chunk in part.getchunks(ui=self.ui):
598 598 yield chunk
599 599 outdebug(self.ui, 'end of bundle')
600 600 yield _pack(_fpartheadersize, 0)
601 601
602 602
603 603 def salvageoutput(self):
604 604 """return a list with a copy of all output parts in the bundle
605 605
606 606 This is meant to be used during error handling to make sure we preserve
607 607 server output"""
608 608 salvaged = []
609 609 for part in self._parts:
610 610 if part.type.startswith('output'):
611 611 salvaged.append(part.copy())
612 612 return salvaged
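# Tying the pieces together, a hedged sketch (assuming a `ui` object is
# available; the payload content is made up) of assembling and serializing a
# small bundle with the class above. Real code normally goes through the
# exchange layer rather than doing this directly.
def _examplebuildbundle(ui):
    """Sketch only: build a tiny bundle2 stream in memory."""
    bundler = bundle20(ui)
    part = bundler.newpart('output', data='example payload\n',
                           mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    return ''.join(bundler.getchunks())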
613 613
614 614
615 615 class unpackermixin(object):
616 616 """A mixin to extract bytes and struct data from a stream"""
617 617
618 618 def __init__(self, fp):
619 619 self._fp = fp
620 620 self._seekable = (util.safehasattr(fp, 'seek') and
621 621 util.safehasattr(fp, 'tell'))
622 622
623 623 def _unpack(self, format):
624 """unpack this struct format from the stream"""
624 """unpack this struct format from the stream
625
626 This method is meant for internal usage by the bundle2 protocol only.
627 It directly manipulates the low level stream, including bundle2 level
628 instructions.
629
630 Do not use it to implement higher-level logic or methods."""
625 631 data = self._readexact(struct.calcsize(format))
626 632 return _unpack(format, data)
627 633
628 634 def _readexact(self, size):
629 """read exactly <size> bytes from the stream"""
635 """read exactly <size> bytes from the stream
636
637 This method is meant for internal usage by the bundle2 protocol only.
638 It directly manipulates the low level stream, including bundle2 level
639 instructions.
640
641 Do not use it to implement higher-level logic or methods."""
630 642 return changegroup.readexactly(self._fp, size)
631 643
632 644 def seek(self, offset, whence=0):
633 645 """move the underlying file pointer"""
634 646 if self._seekable:
635 647 return self._fp.seek(offset, whence)
636 648 else:
637 649 raise NotImplementedError(_('File pointer is not seekable'))
638 650
639 651 def tell(self):
640 652 """return the file offset, or None if file is not seekable"""
641 653 if self._seekable:
642 654 try:
643 655 return self._fp.tell()
644 656 except IOError as e:
645 657 if e.errno == errno.ESPIPE:
646 658 self._seekable = False
647 659 else:
648 660 raise
649 661 return None
650 662
651 663 def close(self):
652 664 """close underlying file"""
653 665 if util.safehasattr(self._fp, 'close'):
654 666 return self._fp.close()
655 667
656 668 def getunbundler(ui, fp, magicstring=None):
657 669 """return a valid unbundler object for a given magicstring"""
658 670 if magicstring is None:
659 671 magicstring = changegroup.readexactly(fp, 4)
660 672 magic, version = magicstring[0:2], magicstring[2:4]
661 673 if magic != 'HG':
662 674 raise error.Abort(_('not a Mercurial bundle'))
663 675 unbundlerclass = formatmap.get(version)
664 676 if unbundlerclass is None:
665 677 raise error.Abort(_('unknown bundle version %s') % version)
666 678 unbundler = unbundlerclass(ui, fp)
667 679 indebug(ui, 'start processing of %s stream' % magicstring)
668 680 return unbundler
669 681
670 682 class unbundle20(unpackermixin):
671 683 """interpret a bundle2 stream
672 684
673 685 This class is fed with a binary stream and yields parts through its
674 686 `iterparts` method."""
675 687
676 688 _magicstring = 'HG20'
677 689
678 690 def __init__(self, ui, fp):
679 691 """If header is specified, we do not read it out of the stream."""
680 692 self.ui = ui
681 693 self._compengine = util.compengines.forbundletype('UN')
682 694 self._compressed = None
683 695 super(unbundle20, self).__init__(fp)
684 696
685 697 @util.propertycache
686 698 def params(self):
687 699 """dictionary of stream level parameters"""
688 700 indebug(self.ui, 'reading bundle2 stream parameters')
689 701 params = {}
690 702 paramssize = self._unpack(_fstreamparamsize)[0]
691 703 if paramssize < 0:
692 704 raise error.BundleValueError('negative bundle param size: %i'
693 705 % paramssize)
694 706 if paramssize:
695 707 params = self._readexact(paramssize)
696 708 params = self._processallparams(params)
697 709 return params
698 710
699 711 def _processallparams(self, paramsblock):
700 712 """"""
701 713 params = util.sortdict()
702 714 for p in paramsblock.split(' '):
703 715 p = p.split('=', 1)
704 716 p = [urlreq.unquote(i) for i in p]
705 717 if len(p) < 2:
706 718 p.append(None)
707 719 self._processparam(*p)
708 720 params[p[0]] = p[1]
709 721 return params
710 722
711 723
712 724 def _processparam(self, name, value):
713 725 """process a parameter, applying its effect if needed
714 726
715 727 Parameters starting with a lower case letter are advisory and will be
716 728 ignored when unknown. Those starting with an upper case letter are
717 729 mandatory; this function will raise an error when they are unknown.
718 730
719 731 Note: no options are currently supported. Any input will either be
720 732 ignored or fail.
721 733 """
722 734 if not name:
723 735 raise ValueError('empty parameter name')
724 736 if name[0] not in string.letters:
725 737 raise ValueError('non letter first character: %r' % name)
726 738 try:
727 739 handler = b2streamparamsmap[name.lower()]
728 740 except KeyError:
729 741 if name[0].islower():
730 742 indebug(self.ui, "ignoring unknown parameter %r" % name)
731 743 else:
732 744 raise error.BundleUnknownFeatureError(params=(name,))
733 745 else:
734 746 handler(self, name, value)
735 747
736 748 def _forwardchunks(self):
737 749 """utility to transfer a bundle2 as binary
738 750
739 751 This is made necessary by the fact that the 'getbundle' command over 'ssh'
740 752 has no way to know where the reply ends, relying on the bundle to be
741 753 interpreted to know its end. This is terrible and we are sorry, but we
742 754 needed to move forward to get general delta enabled.
743 755 """
744 756 yield self._magicstring
745 757 assert 'params' not in vars(self)
746 758 paramssize = self._unpack(_fstreamparamsize)[0]
747 759 if paramssize < 0:
748 760 raise error.BundleValueError('negative bundle param size: %i'
749 761 % paramssize)
750 762 yield _pack(_fstreamparamsize, paramssize)
751 763 if paramssize:
752 764 params = self._readexact(paramssize)
753 765 self._processallparams(params)
754 766 yield params
755 767 assert self._compengine.bundletype == 'UN'
756 768 # From there, payload might need to be decompressed
757 769 self._fp = self._compengine.decompressorreader(self._fp)
758 770 emptycount = 0
759 771 while emptycount < 2:
760 772 # so we can brainlessly loop
761 773 assert _fpartheadersize == _fpayloadsize
762 774 size = self._unpack(_fpartheadersize)[0]
763 775 yield _pack(_fpartheadersize, size)
764 776 if size:
765 777 emptycount = 0
766 778 else:
767 779 emptycount += 1
768 780 continue
769 781 if size == flaginterrupt:
770 782 continue
771 783 elif size < 0:
772 784 raise error.BundleValueError('negative chunk size: %i' % size)
773 785 yield self._readexact(size)
774 786
775 787
776 788 def iterparts(self):
777 789 """yield all parts contained in the stream"""
778 790 # make sure params have been loaded
779 791 self.params
780 792 # From there, payload need to be decompressed
781 793 self._fp = self._compengine.decompressorreader(self._fp)
782 794 indebug(self.ui, 'start extraction of bundle2 parts')
783 795 headerblock = self._readpartheader()
784 796 while headerblock is not None:
785 797 part = unbundlepart(self.ui, headerblock, self._fp)
786 798 yield part
787 799 part.seek(0, 2)
788 800 headerblock = self._readpartheader()
789 801 indebug(self.ui, 'end of bundle2 stream')
790 802
791 803 def _readpartheader(self):
792 804 """reads a part header size and return the bytes blob
793 805
794 806 returns None if empty"""
795 807 headersize = self._unpack(_fpartheadersize)[0]
796 808 if headersize < 0:
797 809 raise error.BundleValueError('negative part header size: %i'
798 810 % headersize)
799 811 indebug(self.ui, 'part header size: %i' % headersize)
800 812 if headersize:
801 813 return self._readexact(headersize)
802 814 return None
803 815
804 816 def compressed(self):
805 817 self.params # load params
806 818 return self._compressed
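# And the reading side, as a minimal sketch assuming `ui` and a file object
# `fp` positioned at the start of a bundle2 stream; purely illustrative.
def _examplereadbundle(ui, fp):
    """Sketch only: iterate over the parts of a bundle2 stream.

    Each part is fully read so the underlying stream stays usable; this is
    what processbundle() guarantees for real consumers.
    """
    unbundler = getunbundler(ui, fp)
    payloads = []
    for part in unbundler.iterparts():
        payloads.append((part.type, part.read()))
    return payloads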
807 819
808 820 formatmap = {'20': unbundle20}
809 821
810 822 b2streamparamsmap = {}
811 823
812 824 def b2streamparamhandler(name):
813 825 """register a handler for a stream level parameter"""
814 826 def decorator(func):
815 827 assert name not in formatmap
816 828 b2streamparamsmap[name] = func
817 829 return func
818 830 return decorator
819 831
820 832 @b2streamparamhandler('compression')
821 833 def processcompression(unbundler, param, value):
822 834 """read compression parameter and install payload decompression"""
823 835 if value not in util.compengines.supportedbundletypes:
824 836 raise error.BundleUnknownFeatureError(params=(param,),
825 837 values=(value,))
826 838 unbundler._compengine = util.compengines.forbundletype(value)
827 839 if value is not None:
828 840 unbundler._compressed = True
829 841
830 842 class bundlepart(object):
831 843 """A bundle2 part contains application level payload
832 844
833 845 The part `type` is used to route the part to the application level
834 846 handler.
835 847
836 848 The part payload is contained in ``part.data``. It could be raw bytes or a
837 849 generator of byte chunks.
838 850
839 851 You can add parameters to the part using the ``addparam`` method.
840 852 Parameters can be either mandatory (default) or advisory. Remote side
841 853 should be able to safely ignore the advisory ones.
842 854
843 855 Neither data nor parameters can be modified after the generation has begun.
844 856 """
845 857
846 858 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
847 859 data='', mandatory=True):
848 860 validateparttype(parttype)
849 861 self.id = None
850 862 self.type = parttype
851 863 self._data = data
852 864 self._mandatoryparams = list(mandatoryparams)
853 865 self._advisoryparams = list(advisoryparams)
854 866 # checking for duplicated entries
855 867 self._seenparams = set()
856 868 for pname, __ in self._mandatoryparams + self._advisoryparams:
857 869 if pname in self._seenparams:
858 870 raise error.ProgrammingError('duplicated params: %s' % pname)
859 871 self._seenparams.add(pname)
860 872 # status of the part's generation:
861 873 # - None: not started,
862 874 # - False: currently being generated,
863 875 # - True: generation done.
864 876 self._generated = None
865 877 self.mandatory = mandatory
866 878
867 879 def __repr__(self):
868 880 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
869 881 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
870 882 % (cls, id(self), self.id, self.type, self.mandatory))
871 883
872 884 def copy(self):
873 885 """return a copy of the part
874 886
875 887 The new part has the very same content but no partid assigned yet.
876 888 Parts with generated data cannot be copied."""
877 889 assert not util.safehasattr(self.data, 'next')
878 890 return self.__class__(self.type, self._mandatoryparams,
879 891 self._advisoryparams, self._data, self.mandatory)
880 892
881 893 # methods used to define the part content
882 894 @property
883 895 def data(self):
884 896 return self._data
885 897
886 898 @data.setter
887 899 def data(self, data):
888 900 if self._generated is not None:
889 901 raise error.ReadOnlyPartError('part is being generated')
890 902 self._data = data
891 903
892 904 @property
893 905 def mandatoryparams(self):
894 906 # make it an immutable tuple to force people through ``addparam``
895 907 return tuple(self._mandatoryparams)
896 908
897 909 @property
898 910 def advisoryparams(self):
899 911 # make it an immutable tuple to force people through ``addparam``
900 912 return tuple(self._advisoryparams)
901 913
902 914 def addparam(self, name, value='', mandatory=True):
903 915 """add a parameter to the part
904 916
905 917 If 'mandatory' is set to True, the remote handler must claim support
906 918 for this parameter or the unbundling will be aborted.
907 919
908 920 The 'name' and 'value' cannot exceed 255 bytes each.
909 921 """
910 922 if self._generated is not None:
911 923 raise error.ReadOnlyPartError('part is being generated')
912 924 if name in self._seenparams:
913 925 raise ValueError('duplicated params: %s' % name)
914 926 self._seenparams.add(name)
915 927 params = self._advisoryparams
916 928 if mandatory:
917 929 params = self._mandatoryparams
918 930 params.append((name, value))
919 931
920 932 # methods used to generate the bundle2 stream
921 933 def getchunks(self, ui):
922 934 if self._generated is not None:
923 935 raise error.ProgrammingError('part can only be consumed once')
924 936 self._generated = False
925 937
926 938 if ui.debugflag:
927 939 msg = ['bundle2-output-part: "%s"' % self.type]
928 940 if not self.mandatory:
929 941 msg.append(' (advisory)')
930 942 nbmp = len(self.mandatoryparams)
931 943 nbap = len(self.advisoryparams)
932 944 if nbmp or nbap:
933 945 msg.append(' (params:')
934 946 if nbmp:
935 947 msg.append(' %i mandatory' % nbmp)
936 948 if nbap:
937 949 msg.append(' %i advisory' % nbap)
938 950 msg.append(')')
939 951 if not self.data:
940 952 msg.append(' empty payload')
941 953 elif util.safehasattr(self.data, 'next'):
942 954 msg.append(' streamed payload')
943 955 else:
944 956 msg.append(' %i bytes payload' % len(self.data))
945 957 msg.append('\n')
946 958 ui.debug(''.join(msg))
947 959
948 960 #### header
949 961 if self.mandatory:
950 962 parttype = self.type.upper()
951 963 else:
952 964 parttype = self.type.lower()
953 965 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
954 966 ## parttype
955 967 header = [_pack(_fparttypesize, len(parttype)),
956 968 parttype, _pack(_fpartid, self.id),
957 969 ]
958 970 ## parameters
959 971 # count
960 972 manpar = self.mandatoryparams
961 973 advpar = self.advisoryparams
962 974 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
963 975 # size
964 976 parsizes = []
965 977 for key, value in manpar:
966 978 parsizes.append(len(key))
967 979 parsizes.append(len(value))
968 980 for key, value in advpar:
969 981 parsizes.append(len(key))
970 982 parsizes.append(len(value))
971 983 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
972 984 header.append(paramsizes)
973 985 # key, value
974 986 for key, value in manpar:
975 987 header.append(key)
976 988 header.append(value)
977 989 for key, value in advpar:
978 990 header.append(key)
979 991 header.append(value)
980 992 ## finalize header
981 993 headerchunk = ''.join(header)
982 994 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
983 995 yield _pack(_fpartheadersize, len(headerchunk))
984 996 yield headerchunk
985 997 ## payload
986 998 try:
987 999 for chunk in self._payloadchunks():
988 1000 outdebug(ui, 'payload chunk size: %i' % len(chunk))
989 1001 yield _pack(_fpayloadsize, len(chunk))
990 1002 yield chunk
991 1003 except GeneratorExit:
992 1004 # GeneratorExit means that nobody is listening for our
993 1005 # results anyway, so just bail quickly rather than trying
994 1006 # to produce an error part.
995 1007 ui.debug('bundle2-generatorexit\n')
996 1008 raise
997 1009 except BaseException as exc:
998 1010 # backup exception data for later
999 1011 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1000 1012 % exc)
1001 1013 exc_info = sys.exc_info()
1002 1014 msg = 'unexpected error: %s' % exc
1003 1015 interpart = bundlepart('error:abort', [('message', msg)],
1004 1016 mandatory=False)
1005 1017 interpart.id = 0
1006 1018 yield _pack(_fpayloadsize, -1)
1007 1019 for chunk in interpart.getchunks(ui=ui):
1008 1020 yield chunk
1009 1021 outdebug(ui, 'closing payload chunk')
1010 1022 # abort current part payload
1011 1023 yield _pack(_fpayloadsize, 0)
1012 1024 if pycompat.ispy3:
1013 1025 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1014 1026 else:
1015 1027 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1016 1028 # end of payload
1017 1029 outdebug(ui, 'closing payload chunk')
1018 1030 yield _pack(_fpayloadsize, 0)
1019 1031 self._generated = True
1020 1032
1021 1033 def _payloadchunks(self):
1022 1034 """yield chunks of a the part payload
1023 1035
1024 1036 Exists to handle the different methods to provide data to a part."""
1025 1037 # we only support fixed size data now.
1026 1038 # This will be improved in the future.
1027 1039 if util.safehasattr(self.data, 'next'):
1028 1040 buff = util.chunkbuffer(self.data)
1029 1041 chunk = buff.read(preferedchunksize)
1030 1042 while chunk:
1031 1043 yield chunk
1032 1044 chunk = buff.read(preferedchunksize)
1033 1045 elif len(self.data):
1034 1046 yield self.data
1035 1047
1036 1048
1037 1049 flaginterrupt = -1
1038 1050
1039 1051 class interrupthandler(unpackermixin):
1040 1052 """read one part and process it with restricted capability
1041 1053
1042 1054 This allows transmitting an exception raised on the producer side during
1043 1055 part iteration while the consumer is reading a part.
1044 1056
1045 1057 Parts processed in this manner only have access to a ui object."""
1046 1058
1047 1059 def __init__(self, ui, fp):
1048 1060 super(interrupthandler, self).__init__(fp)
1049 1061 self.ui = ui
1050 1062
1051 1063 def _readpartheader(self):
1052 1064 """reads a part header size and return the bytes blob
1053 1065
1054 1066 returns None if empty"""
1055 1067 headersize = self._unpack(_fpartheadersize)[0]
1056 1068 if headersize < 0:
1057 1069 raise error.BundleValueError('negative part header size: %i'
1058 1070 % headersize)
1059 1071 indebug(self.ui, 'part header size: %i\n' % headersize)
1060 1072 if headersize:
1061 1073 return self._readexact(headersize)
1062 1074 return None
1063 1075
1064 1076 def __call__(self):
1065 1077
1066 1078 self.ui.debug('bundle2-input-stream-interrupt:'
1067 1079 ' opening out of band context\n')
1068 1080 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1069 1081 headerblock = self._readpartheader()
1070 1082 if headerblock is None:
1071 1083 indebug(self.ui, 'no part found during interruption.')
1072 1084 return
1073 1085 part = unbundlepart(self.ui, headerblock, self._fp)
1074 1086 op = interruptoperation(self.ui)
1075 1087 _processpart(op, part)
1076 1088 self.ui.debug('bundle2-input-stream-interrupt:'
1077 1089 ' closing out of band context\n')
1078 1090
1079 1091 class interruptoperation(object):
1080 1092 """A limited operation to be use by part handler during interruption
1081 1093
1082 1094 It only has access to a ui object.
1083 1095 """
1084 1096
1085 1097 def __init__(self, ui):
1086 1098 self.ui = ui
1087 1099 self.reply = None
1088 1100 self.captureoutput = False
1089 1101
1090 1102 @property
1091 1103 def repo(self):
1092 1104 raise error.ProgrammingError('no repo access from stream interruption')
1093 1105
1094 1106 def gettransaction(self):
1095 1107 raise TransactionUnavailable('no repo access from stream interruption')
1096 1108
1097 1109 class unbundlepart(unpackermixin):
1098 1110 """a bundle part read from a bundle"""
1099 1111
1100 1112 def __init__(self, ui, header, fp):
1101 1113 super(unbundlepart, self).__init__(fp)
1102 1114 self.ui = ui
1103 1115 # unbundle state attr
1104 1116 self._headerdata = header
1105 1117 self._headeroffset = 0
1106 1118 self._initialized = False
1107 1119 self.consumed = False
1108 1120 # part data
1109 1121 self.id = None
1110 1122 self.type = None
1111 1123 self.mandatoryparams = None
1112 1124 self.advisoryparams = None
1113 1125 self.params = None
1114 1126 self.mandatorykeys = ()
1115 1127 self._payloadstream = None
1116 1128 self._readheader()
1117 1129 self._mandatory = None
1118 1130 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1119 1131 self._pos = 0
1120 1132
1121 1133 def _fromheader(self, size):
1122 1134 """return the next <size> byte from the header"""
1123 1135 offset = self._headeroffset
1124 1136 data = self._headerdata[offset:(offset + size)]
1125 1137 self._headeroffset = offset + size
1126 1138 return data
1127 1139
1128 1140 def _unpackheader(self, format):
1129 1141 """read given format from header
1130 1142
1131 1143 This automatically computes the size of the format to read.
1132 1144 data = self._fromheader(struct.calcsize(format))
1133 1145 return _unpack(format, data)
1134 1146
1135 1147 def _initparams(self, mandatoryparams, advisoryparams):
1136 1148 """internal function to setup all logic related parameters"""
1137 1149 # make it read only to prevent people touching it by mistake.
1138 1150 self.mandatoryparams = tuple(mandatoryparams)
1139 1151 self.advisoryparams = tuple(advisoryparams)
1140 1152 # user friendly UI
1141 1153 self.params = util.sortdict(self.mandatoryparams)
1142 1154 self.params.update(self.advisoryparams)
1143 1155 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1144 1156
1145 1157 def _payloadchunks(self, chunknum=0):
1146 1158 '''seek to specified chunk and start yielding data'''
1147 1159 if len(self._chunkindex) == 0:
1148 1160 assert chunknum == 0, 'Must start with chunk 0'
1149 1161 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1150 1162 else:
1151 1163 assert chunknum < len(self._chunkindex), \
1152 1164 'Unknown chunk %d' % chunknum
1153 1165 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1154 1166
1155 1167 pos = self._chunkindex[chunknum][0]
1156 1168 payloadsize = self._unpack(_fpayloadsize)[0]
1157 1169 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1158 1170 while payloadsize:
1159 1171 if payloadsize == flaginterrupt:
1160 1172 # interruption detection, the handler will now read a
1161 1173 # single part and process it.
1162 1174 interrupthandler(self.ui, self._fp)()
1163 1175 elif payloadsize < 0:
1164 1176 msg = 'negative payload chunk size: %i' % payloadsize
1165 1177 raise error.BundleValueError(msg)
1166 1178 else:
1167 1179 result = self._readexact(payloadsize)
1168 1180 chunknum += 1
1169 1181 pos += payloadsize
1170 1182 if chunknum == len(self._chunkindex):
1171 1183 self._chunkindex.append((pos,
1172 1184 super(unbundlepart, self).tell()))
1173 1185 yield result
1174 1186 payloadsize = self._unpack(_fpayloadsize)[0]
1175 1187 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1176 1188
1177 1189 def _findchunk(self, pos):
1178 1190 '''for a given payload position, return a chunk number and offset'''
1179 1191 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1180 1192 if ppos == pos:
1181 1193 return chunk, 0
1182 1194 elif ppos > pos:
1183 1195 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1184 1196 raise ValueError('Unknown chunk')
1185 1197
1186 1198 def _readheader(self):
1187 1199 """read the header and setup the object"""
1188 1200 typesize = self._unpackheader(_fparttypesize)[0]
1189 1201 self.type = self._fromheader(typesize)
1190 1202 indebug(self.ui, 'part type: "%s"' % self.type)
1191 1203 self.id = self._unpackheader(_fpartid)[0]
1192 1204 indebug(self.ui, 'part id: "%s"' % self.id)
1193 1205 # extract mandatory bit from type
1194 1206 self.mandatory = (self.type != self.type.lower())
1195 1207 self.type = self.type.lower()
1196 1208 ## reading parameters
1197 1209 # param count
1198 1210 mancount, advcount = self._unpackheader(_fpartparamcount)
1199 1211 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1200 1212 # param size
1201 1213 fparamsizes = _makefpartparamsizes(mancount + advcount)
1202 1214 paramsizes = self._unpackheader(fparamsizes)
1203 1215 # make it a list of couples again
1204 1216 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1205 1217 # split mandatory from advisory
1206 1218 mansizes = paramsizes[:mancount]
1207 1219 advsizes = paramsizes[mancount:]
1208 1220 # retrieve param value
1209 1221 manparams = []
1210 1222 for key, value in mansizes:
1211 1223 manparams.append((self._fromheader(key), self._fromheader(value)))
1212 1224 advparams = []
1213 1225 for key, value in advsizes:
1214 1226 advparams.append((self._fromheader(key), self._fromheader(value)))
1215 1227 self._initparams(manparams, advparams)
1216 1228 ## part payload
1217 1229 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1218 1230 # we read the data, tell it
1219 1231 self._initialized = True
1220 1232
1221 1233 def read(self, size=None):
1222 1234 """read payload data"""
1223 1235 if not self._initialized:
1224 1236 self._readheader()
1225 1237 if size is None:
1226 1238 data = self._payloadstream.read()
1227 1239 else:
1228 1240 data = self._payloadstream.read(size)
1229 1241 self._pos += len(data)
1230 1242 if size is None or len(data) < size:
1231 1243 if not self.consumed and self._pos:
1232 1244 self.ui.debug('bundle2-input-part: total payload size %i\n'
1233 1245 % self._pos)
1234 1246 self.consumed = True
1235 1247 return data
1236 1248
1237 1249 def tell(self):
1238 1250 return self._pos
1239 1251
1240 1252 def seek(self, offset, whence=0):
1241 1253 if whence == 0:
1242 1254 newpos = offset
1243 1255 elif whence == 1:
1244 1256 newpos = self._pos + offset
1245 1257 elif whence == 2:
1246 1258 if not self.consumed:
1247 1259 self.read()
1248 1260 newpos = self._chunkindex[-1][0] - offset
1249 1261 else:
1250 1262 raise ValueError('Unknown whence value: %r' % (whence,))
1251 1263
1252 1264 if newpos > self._chunkindex[-1][0] and not self.consumed:
1253 1265 self.read()
1254 1266 if not 0 <= newpos <= self._chunkindex[-1][0]:
1255 1267 raise ValueError('Offset out of range')
1256 1268
1257 1269 if self._pos != newpos:
1258 1270 chunk, internaloffset = self._findchunk(newpos)
1259 1271 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1260 1272 adjust = self.read(internaloffset)
1261 1273 if len(adjust) != internaloffset:
1262 1274 raise error.Abort(_('Seek failed\n'))
1263 1275 self._pos = newpos
1264 1276
1265 1277 # These are only the static capabilities.
1266 1278 # Check the 'getrepocaps' function for the rest.
1267 1279 capabilities = {'HG20': (),
1268 1280 'error': ('abort', 'unsupportedcontent', 'pushraced',
1269 1281 'pushkey'),
1270 1282 'listkeys': (),
1271 1283 'pushkey': (),
1272 1284 'digests': tuple(sorted(util.DIGESTS.keys())),
1273 1285 'remote-changegroup': ('http', 'https'),
1274 1286 'hgtagsfnodes': (),
1275 1287 }
1276 1288
1277 1289 def getrepocaps(repo, allowpushback=False):
1278 1290 """return the bundle2 capabilities for a given repo
1279 1291
1280 1292 Exists to allow extensions (like evolution) to mutate the capabilities.
1281 1293 """
1282 1294 caps = capabilities.copy()
1283 1295 caps['changegroup'] = tuple(sorted(
1284 1296 changegroup.supportedincomingversions(repo)))
1285 1297 if obsolete.isenabled(repo, obsolete.exchangeopt):
1286 1298 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1287 1299 caps['obsmarkers'] = supportedformat
1288 1300 if allowpushback:
1289 1301 caps['pushback'] = ()
1290 1302 return caps
1291 1303
1292 1304 def bundle2caps(remote):
1293 1305 """return the bundle capabilities of a peer as dict"""
1294 1306 raw = remote.capable('bundle2')
1295 1307 if not raw and raw != '':
1296 1308 return {}
1297 1309 capsblob = urlreq.unquote(remote.capable('bundle2'))
1298 1310 return decodecaps(capsblob)
1299 1311
1300 1312 def obsmarkersversion(caps):
1301 1313 """extract the list of supported obsmarkers versions from a bundle2caps dict
1302 1314 """
1303 1315 obscaps = caps.get('obsmarkers', ())
1304 1316 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1305 1317
1306 1318 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1307 1319 compopts=None):
1308 1320 """Write a bundle file and return its filename.
1309 1321
1310 1322 Existing files will not be overwritten.
1311 1323 If no filename is specified, a temporary file is created.
1312 1324 bz2 compression can be turned off.
1313 1325 The bundle file will be deleted in case of errors.
1314 1326 """
1315 1327
1316 1328 if bundletype == "HG20":
1317 1329 bundle = bundle20(ui)
1318 1330 bundle.setcompression(compression, compopts)
1319 1331 part = bundle.newpart('changegroup', data=cg.getchunks())
1320 1332 part.addparam('version', cg.version)
1321 1333 if 'clcount' in cg.extras:
1322 1334 part.addparam('nbchanges', str(cg.extras['clcount']),
1323 1335 mandatory=False)
1324 1336 chunkiter = bundle.getchunks()
1325 1337 else:
1326 1338 # compression argument is only for the bundle2 case
1327 1339 assert compression is None
1328 1340 if cg.version != '01':
1329 1341 raise error.Abort(_('old bundle types only supports v1 '
1330 1342 'changegroups'))
1331 1343 header, comp = bundletypes[bundletype]
1332 1344 if comp not in util.compengines.supportedbundletypes:
1333 1345 raise error.Abort(_('unknown stream compression type: %s')
1334 1346 % comp)
1335 1347 compengine = util.compengines.forbundletype(comp)
1336 1348 def chunkiter():
1337 1349 yield header
1338 1350 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1339 1351 yield chunk
1340 1352 chunkiter = chunkiter()
1341 1353
1342 1354 # parse the changegroup data, otherwise we will block
1343 1355 # in case of sshrepo because we don't know the end of the stream
1344 1356 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1345 1357
1346 1358 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1347 1359 def handlechangegroup(op, inpart):
1348 1360 """apply a changegroup part on the repo
1349 1361
1350 1362 This is a very early implementation that will be massively reworked before
1351 1363 being inflicted on any end-user.
1352 1364 """
1353 1365 # Make sure we trigger a transaction creation
1354 1366 #
1355 1367 # The addchangegroup function will get a transaction object by itself, but
1356 1368 # we need to make sure we trigger the creation of a transaction object used
1357 1369 # for the whole processing scope.
1358 1370 op.gettransaction()
1359 1371 unpackerversion = inpart.params.get('version', '01')
1360 1372 # We should raise an appropriate exception here
1361 1373 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1362 1374 # the source and url passed here are overwritten by the ones contained in
1363 1375 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1364 1376 nbchangesets = None
1365 1377 if 'nbchanges' in inpart.params:
1366 1378 nbchangesets = int(inpart.params.get('nbchanges'))
1367 1379 if ('treemanifest' in inpart.params and
1368 1380 'treemanifest' not in op.repo.requirements):
1369 1381 if len(op.repo.changelog) != 0:
1370 1382 raise error.Abort(_(
1371 1383 "bundle contains tree manifests, but local repo is "
1372 1384 "non-empty and does not use tree manifests"))
1373 1385 op.repo.requirements.add('treemanifest')
1374 1386 op.repo._applyopenerreqs()
1375 1387 op.repo._writerequirements()
1376 1388 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1377 1389 op.records.add('changegroup', {'return': ret})
1378 1390 if op.reply is not None:
1379 1391 # This is definitely not the final form of this
1380 1392 # return. But one needs to start somewhere.
1381 1393 part = op.reply.newpart('reply:changegroup', mandatory=False)
1382 1394 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1383 1395 part.addparam('return', '%i' % ret, mandatory=False)
1384 1396 assert not inpart.read()
1385 1397
1386 1398 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1387 1399 ['digest:%s' % k for k in util.DIGESTS.keys()])
1388 1400 @parthandler('remote-changegroup', _remotechangegroupparams)
1389 1401 def handleremotechangegroup(op, inpart):
1390 1402 """apply a bundle10 on the repo, given an url and validation information
1391 1403
1392 1404 All the information about the remote bundle to import is given as
1393 1405 parameters. The parameters include:
1394 1406 - url: the url to the bundle10.
1395 1407 - size: the bundle10 file size. It is used to validate what was
1396 1408 retrieved by the client matches the server knowledge about the bundle.
1397 1409 - digests: a space separated list of the digest types provided as
1398 1410 parameters.
1399 1411 - digest:<digest-type>: the hexadecimal representation of the digest with
1400 1412 that name. Like the size, it is used to validate what was retrieved by
1401 1413 the client matches what the server knows about the bundle.
1402 1414
1403 1415 When multiple digest types are given, all of them are checked.
1404 1416 """
1405 1417 try:
1406 1418 raw_url = inpart.params['url']
1407 1419 except KeyError:
1408 1420 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1409 1421 parsed_url = util.url(raw_url)
1410 1422 if parsed_url.scheme not in capabilities['remote-changegroup']:
1411 1423 raise error.Abort(_('remote-changegroup does not support %s urls') %
1412 1424 parsed_url.scheme)
1413 1425
1414 1426 try:
1415 1427 size = int(inpart.params['size'])
1416 1428 except ValueError:
1417 1429 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1418 1430 % 'size')
1419 1431 except KeyError:
1420 1432 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1421 1433
1422 1434 digests = {}
1423 1435 for typ in inpart.params.get('digests', '').split():
1424 1436 param = 'digest:%s' % typ
1425 1437 try:
1426 1438 value = inpart.params[param]
1427 1439 except KeyError:
1428 1440 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1429 1441 param)
1430 1442 digests[typ] = value
1431 1443
1432 1444 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1433 1445
1434 1446 # Make sure we trigger a transaction creation
1435 1447 #
1436 1448 # The addchangegroup function will get a transaction object by itself, but
1437 1449 # we need to make sure we trigger the creation of a transaction object used
1438 1450 # for the whole processing scope.
1439 1451 op.gettransaction()
1440 1452 from . import exchange
1441 1453 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1442 1454 if not isinstance(cg, changegroup.cg1unpacker):
1443 1455 raise error.Abort(_('%s: not a bundle version 1.0') %
1444 1456 util.hidepassword(raw_url))
1445 1457 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1446 1458 op.records.add('changegroup', {'return': ret})
1447 1459 if op.reply is not None:
1448 1460 # This is definitely not the final form of this
1449 1461 # return. But one needs to start somewhere.
1450 1462 part = op.reply.newpart('reply:changegroup')
1451 1463 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1452 1464 part.addparam('return', '%i' % ret, mandatory=False)
1453 1465 try:
1454 1466 real_part.validate()
1455 1467 except error.Abort as e:
1456 1468 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1457 1469 (util.hidepassword(raw_url), str(e)))
1458 1470 assert not inpart.read()
1459 1471
1460 1472 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1461 1473 def handlereplychangegroup(op, inpart):
1462 1474 ret = int(inpart.params['return'])
1463 1475 replyto = int(inpart.params['in-reply-to'])
1464 1476 op.records.add('changegroup', {'return': ret}, replyto)
1465 1477
1466 1478 @parthandler('check:heads')
1467 1479 def handlecheckheads(op, inpart):
1468 1480 """check that the heads of the repo did not change
1469 1481
1470 1482 This is used to detect a push race when using unbundle.
1471 1483 This replaces the "heads" argument of unbundle."""
1472 1484 h = inpart.read(20)
1473 1485 heads = []
1474 1486 while len(h) == 20:
1475 1487 heads.append(h)
1476 1488 h = inpart.read(20)
1477 1489 assert not h
1478 1490 # Trigger a transaction so that we are guaranteed to have the lock now.
1479 1491 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1480 1492 op.gettransaction()
1481 1493 if sorted(heads) != sorted(op.repo.heads()):
1482 1494 raise error.PushRaced('repository changed while pushing - '
1483 1495 'please try again')
1484 1496
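# --- illustrative sketch (not part of the original module) -----------------
# The 'check:heads' payload is simply the concatenation of the 20-byte
# binary nodes the client believes to be the remote heads; a pusher could
# build such a part like this (`bundler` and `remoteheads` are assumptions):
#
#     part = bundler.newpart('check:heads', data=''.join(remoteheads))
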
1485 1497 @parthandler('output')
1486 1498 def handleoutput(op, inpart):
1487 1499 """forward output captured on the server to the client"""
1488 1500 for line in inpart.read().splitlines():
1489 1501 op.ui.status(_('remote: %s\n') % line)
1490 1502
1491 1503 @parthandler('replycaps')
1492 1504 def handlereplycaps(op, inpart):
1493 1505 """Notify that a reply bundle should be created
1494 1506
1495 1507 The payload contains the capabilities information for the reply"""
1496 1508 caps = decodecaps(inpart.read())
1497 1509 if op.reply is None:
1498 1510 op.reply = bundle20(op.ui, caps)
1499 1511
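# --- illustrative sketch (not part of the original module) -----------------
# A client asking for a reply bundle would send a 'replycaps' part whose
# payload is its own bundle2 capabilities encoded with encodecaps();
# `bundler` and `repo` are assumed names for the example:
#
#     capsblob = encodecaps(getrepocaps(repo))
#     bundler.newpart('replycaps', data=capsblob)
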
1500 1512 class AbortFromPart(error.Abort):
1501 1513 """Sub-class of Abort that denotes an error from a bundle2 part."""
1502 1514
1503 1515 @parthandler('error:abort', ('message', 'hint'))
1504 1516 def handleerrorabort(op, inpart):
1505 1517 """Used to transmit abort error over the wire"""
1506 1518 raise AbortFromPart(inpart.params['message'],
1507 1519 hint=inpart.params.get('hint'))
1508 1520
1509 1521 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1510 1522 'in-reply-to'))
1511 1523 def handleerrorpushkey(op, inpart):
1512 1524 """Used to transmit failure of a mandatory pushkey over the wire"""
1513 1525 kwargs = {}
1514 1526 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1515 1527 value = inpart.params.get(name)
1516 1528 if value is not None:
1517 1529 kwargs[name] = value
1518 1530 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1519 1531
1520 1532 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1521 1533 def handleerrorunsupportedcontent(op, inpart):
1522 1534 """Used to transmit unknown content error over the wire"""
1523 1535 kwargs = {}
1524 1536 parttype = inpart.params.get('parttype')
1525 1537 if parttype is not None:
1526 1538 kwargs['parttype'] = parttype
1527 1539 params = inpart.params.get('params')
1528 1540 if params is not None:
1529 1541 kwargs['params'] = params.split('\0')
1530 1542
1531 1543 raise error.BundleUnknownFeatureError(**kwargs)
1532 1544
1533 1545 @parthandler('error:pushraced', ('message',))
1534 1546 def handleerrorpushraced(op, inpart):
1535 1547 """Used to transmit push race error over the wire"""
1536 1548 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1537 1549
1538 1550 @parthandler('listkeys', ('namespace',))
1539 1551 def handlelistkeys(op, inpart):
1540 1552 """retrieve pushkey namespace content stored in a bundle2"""
1541 1553 namespace = inpart.params['namespace']
1542 1554 r = pushkey.decodekeys(inpart.read())
1543 1555 op.records.add('listkeys', (namespace, r))
1544 1556
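# --- illustrative sketch (not part of the original module) -----------------
# After processing, the decoded listkeys content can be read back from the
# operation records; `op` is a bundleoperation as used above:
#
#     for namespace, results in op.records['listkeys']:
#         op.ui.debug('%i %s keys received\n' % (len(results), namespace))
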
1545 1557 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1546 1558 def handlepushkey(op, inpart):
1547 1559 """process a pushkey request"""
1548 1560 dec = pushkey.decode
1549 1561 namespace = dec(inpart.params['namespace'])
1550 1562 key = dec(inpart.params['key'])
1551 1563 old = dec(inpart.params['old'])
1552 1564 new = dec(inpart.params['new'])
1553 1565 # Grab the transaction to ensure that we have the lock before performing the
1554 1566 # pushkey.
1555 1567 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1556 1568 op.gettransaction()
1557 1569 ret = op.repo.pushkey(namespace, key, old, new)
1558 1570 record = {'namespace': namespace,
1559 1571 'key': key,
1560 1572 'old': old,
1561 1573 'new': new}
1562 1574 op.records.add('pushkey', record)
1563 1575 if op.reply is not None:
1564 1576 rpart = op.reply.newpart('reply:pushkey')
1565 1577 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1566 1578 rpart.addparam('return', '%i' % ret, mandatory=False)
1567 1579 if inpart.mandatory and not ret:
1568 1580 kwargs = {}
1569 1581 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1570 1582 if key in inpart.params:
1571 1583 kwargs[key] = inpart.params[key]
1572 1584 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1573 1585
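# --- illustrative sketch (not part of the original module) -----------------
# A pusher would emit a 'pushkey' part with all four values run through
# pushkey.encode(); the bookmark-related names below are example values:
#
#     enc = pushkey.encode
#     part = bundler.newpart('pushkey')
#     part.addparam('namespace', enc('bookmarks'))
#     part.addparam('key', enc(bookmark))
#     part.addparam('old', enc(oldhex))
#     part.addparam('new', enc(newhex))
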
1574 1586 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1575 1587 def handlepushkeyreply(op, inpart):
1576 1588 """retrieve the result of a pushkey request"""
1577 1589 ret = int(inpart.params['return'])
1578 1590 partid = int(inpart.params['in-reply-to'])
1579 1591 op.records.add('pushkey', {'return': ret}, partid)
1580 1592
1581 1593 @parthandler('obsmarkers')
1582 1594 def handleobsmarker(op, inpart):
1583 1595 """add a stream of obsmarkers to the repo"""
1584 1596 tr = op.gettransaction()
1585 1597 markerdata = inpart.read()
1586 1598 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1587 1599 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1588 1600 % len(markerdata))
1589 1601 # The mergemarkers call will crash if marker creation is not enabled.
1590 1602 # We want to avoid this if the part is advisory.
1591 1603 if not inpart.mandatory and op.repo.obsstore.readonly:
1592 1604 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1593 1605 return
1594 1606 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1595 1607 if new:
1596 1608 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1597 1609 op.records.add('obsmarkers', {'new': new})
1598 1610 if op.reply is not None:
1599 1611 rpart = op.reply.newpart('reply:obsmarkers')
1600 1612 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1601 1613 rpart.addparam('new', '%i' % new, mandatory=False)
1602 1614
1603 1615
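# --- illustrative sketch (not part of the original module) -----------------
# The 'obsmarkers' payload read above is the binary obsstore exchange
# format; assuming the obsolete.encodemarkers() helper is available, a
# sender could build the part from a list of markers like this:
#
#     data = obsolete.encodemarkers(markers, True, version=version)
#     bundler.newpart('obsmarkers', data=data)
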
1604 1616 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1605 1617 def handleobsmarkerreply(op, inpart):
1606 1618 """retrieve the result of an obsmarkers request"""
1607 1619 ret = int(inpart.params['new'])
1608 1620 partid = int(inpart.params['in-reply-to'])
1609 1621 op.records.add('obsmarkers', {'new': ret}, partid)
1610 1622
1611 1623 @parthandler('hgtagsfnodes')
1612 1624 def handlehgtagsfnodes(op, inpart):
1613 1625 """Applies .hgtags fnodes cache entries to the local repo.
1614 1626
1615 1627 Payload is pairs of 20 byte changeset nodes and filenodes.
1616 1628 """
1617 1629 # Grab the transaction to make sure we hold the lock at this point.
1618 1630 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1619 1631 op.gettransaction()
1620 1632 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1621 1633
1622 1634 count = 0
1623 1635 while True:
1624 1636 node = inpart.read(20)
1625 1637 fnode = inpart.read(20)
1626 1638 if len(node) < 20 or len(fnode) < 20:
1627 1639 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1628 1640 break
1629 1641 cache.setfnode(node, fnode)
1630 1642 count += 1
1631 1643
1632 1644 cache.write()
1633 1645 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
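
# --- illustrative sketch (not part of the original module) -----------------
# The payload consumed above is a flat stream of 40-byte records, each the
# concatenation of a changeset node and the corresponding .hgtags filenode;
# a sender could build it like this (`entries` is an assumed list of such
# binary pairs):
#
#     data = ''.join(cnode + fnode for cnode, fnode in entries)
#     bundler.newpart('hgtagsfnodes', data=data)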