bundle2: fail faster when interrupted...
Gregory Szorc -
r29847:9a9629b9 stable
@@ -1,1611 +1,1618
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` bytes containing the serialized version of all
41 41 stream level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 overly complex usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
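
The stream-level parameter encoding described above is simple enough to sketch. The following is an editor's illustration only (the helper names are hypothetical and Python 3's `urllib.parse` is used for urlquoting), not code from bundle2.py itself:

```python
# Editor's sketch of the stream-level parameter blob: a space separated
# list of urlquoted parameters, each either `name` or `name=value`.
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Encode a list of (name, value) pairs; value may be None."""
    chunks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('parameter names must start with a letter')
        part = quote(name, safe='')
        if value is not None:
            part = '%s=%s' % (part, quote(value, safe=''))
        chunks.append(part)
    return ' '.join(chunks)

def decode_stream_params(blob):
    """Decode the blob back into an ordered list of (name, value) pairs."""
    params = []
    for item in blob.split(' '):
        if '=' in item:
            name, value = item.split('=', 1)
            params.append((unquote(name), unquote(value)))
        else:
            params.append((unquote(item), None))
    return params
```

Round-tripping `[('Compression', 'BZ'), ('simple', None)]` yields the blob `'Compression=BZ simple'`, where the capitalized name marks a mandatory parameter.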
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
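
The dispatch rule described above (case-insensitive lookup, case-derived mandatoriness) can be sketched in isolation. This is an editor's illustration with hypothetical names, not the module's `_processpart`/`parthandler` machinery:

```python
# Editor's sketch: handlers are registered under lower-cased part types;
# a part type containing any uppercase character is mandatory.
handlers = {'changegroup': lambda parttype: 'applied %s' % parttype}

def dispatch(parttype):
    handler = handlers.get(parttype.lower())
    if handler is None:
        if parttype.lower() != parttype:  # any uppercase char => mandatory
            raise LookupError('missing mandatory handler: %s' % parttype)
        return None  # unknown advisory part is silently ignored
    return handler(parttype)
```

So `dispatch('CHANGEGROUP')` finds the handler, `dispatch('unknownthing')` is ignored, and `dispatch('UnknownThing')` aborts.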
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 tags,
163 163 url,
164 164 util,
165 165 )
166 166
167 167 urlerr = util.urlerr
168 168 urlreq = util.urlreq
169 169
170 170 _pack = struct.pack
171 171 _unpack = struct.unpack
172 172
173 173 _fstreamparamsize = '>i'
174 174 _fpartheadersize = '>i'
175 175 _fparttypesize = '>B'
176 176 _fpartid = '>I'
177 177 _fpayloadsize = '>i'
178 178 _fpartparamcount = '>BB'
179 179
180 180 preferedchunksize = 4096
181 181
182 182 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
183 183
184 184 def outdebug(ui, message):
185 185 """debug regarding output stream (bundling)"""
186 186 if ui.configbool('devel', 'bundle2.debug', False):
187 187 ui.debug('bundle2-output: %s\n' % message)
188 188
189 189 def indebug(ui, message):
190 190 """debug on input stream (unbundling)"""
191 191 if ui.configbool('devel', 'bundle2.debug', False):
192 192 ui.debug('bundle2-input: %s\n' % message)
193 193
194 194 def validateparttype(parttype):
195 195 """raise ValueError if a parttype contains invalid character"""
196 196 if _parttypeforbidden.search(parttype):
197 197 raise ValueError(parttype)
198 198
199 199 def _makefpartparamsizes(nbparams):
200 200 """return a struct format to read part parameter sizes
201 201
202 202 The number of parameters is variable so we need to build that format
203 203 dynamically.
204 204 """
205 205 return '>'+('BB'*nbparams)
206 206
207 207 parthandlermapping = {}
208 208
209 209 def parthandler(parttype, params=()):
210 210 """decorator that registers a function as a bundle2 part handler
211 211
212 212 eg::
213 213
214 214 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
215 215 def myparttypehandler(...):
216 216 '''process a part of type "my part".'''
217 217 ...
218 218 """
219 219 validateparttype(parttype)
220 220 def _decorator(func):
221 221 lparttype = parttype.lower() # enforce lower case matching.
222 222 assert lparttype not in parthandlermapping
223 223 parthandlermapping[lparttype] = func
224 224 func.params = frozenset(params)
225 225 return func
226 226 return _decorator
227 227
228 228 class unbundlerecords(object):
229 229 """keep record of what happens during an unbundle
230 230
231 231 New records are added using `records.add('cat', obj)`. Where 'cat' is a
232 232 category of record and obj is an arbitrary object.
233 233
234 234 `records['cat']` will return all entries of this category 'cat'.
235 235
236 236 Iterating on the object itself will yield `('category', obj)` tuples
237 237 for all entries.
238 238
239 239 All iterations happen in chronological order.
240 240 """
241 241
242 242 def __init__(self):
243 243 self._categories = {}
244 244 self._sequences = []
245 245 self._replies = {}
246 246
247 247 def add(self, category, entry, inreplyto=None):
248 248 """add a new record of a given category.
249 249
250 250 The entry can then be retrieved in the list returned by
251 251 self['category']."""
252 252 self._categories.setdefault(category, []).append(entry)
253 253 self._sequences.append((category, entry))
254 254 if inreplyto is not None:
255 255 self.getreplies(inreplyto).add(category, entry)
256 256
257 257 def getreplies(self, partid):
258 258 """get the records that are replies to a specific part"""
259 259 return self._replies.setdefault(partid, unbundlerecords())
260 260
261 261 def __getitem__(self, cat):
262 262 return tuple(self._categories.get(cat, ()))
263 263
264 264 def __iter__(self):
265 265 return iter(self._sequences)
266 266
267 267 def __len__(self):
268 268 return len(self._sequences)
269 269
270 270 def __nonzero__(self):
271 271 return bool(self._sequences)
272 272
273 273 class bundleoperation(object):
274 274 """an object that represents a single bundling process
275 275
276 276 Its purpose is to carry unbundle-related objects and states.
277 277
278 278 A new object should be created at the beginning of each bundle processing.
279 279 The object is to be returned by the processing function.
280 280
281 281 The object currently has very little content; it will ultimately contain:
282 282 * an access to the repo the bundle is applied to,
283 283 * a ui object,
284 284 * a way to retrieve a transaction to add changes to the repo,
285 285 * a way to record the result of processing each part,
286 286 * a way to construct a bundle response when applicable.
287 287 """
288 288
289 289 def __init__(self, repo, transactiongetter, captureoutput=True):
290 290 self.repo = repo
291 291 self.ui = repo.ui
292 292 self.records = unbundlerecords()
293 293 self.gettransaction = transactiongetter
294 294 self.reply = None
295 295 self.captureoutput = captureoutput
296 296
297 297 class TransactionUnavailable(RuntimeError):
298 298 pass
299 299
300 300 def _notransaction():
301 301 """default method to get a transaction while processing a bundle
302 302
303 303 Raise an exception to highlight the fact that no transaction was expected
304 304 to be created"""
305 305 raise TransactionUnavailable()
306 306
307 307 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
308 308 # transform me into unbundler.apply() as soon as the freeze is lifted
309 309 tr.hookargs['bundle2'] = '1'
310 310 if source is not None and 'source' not in tr.hookargs:
311 311 tr.hookargs['source'] = source
312 312 if url is not None and 'url' not in tr.hookargs:
313 313 tr.hookargs['url'] = url
314 314 return processbundle(repo, unbundler, lambda: tr, op=op)
315 315
316 316 def processbundle(repo, unbundler, transactiongetter=None, op=None):
317 317 """This function processes a bundle, applying its effects to/from a repo
318 318
319 319 It iterates over each part then searches for and uses the proper handling
320 320 code to process the part. Parts are processed in order.
321 321
322 322 This is a very early version of this function that will be strongly reworked
323 323 before final usage.
324 324
325 325 An unknown mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 except BaseException as exc:
356 except Exception as exc:
357 357 for nbpart, part in iterparts:
358 358 # consume the bundle content
359 359 part.seek(0, 2)
360 360 # Small hack to let caller code distinguish exceptions from bundle2
361 361 # processing from processing the old format. This is mostly
362 362 # needed to handle different return codes to unbundle according to the
363 363 # type of bundle. We should probably clean up or drop this return code
364 364 # craziness in a future version.
365 365 exc.duringunbundle2 = True
366 366 salvaged = []
367 367 replycaps = None
368 368 if op.reply is not None:
369 369 salvaged = op.reply.salvageoutput()
370 370 replycaps = op.reply.capabilities
371 371 exc._replycaps = replycaps
372 372 exc._bundle2salvagedoutput = salvaged
373 373 raise
374 374 finally:
375 375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 376
377 377 return op
378 378
379 379 def _processpart(op, part):
380 380 """process a single part from a bundle
381 381
382 382 The part is guaranteed to have been fully consumed when the function exits
383 383 (even if an exception is raised)."""
384 384 status = 'unknown' # used by debug output
385 hardabort = False
385 386 try:
386 387 try:
387 388 handler = parthandlermapping.get(part.type)
388 389 if handler is None:
389 390 status = 'unsupported-type'
390 391 raise error.BundleUnknownFeatureError(parttype=part.type)
391 392 indebug(op.ui, 'found a handler for part %r' % part.type)
392 393 unknownparams = part.mandatorykeys - handler.params
393 394 if unknownparams:
394 395 unknownparams = list(unknownparams)
395 396 unknownparams.sort()
396 397 status = 'unsupported-params (%s)' % unknownparams
397 398 raise error.BundleUnknownFeatureError(parttype=part.type,
398 399 params=unknownparams)
399 400 status = 'supported'
400 401 except error.BundleUnknownFeatureError as exc:
401 402 if part.mandatory: # mandatory parts
402 403 raise
403 404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
404 405 return # skip to part processing
405 406 finally:
406 407 if op.ui.debugflag:
407 408 msg = ['bundle2-input-part: "%s"' % part.type]
408 409 if not part.mandatory:
409 410 msg.append(' (advisory)')
410 411 nbmp = len(part.mandatorykeys)
411 412 nbap = len(part.params) - nbmp
412 413 if nbmp or nbap:
413 414 msg.append(' (params:')
414 415 if nbmp:
415 416 msg.append(' %i mandatory' % nbmp)
416 417 if nbap:
417 418 msg.append(' %i advisory' % nbap)
418 419 msg.append(')')
419 420 msg.append(' %s\n' % status)
420 421 op.ui.debug(''.join(msg))
421 422
422 423 # handler is called outside the above try block so that we don't
423 424 # risk catching KeyErrors from anything other than the
424 425 # parthandlermapping lookup (any KeyError raised by handler()
425 426 # itself represents a defect of a different variety).
426 427 output = None
427 428 if op.captureoutput and op.reply is not None:
428 429 op.ui.pushbuffer(error=True, subproc=True)
429 430 output = ''
430 431 try:
431 432 handler(op, part)
432 433 finally:
433 434 if output is not None:
434 435 output = op.ui.popbuffer()
435 436 if output:
436 437 outpart = op.reply.newpart('output', data=output,
437 438 mandatory=False)
438 439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 # If exiting or interrupted, do not attempt to seek the stream in the
441 # finally block below. This makes abort faster.
442 except (SystemExit, KeyboardInterrupt):
443 hardabort = True
444 raise
439 445 finally:
440 446 # consume the part content to not corrupt the stream.
441 part.seek(0, 2)
447 if not hardabort:
448 part.seek(0, 2)
442 449
443 450
444 451 def decodecaps(blob):
445 452 """decode a bundle2 caps bytes blob into a dictionary
446 453
447 454 The blob is a list of capabilities (one per line)
448 455 Capabilities may have values using a line of the form::
449 456
450 457 capability=value1,value2,value3
451 458
452 459 The values are always a list."""
453 460 caps = {}
454 461 for line in blob.splitlines():
455 462 if not line:
456 463 continue
457 464 if '=' not in line:
458 465 key, vals = line, ()
459 466 else:
460 467 key, vals = line.split('=', 1)
461 468 vals = vals.split(',')
462 469 key = urlreq.unquote(key)
463 470 vals = [urlreq.unquote(v) for v in vals]
464 471 caps[key] = vals
465 472 return caps
466 473
467 474 def encodecaps(caps):
468 475 """encode a bundle2 caps dictionary into a bytes blob"""
469 476 chunks = []
470 477 for ca in sorted(caps):
471 478 vals = caps[ca]
472 479 ca = urlreq.quote(ca)
473 480 vals = [urlreq.quote(v) for v in vals]
474 481 if vals:
475 482 ca = "%s=%s" % (ca, ','.join(vals))
476 483 chunks.append(ca)
477 484 return '\n'.join(chunks)
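
The caps blob round trip handled by `decodecaps`/`encodecaps` above can be sketched standalone. This is an editor's illustration using Python 3's `urllib.parse` and hypothetical function names (and plain lists for valueless capabilities, where `decodecaps` uses an empty tuple):

```python
# Editor's sketch of the caps blob: one capability per line, with optional
# comma-separated values; capability names and values are urlquoted.
from urllib.parse import quote, unquote

def encode_caps(caps):
    lines = []
    for name in sorted(caps):
        vals = [quote(v, safe='') for v in caps[name]]
        qname = quote(name, safe='')
        lines.append('%s=%s' % (qname, ','.join(vals)) if vals else qname)
    return '\n'.join(lines)

def decode_caps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' in line:
            name, vals = line.split('=', 1)
            vals = vals.split(',')
        else:
            name, vals = line, []
        caps[unquote(name)] = [unquote(v) for v in vals]
    return caps
```

For example, `{'HG20': [], 'listkeys': ['phases', 'bookmarks']}` encodes to the two-line blob `HG20` / `listkeys=phases,bookmarks`.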
478 485
479 486 bundletypes = {
480 487 "": ("", None), # only when using unbundle on ssh and old http servers
481 488 # since the unification ssh accepts a header but there
482 489 # is no capability signaling it.
483 490 "HG20": (), # special-cased below
484 491 "HG10UN": ("HG10UN", None),
485 492 "HG10BZ": ("HG10", 'BZ'),
486 493 "HG10GZ": ("HG10GZ", 'GZ'),
487 494 }
488 495
489 496 # hgweb uses this list to communicate its preferred type
490 497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
491 498
492 499 class bundle20(object):
493 500 """represent an outgoing bundle2 container
494 501
495 502 Use the `addparam` method to add a stream level parameter, and `newpart` to
496 503 populate it. Then call `getchunks` to retrieve all the binary chunks of
497 504 data that compose the bundle2 container."""
498 505
499 506 _magicstring = 'HG20'
500 507
501 508 def __init__(self, ui, capabilities=()):
502 509 self.ui = ui
503 510 self._params = []
504 511 self._parts = []
505 512 self.capabilities = dict(capabilities)
506 513 self._compressor = util.compressors[None]()
507 514
508 515 def setcompression(self, alg):
509 516 """setup core part compression to <alg>"""
510 517 if alg is None:
511 518 return
512 519 assert not any(n.lower() == 'compression' for n, v in self._params)
513 520 self.addparam('Compression', alg)
514 521 self._compressor = util.compressors[alg]()
515 522
516 523 @property
517 524 def nbparts(self):
518 525 """total number of parts added to the bundler"""
519 526 return len(self._parts)
520 527
521 528 # methods used to define the bundle2 content
522 529 def addparam(self, name, value=None):
523 530 """add a stream level parameter"""
524 531 if not name:
525 532 raise ValueError('empty parameter name')
526 533 if name[0] not in string.letters:
527 534 raise ValueError('non letter first character: %r' % name)
528 535 self._params.append((name, value))
529 536
530 537 def addpart(self, part):
531 538 """add a new part to the bundle2 container
532 539
533 540 Parts contain the actual application level payload.
534 541 assert part.id is None
535 542 part.id = len(self._parts) # very cheap counter
536 543 self._parts.append(part)
537 544
538 545 def newpart(self, typeid, *args, **kwargs):
539 546 """create a new part and add it to the container
540 547
541 548 The part is directly added to the container. For now, this means
542 549 that any failure to properly initialize the part after calling
543 550 ``newpart`` should result in a failure of the whole bundling process.
544 551
545 552 You can still fall back to manually create and add if you need better
546 553 control."""
547 554 part = bundlepart(typeid, *args, **kwargs)
548 555 self.addpart(part)
549 556 return part
550 557
551 558 # methods used to generate the bundle2 stream
552 559 def getchunks(self):
553 560 if self.ui.debugflag:
554 561 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
555 562 if self._params:
556 563 msg.append(' (%i params)' % len(self._params))
557 564 msg.append(' %i parts total\n' % len(self._parts))
558 565 self.ui.debug(''.join(msg))
559 566 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
560 567 yield self._magicstring
561 568 param = self._paramchunk()
562 569 outdebug(self.ui, 'bundle parameter: %s' % param)
563 570 yield _pack(_fstreamparamsize, len(param))
564 571 if param:
565 572 yield param
566 573 # starting compression
567 574 for chunk in self._getcorechunk():
568 575 yield self._compressor.compress(chunk)
569 576 yield self._compressor.flush()
570 577
571 578 def _paramchunk(self):
572 579 """return an encoded version of all stream parameters"""
573 580 blocks = []
574 581 for par, value in self._params:
575 582 par = urlreq.quote(par)
576 583 if value is not None:
577 584 value = urlreq.quote(value)
578 585 par = '%s=%s' % (par, value)
579 586 blocks.append(par)
580 587 return ' '.join(blocks)
581 588
582 589 def _getcorechunk(self):
583 590 """yield chunk for the core part of the bundle
584 591
585 592 (all but headers and parameters)"""
586 593 outdebug(self.ui, 'start of parts')
587 594 for part in self._parts:
588 595 outdebug(self.ui, 'bundle part: "%s"' % part.type)
589 596 for chunk in part.getchunks(ui=self.ui):
590 597 yield chunk
591 598 outdebug(self.ui, 'end of bundle')
592 599 yield _pack(_fpartheadersize, 0)
593 600
594 601
595 602 def salvageoutput(self):
596 603 """return a list with a copy of all output parts in the bundle
597 604
598 605 This is meant to be used during error handling to make sure we preserve
599 606 server output"""
600 607 salvaged = []
601 608 for part in self._parts:
602 609 if part.type.startswith('output'):
603 610 salvaged.append(part.copy())
604 611 return salvaged
605 612
606 613
607 614 class unpackermixin(object):
608 615 """A mixin to extract bytes and struct data from a stream"""
609 616
610 617 def __init__(self, fp):
611 618 self._fp = fp
612 619 self._seekable = (util.safehasattr(fp, 'seek') and
613 620 util.safehasattr(fp, 'tell'))
614 621
615 622 def _unpack(self, format):
616 623 """unpack this struct format from the stream"""
617 624 data = self._readexact(struct.calcsize(format))
618 625 return _unpack(format, data)
619 626
620 627 def _readexact(self, size):
621 628 """read exactly <size> bytes from the stream"""
622 629 return changegroup.readexactly(self._fp, size)
623 630
624 631 def seek(self, offset, whence=0):
625 632 """move the underlying file pointer"""
626 633 if self._seekable:
627 634 return self._fp.seek(offset, whence)
628 635 else:
629 636 raise NotImplementedError(_('File pointer is not seekable'))
630 637
631 638 def tell(self):
632 639 """return the file offset, or None if file is not seekable"""
633 640 if self._seekable:
634 641 try:
635 642 return self._fp.tell()
636 643 except IOError as e:
637 644 if e.errno == errno.ESPIPE:
638 645 self._seekable = False
639 646 else:
640 647 raise
641 648 return None
642 649
643 650 def close(self):
644 651 """close underlying file"""
645 652 if util.safehasattr(self._fp, 'close'):
646 653 return self._fp.close()
647 654
648 655 def getunbundler(ui, fp, magicstring=None):
649 656 """return a valid unbundler object for a given magicstring"""
650 657 if magicstring is None:
651 658 magicstring = changegroup.readexactly(fp, 4)
652 659 magic, version = magicstring[0:2], magicstring[2:4]
653 660 if magic != 'HG':
654 661 raise error.Abort(_('not a Mercurial bundle'))
655 662 unbundlerclass = formatmap.get(version)
656 663 if unbundlerclass is None:
657 664 raise error.Abort(_('unknown bundle version %s') % version)
658 665 unbundler = unbundlerclass(ui, fp)
659 666 indebug(ui, 'start processing of %s stream' % magicstring)
660 667 return unbundler
661 668
662 669 class unbundle20(unpackermixin):
663 670 """interpret a bundle2 stream
664 671
665 672 This class is fed with a binary stream and yields parts through its
666 673 `iterparts` method."""
667 674
668 675 _magicstring = 'HG20'
669 676
670 677 def __init__(self, ui, fp):
671 678 """If header is specified, we do not read it out of the stream."""
672 679 self.ui = ui
673 680 self._decompressor = util.decompressors[None]
674 681 self._compressed = None
675 682 super(unbundle20, self).__init__(fp)
676 683
677 684 @util.propertycache
678 685 def params(self):
679 686 """dictionary of stream level parameters"""
680 687 indebug(self.ui, 'reading bundle2 stream parameters')
681 688 params = {}
682 689 paramssize = self._unpack(_fstreamparamsize)[0]
683 690 if paramssize < 0:
684 691 raise error.BundleValueError('negative bundle param size: %i'
685 692 % paramssize)
686 693 if paramssize:
687 694 params = self._readexact(paramssize)
688 695 params = self._processallparams(params)
689 696 return params
690 697
691 698 def _processallparams(self, paramsblock):
692 699 """process a block of stream level parameters and return them as a dict"""
693 700 params = util.sortdict()
694 701 for p in paramsblock.split(' '):
695 702 p = p.split('=', 1)
696 703 p = [urlreq.unquote(i) for i in p]
697 704 if len(p) < 2:
698 705 p.append(None)
699 706 self._processparam(*p)
700 707 params[p[0]] = p[1]
701 708 return params
702 709
703 710
704 711 def _processparam(self, name, value):
705 712 """process a parameter, applying its effect if needed
706 713
707 714 Parameters starting with a lower case letter are advisory and will be
708 715 ignored when unknown. Those starting with an upper case letter are
709 716 mandatory and this function will raise a KeyError when unknown.
710 717
711 718 Note: no options are currently supported. Any input will either be
712 719 ignored or fail.
713 720 """
714 721 if not name:
715 722 raise ValueError('empty parameter name')
716 723 if name[0] not in string.letters:
717 724 raise ValueError('non letter first character: %r' % name)
718 725 try:
719 726 handler = b2streamparamsmap[name.lower()]
720 727 except KeyError:
721 728 if name[0].islower():
722 729 indebug(self.ui, "ignoring unknown parameter %r" % name)
723 730 else:
724 731 raise error.BundleUnknownFeatureError(params=(name,))
725 732 else:
726 733 handler(self, name, value)
727 734
728 735 def _forwardchunks(self):
729 736 """utility to transfer a bundle2 as binary
730 737
731 738 This is made necessary by the fact that the 'getbundle' command over 'ssh'
732 739 has no way to know when the reply ends, relying on the bundle being
733 740 interpreted to know its end. This is terrible and we are sorry, but we
734 741 needed to move forward to get general delta enabled.
735 742 """
736 743 yield self._magicstring
737 744 assert 'params' not in vars(self)
738 745 paramssize = self._unpack(_fstreamparamsize)[0]
739 746 if paramssize < 0:
740 747 raise error.BundleValueError('negative bundle param size: %i'
741 748 % paramssize)
742 749 yield _pack(_fstreamparamsize, paramssize)
743 750 if paramssize:
744 751 params = self._readexact(paramssize)
745 752 self._processallparams(params)
746 753 yield params
747 754 assert self._decompressor is util.decompressors[None]
748 755 # From there, payload might need to be decompressed
749 756 self._fp = self._decompressor(self._fp)
750 757 emptycount = 0
751 758 while emptycount < 2:
752 759 # so we can brainlessly loop
753 760 assert _fpartheadersize == _fpayloadsize
754 761 size = self._unpack(_fpartheadersize)[0]
755 762 yield _pack(_fpartheadersize, size)
756 763 if size:
757 764 emptycount = 0
758 765 else:
759 766 emptycount += 1
760 767 continue
761 768 if size == flaginterrupt:
762 769 continue
763 770 elif size < 0:
764 771 raise error.BundleValueError('negative chunk size: %i' % size)
765 772 yield self._readexact(size)
766 773
767 774
768 775 def iterparts(self):
769 776 """yield all parts contained in the stream"""
770 777 # make sure params have been loaded
771 778 self.params
772 779 # From there, payload needs to be decompressed
773 780 self._fp = self._decompressor(self._fp)
774 781 indebug(self.ui, 'start extraction of bundle2 parts')
775 782 headerblock = self._readpartheader()
776 783 while headerblock is not None:
777 784 part = unbundlepart(self.ui, headerblock, self._fp)
778 785 yield part
779 786 part.seek(0, 2)
780 787 headerblock = self._readpartheader()
781 788 indebug(self.ui, 'end of bundle2 stream')
782 789
783 790 def _readpartheader(self):
784 791 """reads a part header size and return the bytes blob
785 792
786 793 returns None if empty"""
787 794 headersize = self._unpack(_fpartheadersize)[0]
788 795 if headersize < 0:
789 796 raise error.BundleValueError('negative part header size: %i'
790 797 % headersize)
791 798 indebug(self.ui, 'part header size: %i' % headersize)
792 799 if headersize:
793 800 return self._readexact(headersize)
794 801 return None
795 802
796 803 def compressed(self):
797 804 self.params # load params
798 805 return self._compressed
799 806
800 807 formatmap = {'20': unbundle20}
801 808
802 809 b2streamparamsmap = {}
803 810
804 811 def b2streamparamhandler(name):
805 812 """register a handler for a stream level parameter"""
806 813 def decorator(func):
807 814 assert name not in b2streamparamsmap
808 815 b2streamparamsmap[name] = func
809 816 return func
810 817 return decorator
811 818
812 819 @b2streamparamhandler('compression')
813 820 def processcompression(unbundler, param, value):
814 821 """read compression parameter and install payload decompression"""
815 822 if value not in util.decompressors:
816 823 raise error.BundleUnknownFeatureError(params=(param,),
817 824 values=(value,))
818 825 unbundler._decompressor = util.decompressors[value]
819 826 if value is not None:
820 827 unbundler._compressed = True
821 828
822 829 class bundlepart(object):
823 830 """A bundle2 part contains application level payload
824 831
825 832 The part `type` is used to route the part to the application level
826 833 handler.
827 834
828 835 The part payload is contained in ``part.data``. It could be raw bytes or a
829 836 generator of byte chunks.
830 837
831 838 You can add parameters to the part using the ``addparam`` method.
832 839 Parameters can be either mandatory (default) or advisory. Remote side
833 840 should be able to safely ignore the advisory ones.
834 841
835 842 Neither data nor parameters can be modified after generation has begun.
836 843 """
837 844
838 845 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
839 846 data='', mandatory=True):
840 847 validateparttype(parttype)
841 848 self.id = None
842 849 self.type = parttype
843 850 self._data = data
844 851 self._mandatoryparams = list(mandatoryparams)
845 852 self._advisoryparams = list(advisoryparams)
846 853 # checking for duplicated entries
847 854 self._seenparams = set()
848 855 for pname, __ in self._mandatoryparams + self._advisoryparams:
849 856 if pname in self._seenparams:
850 857 raise RuntimeError('duplicated params: %s' % pname)
851 858 self._seenparams.add(pname)
852 859 # status of the part's generation:
853 860 # - None: not started,
854 861 # - False: currently generated,
855 862 # - True: generation done.
856 863 self._generated = None
857 864 self.mandatory = mandatory
858 865
859 866 def copy(self):
860 867 """return a copy of the part
861 868
862 869 The new part has the very same content but no partid assigned yet.
863 870 Parts with generated data cannot be copied."""
864 871 assert not util.safehasattr(self.data, 'next')
865 872 return self.__class__(self.type, self._mandatoryparams,
866 873 self._advisoryparams, self._data, self.mandatory)
867 874
868 875 # methods used to defines the part content
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                  ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data

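For reference, the header layout that ``getchunks`` assembles can be sketched as a standalone helper. The struct formats used here (``'>B'`` for the type size, ``'>I'`` for the part id, ``'>BB'`` for the parameter counts, and one ``'B'`` per parameter size) are assumptions mirroring what ``_fparttypesize``, ``_fpartid``, ``_fpartparamcount`` and ``_makefpartparamsizes`` are expected to expand to; treat it as an illustration, not the canonical encoder.

```python
import struct

def encodepartheader(parttype, partid, manpar=(), advpar=()):
    """Sketch of a bundle2 part header: type, id, then parameters."""
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid)]
    # parameter counts (mandatory, advisory)
    header.append(struct.pack('>BB', len(manpar), len(advpar)))
    # one size byte per key and per value, in order
    sizes = []
    for key, value in list(manpar) + list(advpar):
        sizes.append(len(key))
        sizes.append(len(value))
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    # then the raw key/value bytes themselves
    for key, value in list(manpar) + list(advpar):
        header.append(key)
        header.append(value)
    return b''.join(header)
```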

flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] #(payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = util.sortdict(self.mandatoryparams)
        self.params.update(self.advisoryparams)
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
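The ``(payload offset, file offset)`` walk in ``_findchunk`` can be exercised in isolation. The sketch below duplicates the lookup over a hypothetical chunk index (the offsets are made up for illustration) so the boundary cases, exact hit versus mid-chunk position, are easy to see.

```python
def findchunk(chunkindex, pos):
    """Given a payload position, return (chunk number, offset in chunk).

    chunkindex is a list of (payload offset, file offset) pairs, one per
    chunk start, sorted by payload offset.
    """
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0  # landed exactly on a chunk boundary
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```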

    def _readheader(self):
        """read the header and setup the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param value
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(
        changegroup.supportedincomingversions(repo)))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urlreq.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps dict
    """
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
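A minimal standalone copy of ``obsmarkersversion`` shows the expected shape of the capability values (``'V0'``, ``'V1'``, ...) and that non-``V`` entries are silently ignored. This duplicates the function above so it can be run without the module's imports.

```python
def obsmarkersversion(caps):
    """Extract supported obsmarker format versions from a caps dict."""
    obscaps = caps.get('obsmarkers', ())
    # entries look like 'V0', 'V1', ...; anything else is skipped
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
```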

def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        bundle = bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        if 'clcount' in cg.extras:
            part.addparam('nbchanges', str(cg.extras['clcount']),
                          mandatory=False)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only support v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        z = util.compressors[comp]()
        subchunkiter = cg.getchunks()
        def chunkiter():
            yield header
            for chunk in subchunkiter:
                yield z.compress(chunk)
            yield z.flush()
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream
    return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)

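The legacy (non-bundle2) branch above streams a header, then the compressor-wrapped changegroup chunks, then the flush tail. A rough standalone sketch of that shape, using ``bz2`` as a stand-in for whichever compressor the bundle type names:

```python
import bz2

def legacychunks(header, chunks):
    """Yield a bundle header, then compressed chunks, then the flush tail."""
    z = bz2.BZ2Compressor()
    yield header
    for chunk in chunks:
        # compress() may buffer internally and yield empty byte strings
        yield z.compress(chunk)
    # flush() emits whatever the compressor was still holding back
    yield z.flush()
```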
@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will be massively reworked
    before being inflicted on any end-user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    cg = changegroup.getunbundler(unpackerversion, inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    if ('treemanifest' in inpart.params and
        'treemanifest' not in op.repo.requirements):
        if len(op.repo.changelog) != 0:
            raise error.Abort(_(
                "bundle contains tree manifests, but local repo is "
                "non-empty and does not use tree manifests"))
        op.repo.requirements.add('treemanifest')
        op.repo._applyopenerreqs()
        op.repo._writerequirements()
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given a url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
    - url: the url to the bundle10.
    - size: the bundle10 file size. It is used to validate what was
      retrieved by the client matches the server knowledge about the bundle.
    - digests: a space separated list of the digest types provided as
      parameters.
    - digest:<digest-type>: the hexadecimal representation of the digest with
      that name. Like the size, it is used to validate what was retrieved by
      the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if sorted(heads) != sorted(op.repo.heads()):
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')
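The payload of a ``check:heads`` part is just a concatenation of 20-byte changeset nodes. A self-contained sketch of the read loop above, working against any file-like object (the data here is hypothetical):

```python
import io

def readheads(fp):
    """Read 20-byte node records until the stream is exhausted."""
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # payload length must be an exact multiple of 20
    return heads
```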

@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

class AbortFromPart(error.Abort):
    """Sub-class of Abort that denotes an error from a bundle2 part."""

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise AbortFromPart(inpart.params['message'],
                        hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)

@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)