# bundle: remove obsolete (and duplicate) comment
# Martin von Zweigbergk - r28672:ca489611 default
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

Binary format is as follows

:params size: int32

    The total number of bytes used by the parameters

:params value: arbitrary number of bytes

    A blob of `params size` containing the serialized version of all stream
    level parameters.

    The blob contains a space separated list of parameters. Parameters with a
    value are stored in the form `<name>=<value>`. Both name and value are
    urlquoted.

    Empty names are forbidden.

    Names MUST start with a letter. If this first letter is lower case, the
    parameter is advisory and can be safely ignored. However when the first
    letter is capital, the parameter is mandatory and the bundling process MUST
    stop if it is not able to process it.

    Stream parameters use a simple textual format for two main reasons:

    - Stream level parameters should remain simple and we want to discourage
      any crazy usage.
    - Textual data allow easy human inspection of a bundle2 header in case of
      troubles.

Any application level options MUST go into a bundle2 part instead.

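The parameter encoding described above can be sketched with a small standalone helper. This is an illustration only (`encodestreamparams` is a hypothetical name, not part of this module), and it uses Python 3's `urllib.parse` rather than the Python 2 `urllib` the module itself targets:

```python
import urllib.parse

def encodestreamparams(params):
    """Serialize (name, value) pairs into the space-separated,
    url-quoted blob described above (value may be None)."""
    blocks = []
    for name, value in params:
        chunk = urllib.parse.quote(name)
        if value is not None:
            chunk = '%s=%s' % (chunk, urllib.parse.quote(value))
        blocks.append(chunk)
    return ' '.join(blocks)

# a mandatory parameter (capitalized) and an advisory one (lower case)
print(encodestreamparams([('Compression', 'GZ'), ('advisory', None)]))
# prints: Compression=GZ advisory
```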
Payload part
------------------------

Binary format is as follows

:header size: int32

    The total number of bytes used by the part header. When the header is
    empty (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route to an application level handler that can
    interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object to
    interpret the part payload.

    The binary format of the header is as follows

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32-bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part's parameters may have arbitrary content, the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count: 1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

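The `<chunksize><chunkdata>` payload framing above can be sketched as follows. The helper name is hypothetical; the zero-size terminator matches the end-of-payload marker described above:

```python
import struct

def iterchunks(payload, chunksize=4096):
    """Frame raw payload bytes as a series of <chunksize><chunkdata>
    pairs, concluded by a zero-size chunk."""
    for start in range(0, len(payload), chunksize):
        chunk = payload[start:start + chunksize]
        yield struct.pack('>i', len(chunk))  # int32, big-endian
        yield chunk
    yield struct.pack('>i', 0)  # end-of-payload marker
```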
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In
the future, dropping the stream may become an option for channels we do not
care to preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys
import urllib

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

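For example, the format built for two parameters is `'>BBBB'`, which unpacks two (key-size, value-size) byte pairs. A standalone sketch (mirroring `_makefpartparamsizes` under a hypothetical name, with made-up sizes):

```python
import struct

def makefpartparamsizes(nbparams):
    # one 'BB' (key-size, value-size) byte pair per parameter, big-endian
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(2)
# two parameters whose (key, value) sizes are (3, 5) and (8, 0)
sizes = struct.unpack(fmt, struct.pack(fmt, 3, 5, 8, 0))
```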
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`. Where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    Unknown mandatory parts will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly
        # needed to handle different return codes to unbundle according to the
        # type of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.unquote(key)
        vals = [urllib.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.quote(ca)
        vals = [urllib.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

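The two functions above are inverses of each other, which can be checked with a quick round-trip. Here they are ported to Python 3's `urllib.parse` for illustration (the module itself uses the Python 2 `urllib`):

```python
import urllib.parse

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes-blob string"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.parse.quote(ca)
        vals = [urllib.parse.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """decode a bundle2 caps blob back into a dictionary"""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.parse.unquote(key)
        vals = [urllib.parse.unquote(v) for v in vals]
        caps[key] = vals
    return caps
```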
bundletypes = {
    "": ("", None),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urllib.quote(par)
            if value is not None:
                value = urllib.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)


    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged


class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a raw params block and return a parameter dictionary"""
        params = {}
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urllib.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params


    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory and this function will raise a KeyError when unknown.

        Note: no options are currently supported. Any input will be either
        ignored or failing.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact the 'getbundle' command over 'ssh'
        has no way to know when the reply ends, relying on the bundle to be
        interpreted to know its end. This is terrible and we are sorry, but we
        needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)


    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure param have been loaded
        self.params
        # From there, payload need to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

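The header framing read by `_readpartheader` can be exercised standalone over an in-memory stream. This sketch does not use the classes above; `readpartheader` is a hypothetical free-function equivalent:

```python
import io
import struct

def readpartheader(fp):
    """Read an int32 header size, then the header blob; a zero size
    is the end-of-stream marker and yields None."""
    headersize = struct.unpack('>i', fp.read(4))[0]
    if headersize < 0:
        raise ValueError('negative part header size: %i' % headersize)
    if headersize:
        return fp.read(headersize)
    return None

# one 5-byte header followed by the end-of-stream marker
stream = io.BytesIO(struct.pack('>i', 5) + b'hello' + struct.pack('>i', 0))
```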
798 798 formatmap = {'20': unbundle20}
799 799
800 800 b2streamparamsmap = {}
801 801
802 802 def b2streamparamhandler(name):
803 803 """register a handler for a stream level parameter"""
804 804 def decorator(func):
805 805 assert name not in formatmap
806 806 b2streamparamsmap[name] = func
807 807 return func
808 808 return decorator
809 809
810 810 @b2streamparamhandler('compression')
811 811 def processcompression(unbundler, param, value):
812 812 """read compression parameter and install payload decompression"""
813 813 if value not in util.decompressors:
814 814 raise error.BundleUnknownFeatureError(params=(param,),
815 815 values=(value,))
816 816 unbundler._decompressor = util.decompressors[value]
817 817 if value is not None:
818 818 unbundler._compressed = True
819 819
820 820 class bundlepart(object):
821 821 """A bundle2 part contains application level payload
822 822
823 823 The part `type` is used to route the part to the application level
824 824 handler.
825 825
826 826 The part payload is contained in ``part.data``. It could be raw bytes or a
827 827 generator of byte chunks.
828 828
829 829 You can add parameters to the part using the ``addparam`` method.
830 830 Parameters can be either mandatory (default) or advisory. Remote side
831 831 should be able to safely ignore the advisory ones.
832 832
833 833 Both data and parameters cannot be modified after the generation has begun.
834 834 """
835 835
836 836 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
837 837 data='', mandatory=True):
838 838 validateparttype(parttype)
839 839 self.id = None
840 840 self.type = parttype
841 841 self._data = data
842 842 self._mandatoryparams = list(mandatoryparams)
843 843 self._advisoryparams = list(advisoryparams)
844 844 # checking for duplicated entries
845 845 self._seenparams = set()
846 846 for pname, __ in self._mandatoryparams + self._advisoryparams:
847 847 if pname in self._seenparams:
848 848 raise RuntimeError('duplicated params: %s' % pname)
849 849 self._seenparams.add(pname)
850 850 # status of the part's generation:
851 851 # - None: not started,
852 852 # - False: currently generated,
853 853 # - True: generation done.
854 854 self._generated = None
855 855 self.mandatory = mandatory
856 856
857 857 def copy(self):
858 858 """return a copy of the part
859 859
860 860 The new part have the very same content but no partid assigned yet.
861 861 Parts with generated data cannot be copied."""
862 862 assert not util.safehasattr(self.data, 'next')
863 863 return self.__class__(self.type, self._mandatoryparams,
864 864 self._advisoryparams, self._data, self.mandatory)
865 865
866 866 # methods used to defines the part content
867 867 @property
868 868 def data(self):
869 869 return self._data
870 870
871 871 @data.setter
872 872 def data(self, data):
873 873 if self._generated is not None:
874 874 raise error.ReadOnlyPartError('part is being generated')
875 875 self._data = data
876 876
877 877 @property
878 878 def mandatoryparams(self):
879 879 # make it an immutable tuple to force people through ``addparam``
880 880 return tuple(self._mandatoryparams)
881 881
882 882 @property
883 883 def advisoryparams(self):
884 884 # make it an immutable tuple to force people through ``addparam``
885 885 return tuple(self._advisoryparams)
886 886
887 887 def addparam(self, name, value='', mandatory=True):
888 888 if self._generated is not None:
889 889 raise error.ReadOnlyPartError('part is being generated')
890 890 if name in self._seenparams:
891 891 raise ValueError('duplicated params: %s' % name)
892 892 self._seenparams.add(name)
893 893 params = self._advisoryparams
894 894 if mandatory:
895 895 params = self._mandatoryparams
896 896 params.append((name, value))
897 897
898 898 # methods used to generate the bundle2 stream
899 899 def getchunks(self, ui):
900 900 if self._generated is not None:
901 901 raise RuntimeError('part can only be consumed once')
902 902 self._generated = False
903 903
904 904 if ui.debugflag:
905 905 msg = ['bundle2-output-part: "%s"' % self.type]
906 906 if not self.mandatory:
907 907 msg.append(' (advisory)')
908 908 nbmp = len(self.mandatoryparams)
909 909 nbap = len(self.advisoryparams)
910 910 if nbmp or nbap:
911 911 msg.append(' (params:')
912 912 if nbmp:
913 913 msg.append(' %i mandatory' % nbmp)
914 914 if nbap:
915 915 msg.append(' %i advisory' % nbap)
916 916 msg.append(')')
917 917 if not self.data:
918 918 msg.append(' empty payload')
919 919 elif util.safehasattr(self.data, 'next'):
920 920 msg.append(' streamed payload')
921 921 else:
922 922 msg.append(' %i bytes payload' % len(self.data))
923 923 msg.append('\n')
924 924 ui.debug(''.join(msg))
925 925
926 926 #### header
927 927 if self.mandatory:
928 928 parttype = self.type.upper()
929 929 else:
930 930 parttype = self.type.lower()
931 931 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
932 932 ## parttype
933 933 header = [_pack(_fparttypesize, len(parttype)),
934 934 parttype, _pack(_fpartid, self.id),
935 935 ]
936 936 ## parameters
937 937 # count
938 938 manpar = self.mandatoryparams
939 939 advpar = self.advisoryparams
940 940 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
941 941 # size
942 942 parsizes = []
943 943 for key, value in manpar:
944 944 parsizes.append(len(key))
945 945 parsizes.append(len(value))
946 946 for key, value in advpar:
947 947 parsizes.append(len(key))
948 948 parsizes.append(len(value))
949 949 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
950 950 header.append(paramsizes)
951 951 # key, value
952 952 for key, value in manpar:
953 953 header.append(key)
954 954 header.append(value)
955 955 for key, value in advpar:
956 956 header.append(key)
957 957 header.append(value)
958 958 ## finalize header
959 959 headerchunk = ''.join(header)
960 960 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
961 961 yield _pack(_fpartheadersize, len(headerchunk))
962 962 yield headerchunk
963 963 ## payload
964 964 try:
965 965 for chunk in self._payloadchunks():
966 966 outdebug(ui, 'payload chunk size: %i' % len(chunk))
967 967 yield _pack(_fpayloadsize, len(chunk))
968 968 yield chunk
969 969 except GeneratorExit:
970 970 # GeneratorExit means that nobody is listening for our
971 971 # results anyway, so just bail quickly rather than trying
972 972 # to produce an error part.
973 973 ui.debug('bundle2-generatorexit\n')
974 974 raise
975 975 except BaseException as exc:
976 976 # backup exception data for later
977 977 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
978 978 % exc)
979 979 exc_info = sys.exc_info()
980 980 msg = 'unexpected error: %s' % exc
981 981 interpart = bundlepart('error:abort', [('message', msg)],
982 982 mandatory=False)
983 983 interpart.id = 0
984 984 yield _pack(_fpayloadsize, -1)
985 985 for chunk in interpart.getchunks(ui=ui):
986 986 yield chunk
987 987 outdebug(ui, 'closing payload chunk')
988 988 # abort current part payload
989 989 yield _pack(_fpayloadsize, 0)
990 990 raise exc_info[0], exc_info[1], exc_info[2]
991 991 # end of payload
992 992 outdebug(ui, 'closing payload chunk')
993 993 yield _pack(_fpayloadsize, 0)
994 994 self._generated = True
995 995
996 996 def _payloadchunks(self):
997 997 """yield chunks of the part payload
998 998
999 999 Exists to handle the different methods to provide data to a part."""
1000 1000 # we only support fixed size data now.
1001 1001 # This will be improved in the future.
1002 1002 if util.safehasattr(self.data, 'next'):
1003 1003 buff = util.chunkbuffer(self.data)
1004 1004 chunk = buff.read(preferedchunksize)
1005 1005 while chunk:
1006 1006 yield chunk
1007 1007 chunk = buff.read(preferedchunksize)
1008 1008 elif len(self.data):
1009 1009 yield self.data
1010 1010
1011 1011
1012 1012 flaginterrupt = -1
1013 1013
1014 1014 class interrupthandler(unpackermixin):
1015 1015 """read one part and process it with restricted capability
1016 1016
1017 1017 This allows transmitting exceptions raised on the producer side during part
1018 1018 iteration while the consumer is reading a part.
1019 1019
1020 1020 Parts processed in this manner only have access to a ui object."""
1021 1021
1022 1022 def __init__(self, ui, fp):
1023 1023 super(interrupthandler, self).__init__(fp)
1024 1024 self.ui = ui
1025 1025
1026 1026 def _readpartheader(self):
1027 1027 """reads a part header size and returns the bytes blob
1028 1028
1029 1029 returns None if empty"""
1030 1030 headersize = self._unpack(_fpartheadersize)[0]
1031 1031 if headersize < 0:
1032 1032 raise error.BundleValueError('negative part header size: %i'
1033 1033 % headersize)
1034 1034 indebug(self.ui, 'part header size: %i\n' % headersize)
1035 1035 if headersize:
1036 1036 return self._readexact(headersize)
1037 1037 return None
1038 1038
1039 1039 def __call__(self):
1040 1040
1041 1041 self.ui.debug('bundle2-input-stream-interrupt:'
1042 1042 ' opening out of band context\n')
1043 1043 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1044 1044 headerblock = self._readpartheader()
1045 1045 if headerblock is None:
1046 1046 indebug(self.ui, 'no part found during interruption.')
1047 1047 return
1048 1048 part = unbundlepart(self.ui, headerblock, self._fp)
1049 1049 op = interruptoperation(self.ui)
1050 1050 _processpart(op, part)
1051 1051 self.ui.debug('bundle2-input-stream-interrupt:'
1052 1052 ' closing out of band context\n')
1053 1053
1054 1054 class interruptoperation(object):
1055 1055 """A limited operation to be used by part handlers during interruption
1056 1056
1057 1057 It only has access to a ui object.
1058 1058 """
1059 1059
1060 1060 def __init__(self, ui):
1061 1061 self.ui = ui
1062 1062 self.reply = None
1063 1063 self.captureoutput = False
1064 1064
1065 1065 @property
1066 1066 def repo(self):
1067 1067 raise RuntimeError('no repo access from stream interruption')
1068 1068
1069 1069 def gettransaction(self):
1070 1070 raise TransactionUnavailable('no repo access from stream interruption')
1071 1071
1072 1072 class unbundlepart(unpackermixin):
1073 1073 """a bundle part read from a bundle"""
1074 1074
1075 1075 def __init__(self, ui, header, fp):
1076 1076 super(unbundlepart, self).__init__(fp)
1077 1077 self.ui = ui
1078 1078 # unbundle state attr
1079 1079 self._headerdata = header
1080 1080 self._headeroffset = 0
1081 1081 self._initialized = False
1082 1082 self.consumed = False
1083 1083 # part data
1084 1084 self.id = None
1085 1085 self.type = None
1086 1086 self.mandatoryparams = None
1087 1087 self.advisoryparams = None
1088 1088 self.params = None
1089 1089 self.mandatorykeys = ()
1090 1090 self._payloadstream = None
1091 1091 self._readheader()
1092 1092 self._mandatory = None
1093 1093 self._chunkindex = [] # (payload, file) position tuples for chunk starts
1094 1094 self._pos = 0
1095 1095
1096 1096 def _fromheader(self, size):
1097 1097 """return the next <size> bytes from the header"""
1098 1098 offset = self._headeroffset
1099 1099 data = self._headerdata[offset:(offset + size)]
1100 1100 self._headeroffset = offset + size
1101 1101 return data
1102 1102
1103 1103 def _unpackheader(self, format):
1104 1104 """read given format from header
1105 1105
1106 1106 This automatically computes the size of the format to read.
1107 1107 data = self._fromheader(struct.calcsize(format))
1108 1108 return _unpack(format, data)
1109 1109
1110 1110 def _initparams(self, mandatoryparams, advisoryparams):
1111 1111 """internal function to set up all logic-related parameters"""
1112 1112 # make it read only to prevent people touching it by mistake.
1113 1113 self.mandatoryparams = tuple(mandatoryparams)
1114 1114 self.advisoryparams = tuple(advisoryparams)
1115 1115 # user friendly UI
1116 1116 self.params = dict(self.mandatoryparams)
1117 1117 self.params.update(dict(self.advisoryparams))
1118 1118 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1119 1119
1120 1120 def _payloadchunks(self, chunknum=0):
1121 1121 '''seek to specified chunk and start yielding data'''
1122 1122 if len(self._chunkindex) == 0:
1123 1123 assert chunknum == 0, 'Must start with chunk 0'
1124 1124 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1125 1125 else:
1126 1126 assert chunknum < len(self._chunkindex), \
1127 1127 'Unknown chunk %d' % chunknum
1128 1128 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1129 1129
1130 1130 pos = self._chunkindex[chunknum][0]
1131 1131 payloadsize = self._unpack(_fpayloadsize)[0]
1132 1132 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1133 1133 while payloadsize:
1134 1134 if payloadsize == flaginterrupt:
1135 1135 # interruption detection, the handler will now read a
1136 1136 # single part and process it.
1137 1137 interrupthandler(self.ui, self._fp)()
1138 1138 elif payloadsize < 0:
1139 1139 msg = 'negative payload chunk size: %i' % payloadsize
1140 1140 raise error.BundleValueError(msg)
1141 1141 else:
1142 1142 result = self._readexact(payloadsize)
1143 1143 chunknum += 1
1144 1144 pos += payloadsize
1145 1145 if chunknum == len(self._chunkindex):
1146 1146 self._chunkindex.append((pos,
1147 1147 super(unbundlepart, self).tell()))
1148 1148 yield result
1149 1149 payloadsize = self._unpack(_fpayloadsize)[0]
1150 1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1151 1151
1152 1152 def _findchunk(self, pos):
1153 1153 '''for a given payload position, return a chunk number and offset'''
1154 1154 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1155 1155 if ppos == pos:
1156 1156 return chunk, 0
1157 1157 elif ppos > pos:
1158 1158 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1159 1159 raise ValueError('Unknown chunk')
1160 1160
1161 1161 def _readheader(self):
1162 1162 """read the header and setup the object"""
1163 1163 typesize = self._unpackheader(_fparttypesize)[0]
1164 1164 self.type = self._fromheader(typesize)
1165 1165 indebug(self.ui, 'part type: "%s"' % self.type)
1166 1166 self.id = self._unpackheader(_fpartid)[0]
1167 1167 indebug(self.ui, 'part id: "%s"' % self.id)
1168 1168 # extract mandatory bit from type
1169 1169 self.mandatory = (self.type != self.type.lower())
1170 1170 self.type = self.type.lower()
1171 1171 ## reading parameters
1172 1172 # param count
1173 1173 mancount, advcount = self._unpackheader(_fpartparamcount)
1174 1174 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1175 1175 # param size
1176 1176 fparamsizes = _makefpartparamsizes(mancount + advcount)
1177 1177 paramsizes = self._unpackheader(fparamsizes)
1178 1178 # make it a list of pairs again
1179 1179 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1180 1180 # split mandatory from advisory
1181 1181 mansizes = paramsizes[:mancount]
1182 1182 advsizes = paramsizes[mancount:]
1183 1183 # retrieve param value
1184 1184 manparams = []
1185 1185 for key, value in mansizes:
1186 1186 manparams.append((self._fromheader(key), self._fromheader(value)))
1187 1187 advparams = []
1188 1188 for key, value in advsizes:
1189 1189 advparams.append((self._fromheader(key), self._fromheader(value)))
1190 1190 self._initparams(manparams, advparams)
1191 1191 ## part payload
1192 1192 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1193 1193 # the header has been read; mark the part as initialized
1194 1194 self._initialized = True
1195 1195
1196 1196 def read(self, size=None):
1197 1197 """read payload data"""
1198 1198 if not self._initialized:
1199 1199 self._readheader()
1200 1200 if size is None:
1201 1201 data = self._payloadstream.read()
1202 1202 else:
1203 1203 data = self._payloadstream.read(size)
1204 1204 self._pos += len(data)
1205 1205 if size is None or len(data) < size:
1206 1206 if not self.consumed and self._pos:
1207 1207 self.ui.debug('bundle2-input-part: total payload size %i\n'
1208 1208 % self._pos)
1209 1209 self.consumed = True
1210 1210 return data
1211 1211
1212 1212 def tell(self):
1213 1213 return self._pos
1214 1214
1215 1215 def seek(self, offset, whence=0):
1216 1216 if whence == 0:
1217 1217 newpos = offset
1218 1218 elif whence == 1:
1219 1219 newpos = self._pos + offset
1220 1220 elif whence == 2:
1221 1221 if not self.consumed:
1222 1222 self.read()
1223 1223 newpos = self._chunkindex[-1][0] - offset
1224 1224 else:
1225 1225 raise ValueError('Unknown whence value: %r' % (whence,))
1226 1226
1227 1227 if newpos > self._chunkindex[-1][0] and not self.consumed:
1228 1228 self.read()
1229 1229 if not 0 <= newpos <= self._chunkindex[-1][0]:
1230 1230 raise ValueError('Offset out of range')
1231 1231
1232 1232 if self._pos != newpos:
1233 1233 chunk, internaloffset = self._findchunk(newpos)
1234 1234 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1235 1235 adjust = self.read(internaloffset)
1236 1236 if len(adjust) != internaloffset:
1237 1237 raise error.Abort(_('Seek failed\n'))
1238 1238 self._pos = newpos
1239 1239
1240 1240 # These are only the static capabilities.
1241 1241 # Check the 'getrepocaps' function for the rest.
1242 1242 capabilities = {'HG20': (),
1243 1243 'error': ('abort', 'unsupportedcontent', 'pushraced',
1244 1244 'pushkey'),
1245 1245 'listkeys': (),
1246 1246 'pushkey': (),
1247 1247 'digests': tuple(sorted(util.DIGESTS.keys())),
1248 1248 'remote-changegroup': ('http', 'https'),
1249 1249 'hgtagsfnodes': (),
1250 1250 }
1251 1251
1252 1252 def getrepocaps(repo, allowpushback=False):
1253 1253 """return the bundle2 capabilities for a given repo
1254 1254
1255 1255 Exists to allow extensions (like evolution) to mutate the capabilities.
1256 1256 """
1257 1257 caps = capabilities.copy()
1258 1258 caps['changegroup'] = tuple(sorted(
1259 1259 changegroup.supportedincomingversions(repo)))
1260 1260 if obsolete.isenabled(repo, obsolete.exchangeopt):
1261 1261 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1262 1262 caps['obsmarkers'] = supportedformat
1263 1263 if allowpushback:
1264 1264 caps['pushback'] = ()
1265 1265 return caps
1266 1266
1267 1267 def bundle2caps(remote):
1268 1268 """return the bundle capabilities of a peer as dict"""
1269 1269 raw = remote.capable('bundle2')
1270 1270 if not raw and raw != '':
1271 1271 return {}
1272 1272 capsblob = urllib.unquote(remote.capable('bundle2'))
1273 1273 return decodecaps(capsblob)
1274 1274
1275 1275 def obsmarkersversion(caps):
1276 1276 """extract the list of supported obsmarkers versions from a bundle2caps dict
1277 1277 """
1278 1278 obscaps = caps.get('obsmarkers', ())
1279 1279 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1280 1280
1281 1281 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1282 1282 """Write a bundle file and return its filename.
1283 1283
1284 1284 Existing files will not be overwritten.
1285 1285 If no filename is specified, a temporary file is created.
1286 1286 bz2 compression can be turned off.
1287 1287 The bundle file will be deleted in case of errors.
1288 1288 """
1289 1289
1290 1290 if bundletype == "HG20":
1291 1291 bundle = bundle20(ui)
1292 1292 bundle.setcompression(compression)
1293 1293 part = bundle.newpart('changegroup', data=cg.getchunks())
1294 1294 part.addparam('version', cg.version)
1295 1295 chunkiter = bundle.getchunks()
1296 1296 else:
1297 1297 # compression argument is only for the bundle2 case
1298 1298 assert compression is None
1299 1299 if cg.version != '01':
1300 1300 raise error.Abort(_('old bundle types only supports v1 '
1301 1301 'changegroups'))
1302 1302 header, comp = bundletypes[bundletype]
1303 1303 if comp not in util.compressors:
1304 1304 raise error.Abort(_('unknown stream compression type: %s')
1305 1305 % comp)
1306 1306 z = util.compressors[comp]()
1307 1307 subchunkiter = cg.getchunks()
1308 1308 def chunkiter():
1309 1309 yield header
1310 1310 for chunk in subchunkiter:
1311 1311 yield z.compress(chunk)
1312 1312 yield z.flush()
1313 1313 chunkiter = chunkiter()
1314 1314
1315 1315 # parse the changegroup data, otherwise we will block
1316 1316 # in case of sshrepo because we don't know the end of the stream
1317
1318 # an empty chunkgroup is the end of the changegroup
1319 # a changegroup has at least 2 chunkgroups (changelog and manifest).
1320 # after that, an empty chunkgroup is the end of the changegroup
1321 1317 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1322 1318
1323 1319 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1324 1320 def handlechangegroup(op, inpart):
1325 1321 """apply a changegroup part on the repo
1326 1322
1327 1323 This is a very early implementation that will see massive rework before
1328 1324 being inflicted on any end-user.
1329 1325 """
1330 1326 # Make sure we trigger a transaction creation
1331 1327 #
1332 1328 # The addchangegroup function will get a transaction object by itself, but
1333 1329 # we need to make sure we trigger the creation of a transaction object used
1334 1330 # for the whole processing scope.
1335 1331 op.gettransaction()
1336 1332 unpackerversion = inpart.params.get('version', '01')
1337 1333 # We should raise an appropriate exception here
1338 1334 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1339 1335 # the source and url passed here are overwritten by the ones contained in
1340 1336 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1341 1337 nbchangesets = None
1342 1338 if 'nbchanges' in inpart.params:
1343 1339 nbchangesets = int(inpart.params.get('nbchanges'))
1344 1340 if ('treemanifest' in inpart.params and
1345 1341 'treemanifest' not in op.repo.requirements):
1346 1342 if len(op.repo.changelog) != 0:
1347 1343 raise error.Abort(_(
1348 1344 "bundle contains tree manifests, but local repo is "
1349 1345 "non-empty and does not use tree manifests"))
1350 1346 op.repo.requirements.add('treemanifest')
1351 1347 op.repo._applyopenerreqs()
1352 1348 op.repo._writerequirements()
1353 1349 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1354 1350 op.records.add('changegroup', {'return': ret})
1355 1351 if op.reply is not None:
1356 1352 # This is definitely not the final form of this
1357 1353 # return. But one needs to start somewhere.
1358 1354 part = op.reply.newpart('reply:changegroup', mandatory=False)
1359 1355 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1360 1356 part.addparam('return', '%i' % ret, mandatory=False)
1361 1357 assert not inpart.read()
1362 1358
1363 1359 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1364 1360 ['digest:%s' % k for k in util.DIGESTS.keys()])
1365 1361 @parthandler('remote-changegroup', _remotechangegroupparams)
1366 1362 def handleremotechangegroup(op, inpart):
1367 1363 """apply a bundle10 on the repo, given an url and validation information
1368 1364
1369 1365 All the information about the remote bundle to import is given as
1370 1366 parameters. The parameters include:
1371 1367 - url: the url to the bundle10.
1372 1368 - size: the bundle10 file size. It is used to validate what was
1373 1369 retrieved by the client matches the server knowledge about the bundle.
1374 1370 - digests: a space separated list of the digest types provided as
1375 1371 parameters.
1376 1372 - digest:<digest-type>: the hexadecimal representation of the digest with
1377 1373 that name. Like the size, it is used to validate what was retrieved by
1378 1374 the client matches what the server knows about the bundle.
1379 1375
1380 1376 When multiple digest types are given, all of them are checked.
1381 1377 """
1382 1378 try:
1383 1379 raw_url = inpart.params['url']
1384 1380 except KeyError:
1385 1381 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1386 1382 parsed_url = util.url(raw_url)
1387 1383 if parsed_url.scheme not in capabilities['remote-changegroup']:
1388 1384 raise error.Abort(_('remote-changegroup does not support %s urls') %
1389 1385 parsed_url.scheme)
1390 1386
1391 1387 try:
1392 1388 size = int(inpart.params['size'])
1393 1389 except ValueError:
1394 1390 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1395 1391 % 'size')
1396 1392 except KeyError:
1397 1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1398 1394
1399 1395 digests = {}
1400 1396 for typ in inpart.params.get('digests', '').split():
1401 1397 param = 'digest:%s' % typ
1402 1398 try:
1403 1399 value = inpart.params[param]
1404 1400 except KeyError:
1405 1401 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1406 1402 param)
1407 1403 digests[typ] = value
1408 1404
1409 1405 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1410 1406
1411 1407 # Make sure we trigger a transaction creation
1412 1408 #
1413 1409 # The addchangegroup function will get a transaction object by itself, but
1414 1410 # we need to make sure we trigger the creation of a transaction object used
1415 1411 # for the whole processing scope.
1416 1412 op.gettransaction()
1417 1413 from . import exchange
1418 1414 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1419 1415 if not isinstance(cg, changegroup.cg1unpacker):
1420 1416 raise error.Abort(_('%s: not a bundle version 1.0') %
1421 1417 util.hidepassword(raw_url))
1422 1418 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1423 1419 op.records.add('changegroup', {'return': ret})
1424 1420 if op.reply is not None:
1425 1421 # This is definitely not the final form of this
1426 1422 # return. But one needs to start somewhere.
1427 1423 part = op.reply.newpart('reply:changegroup')
1428 1424 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1429 1425 part.addparam('return', '%i' % ret, mandatory=False)
1430 1426 try:
1431 1427 real_part.validate()
1432 1428 except error.Abort as e:
1433 1429 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1434 1430 (util.hidepassword(raw_url), str(e)))
1435 1431 assert not inpart.read()
1436 1432
1437 1433 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1438 1434 def handlereplychangegroup(op, inpart):
1439 1435 ret = int(inpart.params['return'])
1440 1436 replyto = int(inpart.params['in-reply-to'])
1441 1437 op.records.add('changegroup', {'return': ret}, replyto)
1442 1438
1443 1439 @parthandler('check:heads')
1444 1440 def handlecheckheads(op, inpart):
1445 1441 """check that the heads of the repo did not change
1446 1442
1447 1443 This is used to detect a push race when using unbundle.
1448 1444 This replaces the "heads" argument of unbundle."""
1449 1445 h = inpart.read(20)
1450 1446 heads = []
1451 1447 while len(h) == 20:
1452 1448 heads.append(h)
1453 1449 h = inpart.read(20)
1454 1450 assert not h
1455 1451 # Trigger a transaction so that we are guaranteed to have the lock now.
1456 1452 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1457 1453 op.gettransaction()
1458 1454 if heads != op.repo.heads():
1459 1455 raise error.PushRaced('repository changed while pushing - '
1460 1456 'please try again')
1461 1457
1462 1458 @parthandler('output')
1463 1459 def handleoutput(op, inpart):
1464 1460 """forward output captured on the server to the client"""
1465 1461 for line in inpart.read().splitlines():
1466 1462 op.ui.status(('remote: %s\n' % line))
1467 1463
1468 1464 @parthandler('replycaps')
1469 1465 def handlereplycaps(op, inpart):
1470 1466 """Notify that a reply bundle should be created
1471 1467
1472 1468 The payload contains the capabilities information for the reply"""
1473 1469 caps = decodecaps(inpart.read())
1474 1470 if op.reply is None:
1475 1471 op.reply = bundle20(op.ui, caps)
1476 1472
1477 1473 class AbortFromPart(error.Abort):
1478 1474 """Sub-class of Abort that denotes an error from a bundle2 part."""
1479 1475
1480 1476 @parthandler('error:abort', ('message', 'hint'))
1481 1477 def handleerrorabort(op, inpart):
1482 1478 """Used to transmit abort error over the wire"""
1483 1479 raise AbortFromPart(inpart.params['message'],
1484 1480 hint=inpart.params.get('hint'))
1485 1481
1486 1482 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1487 1483 'in-reply-to'))
1488 1484 def handleerrorpushkey(op, inpart):
1489 1485 """Used to transmit failure of a mandatory pushkey over the wire"""
1490 1486 kwargs = {}
1491 1487 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1492 1488 value = inpart.params.get(name)
1493 1489 if value is not None:
1494 1490 kwargs[name] = value
1495 1491 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1496 1492
1497 1493 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1498 1494 def handleerrorunsupportedcontent(op, inpart):
1499 1495 """Used to transmit unknown content error over the wire"""
1500 1496 kwargs = {}
1501 1497 parttype = inpart.params.get('parttype')
1502 1498 if parttype is not None:
1503 1499 kwargs['parttype'] = parttype
1504 1500 params = inpart.params.get('params')
1505 1501 if params is not None:
1506 1502 kwargs['params'] = params.split('\0')
1507 1503
1508 1504 raise error.BundleUnknownFeatureError(**kwargs)
1509 1505
1510 1506 @parthandler('error:pushraced', ('message',))
1511 1507 def handleerrorpushraced(op, inpart):
1512 1508 """Used to transmit push race error over the wire"""
1513 1509 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1514 1510
1515 1511 @parthandler('listkeys', ('namespace',))
1516 1512 def handlelistkeys(op, inpart):
1517 1513 """retrieve pushkey namespace content stored in a bundle2"""
1518 1514 namespace = inpart.params['namespace']
1519 1515 r = pushkey.decodekeys(inpart.read())
1520 1516 op.records.add('listkeys', (namespace, r))
1521 1517
1522 1518 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1523 1519 def handlepushkey(op, inpart):
1524 1520 """process a pushkey request"""
1525 1521 dec = pushkey.decode
1526 1522 namespace = dec(inpart.params['namespace'])
1527 1523 key = dec(inpart.params['key'])
1528 1524 old = dec(inpart.params['old'])
1529 1525 new = dec(inpart.params['new'])
1530 1526 # Grab the transaction to ensure that we have the lock before performing the
1531 1527 # pushkey.
1532 1528 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1533 1529 op.gettransaction()
1534 1530 ret = op.repo.pushkey(namespace, key, old, new)
1535 1531 record = {'namespace': namespace,
1536 1532 'key': key,
1537 1533 'old': old,
1538 1534 'new': new}
1539 1535 op.records.add('pushkey', record)
1540 1536 if op.reply is not None:
1541 1537 rpart = op.reply.newpart('reply:pushkey')
1542 1538 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1543 1539 rpart.addparam('return', '%i' % ret, mandatory=False)
1544 1540 if inpart.mandatory and not ret:
1545 1541 kwargs = {}
1546 1542 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1547 1543 if key in inpart.params:
1548 1544 kwargs[key] = inpart.params[key]
1549 1545 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1550 1546
1551 1547 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1552 1548 def handlepushkeyreply(op, inpart):
1553 1549 """retrieve the result of a pushkey request"""
1554 1550 ret = int(inpart.params['return'])
1555 1551 partid = int(inpart.params['in-reply-to'])
1556 1552 op.records.add('pushkey', {'return': ret}, partid)
1557 1553
1558 1554 @parthandler('obsmarkers')
1559 1555 def handleobsmarker(op, inpart):
1560 1556 """add a stream of obsmarkers to the repo"""
1561 1557 tr = op.gettransaction()
1562 1558 markerdata = inpart.read()
1563 1559 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1564 1560 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1565 1561 % len(markerdata))
1566 1562 # The mergemarkers call will crash if marker creation is not enabled.
1567 1563 # we want to avoid this if the part is advisory.
1568 1564 if not inpart.mandatory and op.repo.obsstore.readonly:
1569 1565 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1570 1566 return
1571 1567 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1572 1568 if new:
1573 1569 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1574 1570 op.records.add('obsmarkers', {'new': new})
1575 1571 if op.reply is not None:
1576 1572 rpart = op.reply.newpart('reply:obsmarkers')
1577 1573 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1578 1574 rpart.addparam('new', '%i' % new, mandatory=False)
1579 1575
1580 1576
1581 1577 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1582 1578 def handleobsmarkerreply(op, inpart):
1583 1579 """retrieve the result of an obsmarkers request"""
1584 1580 ret = int(inpart.params['new'])
1585 1581 partid = int(inpart.params['in-reply-to'])
1586 1582 op.records.add('obsmarkers', {'new': ret}, partid)
1587 1583
1588 1584 @parthandler('hgtagsfnodes')
1589 1585 def handlehgtagsfnodes(op, inpart):
1590 1586 """Applies .hgtags fnodes cache entries to the local repo.
1591 1587
1592 1588 Payload is pairs of 20 byte changeset nodes and filenodes.
1593 1589 """
1594 1590 # Grab the transaction so we ensure that we have the lock at this point.
1595 1591 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1596 1592 op.gettransaction()
1597 1593 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1598 1594
1599 1595 count = 0
1600 1596 while True:
1601 1597 node = inpart.read(20)
1602 1598 fnode = inpart.read(20)
1603 1599 if len(node) < 20 or len(fnode) < 20:
1604 1600 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1605 1601 break
1606 1602 cache.setfnode(node, fnode)
1607 1603 count += 1
1608 1604
1609 1605 cache.write()
1610 1606 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)