bundle2: don't assume ordering of heads checked after push...
Mads Kiilerich -
r29294:077d0535 stable
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

 - magic string
 - stream level parameters
 - payload parts (any number)
 - end of stream marker.

The binary format
=================

All numbers are unsigned and big-endian.

Stream level parameters
-----------------------

The binary format is as follows:

:params size: int32

  The total number of Bytes used by the parameters

:params value: arbitrary number of Bytes

  A blob of `params size` containing the serialized version of all stream
  level parameters.

  The blob contains a space separated list of parameters. Parameters with a
  value are stored in the form `<name>=<value>`. Both name and value are
  urlquoted.

  Empty names are forbidden.

  A name MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However, when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

  Stream parameters use a simple textual format for two main reasons:

  - Stream level parameters should remain simple and we want to discourage
    any crazy usage.
  - Textual data allow easy human inspection of a bundle2 header in case of
    trouble.

  Any application level options MUST go into a bundle2 part instead.

Payload part
------------

The binary format is as follows:

:header size: int32

  The total number of Bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    to interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32bits integer (unique in the bundle) that can be used to
             refer to this part.

    :parameters:

        Part's parameters may have arbitrary content; the binary structure
        is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N couples of bytes, where N is the total number of parameters.
            Each couple contains (<size-of-key>, <size-of-value>) for one
            parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size couples stored in the previous
            field.

            Mandatory parameters come first, then the advisory ones.

            Each parameter's key MUST be unique within the part.

:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as many as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

Bundle processing
=================

Each part is processed in order using a "part handler". Handlers are
registered for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part
type contains any uppercase char it is considered mandatory. When no handler
is known for a mandatory part, the process is aborted and an exception is
raised. If the part is advisory and no handler is known, the part is ignored.
When the process is aborted, the full bundle is still read from the stream to
keep the channel usable. But none of the parts read after an abort are
processed. In the future, dropping the stream may become an option for
channels we do not care to preserve.
"""
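The stream-level parameter encoding described above (an int32 byte count
followed by a space-separated list of urlquoted `name` or `name=value`
entries) can be sketched in isolation. This is a standalone illustration, not
part of this module: `encode_stream_params`/`decode_stream_params` are
hypothetical names, and Python 3's `urllib.parse` stands in for Mercurial's
`util.urlreq` wrapper.

```python
import struct
from urllib.parse import quote, unquote

def encode_stream_params(params):
    """Serialize (name, value) pairs into the bundle2 stream-level
    parameter block: an int32 size followed by a space-separated list
    of urlquoted `name` or `name=value` entries."""
    entries = []
    for name, value in params:
        entry = quote(name)
        if value is not None:
            entry = '%s=%s' % (entry, quote(value))
        entries.append(entry)
    blob = ' '.join(entries).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def decode_stream_params(data):
    """Inverse of encode_stream_params; returns a dict of parameters,
    mapping valueless names to None."""
    (size,) = struct.unpack('>i', data[:4])
    blob = data[4:4 + size].decode('ascii')
    params = {}
    for entry in (blob.split(' ') if blob else []):
        name, sep, value = entry.partition('=')
        params[unquote(name)] = unquote(value) if sep else None
    return params
```

Note the asymmetry the docstring mandates: a capitalized name such as
`Compression` is mandatory for the receiver, but the wire encoding is
identical for advisory and mandatory parameters.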

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

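A quick sketch of how the dynamically built `>BB…` format unpacks the
`(<size-of-key>, <size-of-value>)` couples described in the module docstring.
`makefpartparamsizes` here is a standalone re-implementation for illustration,
not an import from this module:

```python
import struct

def makefpartparamsizes(nbparams):
    # one unsigned-byte pair (key size, value size) per parameter
    return '>' + ('BB' * nbparams)

# two parameters with key/value sizes (3, 5) and (8, 0)
fmt = makefpartparamsizes(2)                   # '>BBBB'
packed = struct.pack(fmt, 3, 5, 8, 0)
sizes = struct.unpack(fmt, packed)             # (3, 5, 8, 0)
couples = list(zip(sizes[0::2], sizes[1::2]))  # [(3, 5), (8, 0)]
```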
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator
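The registry pattern behind `parthandler` can be exercised standalone. This
sketch mirrors the decorator's behavior (lower-casing the type for
case-insensitive matching and attaching the accepted parameter names to the
function) without Mercurial's `validateparttype` check; the `handlers` dict
and handler below are illustrative, not part of this module:

```python
handlers = {}

def parthandler(parttype, params=()):
    """Register a function as a handler for `parttype` (case-insensitive)."""
    def _decorator(func):
        lparttype = parttype.lower()  # enforce lower case matching
        assert lparttype not in handlers
        handlers[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('changegroup', ('version',))
def handle_changegroup(op, part):
    # a real handler would apply the changegroup to op.repo
    return 'applied'
```

Lookup then goes through `parttype.lower()`, which is how an uppercase
(mandatory) part type and its lowercase (advisory) spelling reach the same
handler.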

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object has very little content now; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part, then searches for and uses the proper
    handling code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except BaseException as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                      params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
    finally:
        # consume the part content to not corrupt the stream.
        part.seek(0, 2)


def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line)
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)
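The caps blob handled by `decodecaps`/`encodecaps` (one capability per line,
with optional comma-separated urlquoted values) round-trips like this. The
sketch below is self-contained for illustration: it substitutes Python 3's
`urllib.parse` for Mercurial's `util.urlreq` wrapper and uses `[]` instead of
`()` for valueless capabilities:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """Encode a caps dict into the line-oriented blob format."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    """Decode the blob back into a dict of capability -> value list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```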

bundletypes = {
    "": ("", None),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        As the part is directly added to the container, any failure to
        properly initialize the part after calling ``newpart`` should result
        in a failure of the whole bundling process.

        You can still fall back to manually creating and adding the part if
        you need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged
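Per `getchunks` above, the smallest possible HG20 stream is the magic string,
an int32 zero for the stream-parameter block, and an int32 zero part-header
size acting as the end-of-stream marker. A sketch assembling that minimal
container (`minimal_hg20_bundle` is a hypothetical helper, not part of this
module):

```python
import struct

def minimal_hg20_bundle():
    """Build an empty bundle2 container: magic, zero-length stream
    parameters, and the zero-size part header that ends the stream."""
    chunks = [b'HG20',
              struct.pack('>i', 0),   # stream params: 0 bytes
              struct.pack('>i', 0)]   # empty part header = end of stream
    return b''.join(chunks)

bundle = minimal_hg20_bundle()
# getunbundler() splits the magic string the same way:
magic, version = bundle[0:2], bundle[2:4]
```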

class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process a blob of stream level parameters, returning a dict"""
        params = {}
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise a KeyError when they are
        unknown.

        Note: no option is currently supported. Any input will be either
        ignored or will cause a failure.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends; it relies on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')
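The payload framing consumed above (`<chunksize><chunkdata>` pairs terminated
by a zero-size chunk, as specified in the module docstring) can be decoded
with a small standalone reader; `iter_payload_chunks` is a hypothetical name
and this sketch ignores the negative "special case" sizes such as the
interrupt flag:

```python
import io
import struct

def iter_payload_chunks(fp):
    """Yield chunk payloads from a <chunksize><chunkdata> series,
    stopping at the zero-size terminator chunk."""
    while True:
        (size,) = struct.unpack('>i', fp.read(4))
        if size == 0:
            return  # zero-size chunk concludes the payload
        if size < 0:
            raise ValueError('special chunk sizes not handled: %i' % size)
        yield fp.read(size)

stream = io.BytesIO(struct.pack('>i', 3) + b'abc' +
                    struct.pack('>i', 2) + b'de' +
                    struct.pack('>i', 0))
chunks = list(iter_payload_chunks(stream))
```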

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]
    if value is not None:
        unbundler._compressed = True
821 821
822 822 class bundlepart(object):
823 823 """A bundle2 part contains application level payload
824 824
825 825 The part `type` is used to route the part to the application level
826 826 handler.
827 827
828 828 The part payload is contained in ``part.data``. It could be raw bytes or a
829 829 generator of byte chunks.
830 830
831 831 You can add parameters to the part using the ``addparam`` method.
832 832 Parameters can be either mandatory (default) or advisory. Remote side
833 833 should be able to safely ignore the advisory ones.
834 834
835 835 Both data and parameters cannot be modified after the generation has begun.
836 836 """
837 837
838 838 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
839 839 data='', mandatory=True):
840 840 validateparttype(parttype)
841 841 self.id = None
842 842 self.type = parttype
843 843 self._data = data
844 844 self._mandatoryparams = list(mandatoryparams)
845 845 self._advisoryparams = list(advisoryparams)
846 846 # checking for duplicated entries
847 847 self._seenparams = set()
848 848 for pname, __ in self._mandatoryparams + self._advisoryparams:
849 849 if pname in self._seenparams:
850 850 raise RuntimeError('duplicated params: %s' % pname)
851 851 self._seenparams.add(pname)
852 852 # status of the part's generation:
853 853 # - None: not started,
854 854 # - False: currently generated,
855 855 # - True: generation done.
856 856 self._generated = None
857 857 self.mandatory = mandatory
858 858
859 859 def copy(self):
860 860 """return a copy of the part
861 861
862 862 The new part have the very same content but no partid assigned yet.
863 863 Parts with generated data cannot be copied."""
864 864 assert not util.safehasattr(self.data, 'next')
865 865 return self.__class__(self.type, self._mandatoryparams,
866 866 self._advisoryparams, self._data, self.mandatory)
867 867
868 868 # methods used to defines the part content
869 869 @property
870 870 def data(self):
871 871 return self._data
872 872
873 873 @data.setter
874 874 def data(self, data):
875 875 if self._generated is not None:
876 876 raise error.ReadOnlyPartError('part is being generated')
877 877 self._data = data
878 878
879 879 @property
880 880 def mandatoryparams(self):
881 881 # make it an immutable tuple to force people through ``addparam``
882 882 return tuple(self._mandatoryparams)
883 883
884 884 @property
885 885 def advisoryparams(self):
886 886 # make it an immutable tuple to force people through ``addparam``
887 887 return tuple(self._advisoryparams)
888 888
889 889 def addparam(self, name, value='', mandatory=True):
890 890 if self._generated is not None:
891 891 raise error.ReadOnlyPartError('part is being generated')
892 892 if name in self._seenparams:
893 893 raise ValueError('duplicated params: %s' % name)
894 894 self._seenparams.add(name)
895 895 params = self._advisoryparams
896 896 if mandatory:
897 897 params = self._mandatoryparams
898 898 params.append((name, value))
899 899
900 900 # methods used to generate the bundle2 stream
901 901 def getchunks(self, ui):
902 902 if self._generated is not None:
903 903 raise RuntimeError('part can only be consumed once')
904 904 self._generated = False
905 905
906 906 if ui.debugflag:
907 907 msg = ['bundle2-output-part: "%s"' % self.type]
908 908 if not self.mandatory:
909 909 msg.append(' (advisory)')
910 910 nbmp = len(self.mandatoryparams)
911 911 nbap = len(self.advisoryparams)
912 912 if nbmp or nbap:
913 913 msg.append(' (params:')
914 914 if nbmp:
915 915 msg.append(' %i mandatory' % nbmp)
916 916 if nbap:
917 917 msg.append(' %i advisory' % nbap)
918 918 msg.append(')')
919 919 if not self.data:
920 920 msg.append(' empty payload')
921 921 elif util.safehasattr(self.data, 'next'):
922 922 msg.append(' streamed payload')
923 923 else:
924 924 msg.append(' %i bytes payload' % len(self.data))
925 925 msg.append('\n')
926 926 ui.debug(''.join(msg))
927 927
928 928 #### header
929 929 if self.mandatory:
930 930 parttype = self.type.upper()
931 931 else:
932 932 parttype = self.type.lower()
933 933 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
934 934 ## parttype
935 935 header = [_pack(_fparttypesize, len(parttype)),
936 936 parttype, _pack(_fpartid, self.id),
937 937 ]
938 938 ## parameters
939 939 # count
940 940 manpar = self.mandatoryparams
941 941 advpar = self.advisoryparams
942 942 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
943 943 # size
944 944 parsizes = []
945 945 for key, value in manpar:
946 946 parsizes.append(len(key))
947 947 parsizes.append(len(value))
948 948 for key, value in advpar:
949 949 parsizes.append(len(key))
950 950 parsizes.append(len(value))
951 951 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
952 952 header.append(paramsizes)
953 953 # key, value
954 954 for key, value in manpar:
955 955 header.append(key)
956 956 header.append(value)
957 957 for key, value in advpar:
958 958 header.append(key)
959 959 header.append(value)
960 960 ## finalize header
961 961 headerchunk = ''.join(header)
962 962 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
963 963 yield _pack(_fpartheadersize, len(headerchunk))
964 964 yield headerchunk
965 965 ## payload
966 966 try:
967 967 for chunk in self._payloadchunks():
968 968 outdebug(ui, 'payload chunk size: %i' % len(chunk))
969 969 yield _pack(_fpayloadsize, len(chunk))
970 970 yield chunk
971 971 except GeneratorExit:
972 972 # GeneratorExit means that nobody is listening for our
973 973 # results anyway, so just bail quickly rather than trying
974 974 # to produce an error part.
975 975 ui.debug('bundle2-generatorexit\n')
976 976 raise
977 977 except BaseException as exc:
978 978 # backup exception data for later
979 979 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
980 980 % exc)
981 981 exc_info = sys.exc_info()
982 982 msg = 'unexpected error: %s' % exc
983 983 interpart = bundlepart('error:abort', [('message', msg)],
984 984 mandatory=False)
985 985 interpart.id = 0
986 986 yield _pack(_fpayloadsize, -1)
987 987 for chunk in interpart.getchunks(ui=ui):
988 988 yield chunk
989 989 outdebug(ui, 'closing payload chunk')
990 990 # abort current part payload
991 991 yield _pack(_fpayloadsize, 0)
992 992 raise exc_info[0], exc_info[1], exc_info[2]
993 993 # end of payload
994 994 outdebug(ui, 'closing payload chunk')
995 995 yield _pack(_fpayloadsize, 0)
996 996 self._generated = True
997 997
998 998 def _payloadchunks(self):
999 999 """yield chunks of the part payload
1000 1000
1001 1001 Exists to handle the different methods to provide data to a part."""
1002 1002 # we only support fixed size data now.
1003 1003 # This will be improved in the future.
1004 1004 if util.safehasattr(self.data, 'next'):
1005 1005 buff = util.chunkbuffer(self.data)
1006 1006 chunk = buff.read(preferedchunksize)
1007 1007 while chunk:
1008 1008 yield chunk
1009 1009 chunk = buff.read(preferedchunksize)
1010 1010 elif len(self.data):
1011 1011 yield self.data
1012 1012
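A minimal standalone sketch of the `_payloadchunks` logic above: a payload is either raw bytes (emitted as one chunk) or a generator of byte chunks (re-sliced to a preferred chunk size). Here `io.BytesIO` stands in for Mercurial's `util.chunkbuffer`, and `PREFERRED_CHUNK_SIZE` is an illustrative value, not the real `preferedchunksize`.

```python
import io

PREFERRED_CHUNK_SIZE = 4096  # illustrative; stands in for preferedchunksize

def payload_chunks(data):
    """Yield chunks for raw bytes or for an iterator of byte chunks."""
    if hasattr(data, 'next') or hasattr(data, '__next__'):
        # stream case: buffer the incoming chunks and re-slice them
        buff = io.BytesIO(b''.join(data))
        chunk = buff.read(PREFERRED_CHUNK_SIZE)
        while chunk:
            yield chunk
            chunk = buff.read(PREFERRED_CHUNK_SIZE)
    elif len(data):
        # fixed-size case: emit the whole payload as a single chunk
        yield data
```

A 5000-byte streamed payload comes out as a 4096-byte chunk followed by a 904-byte one, while plain bytes pass through untouched.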
1013 1013
1014 1014 flaginterrupt = -1
1015 1015
1016 1016 class interrupthandler(unpackermixin):
1017 1017 """read one part and process it with restricted capability
1018 1018
1019 1019 This allows transmitting an exception raised on the producer side during
1020 1020 part iteration while the consumer is reading a part.
1021 1021
1022 1022 Parts processed in this manner only have access to a ui object."""
1023 1023
1024 1024 def __init__(self, ui, fp):
1025 1025 super(interrupthandler, self).__init__(fp)
1026 1026 self.ui = ui
1027 1027
1028 1028 def _readpartheader(self):
1029 1029 """reads a part header size and returns the bytes blob
1030 1030
1031 1031 returns None if empty"""
1032 1032 headersize = self._unpack(_fpartheadersize)[0]
1033 1033 if headersize < 0:
1034 1034 raise error.BundleValueError('negative part header size: %i'
1035 1035 % headersize)
1036 1036 indebug(self.ui, 'part header size: %i\n' % headersize)
1037 1037 if headersize:
1038 1038 return self._readexact(headersize)
1039 1039 return None
1040 1040
1041 1041 def __call__(self):
1042 1042
1043 1043 self.ui.debug('bundle2-input-stream-interrupt:'
1044 1044 ' opening out of band context\n')
1045 1045 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1046 1046 headerblock = self._readpartheader()
1047 1047 if headerblock is None:
1048 1048 indebug(self.ui, 'no part found during interruption.')
1049 1049 return
1050 1050 part = unbundlepart(self.ui, headerblock, self._fp)
1051 1051 op = interruptoperation(self.ui)
1052 1052 _processpart(op, part)
1053 1053 self.ui.debug('bundle2-input-stream-interrupt:'
1054 1054 ' closing out of band context\n')
1055 1055
1056 1056 class interruptoperation(object):
1057 1057 """A limited operation to be used by part handlers during interruption
1058 1058
1059 1059 It only has access to a ui object.
1060 1060 """
1061 1061
1062 1062 def __init__(self, ui):
1063 1063 self.ui = ui
1064 1064 self.reply = None
1065 1065 self.captureoutput = False
1066 1066
1067 1067 @property
1068 1068 def repo(self):
1069 1069 raise RuntimeError('no repo access from stream interruption')
1070 1070
1071 1071 def gettransaction(self):
1072 1072 raise TransactionUnavailable('no repo access from stream interruption')
1073 1073
1074 1074 class unbundlepart(unpackermixin):
1075 1075 """a bundle part read from a bundle"""
1076 1076
1077 1077 def __init__(self, ui, header, fp):
1078 1078 super(unbundlepart, self).__init__(fp)
1079 1079 self.ui = ui
1080 1080 # unbundle state attr
1081 1081 self._headerdata = header
1082 1082 self._headeroffset = 0
1083 1083 self._initialized = False
1084 1084 self.consumed = False
1085 1085 # part data
1086 1086 self.id = None
1087 1087 self.type = None
1088 1088 self.mandatoryparams = None
1089 1089 self.advisoryparams = None
1090 1090 self.params = None
1091 1091 self.mandatorykeys = ()
1092 1092 self._payloadstream = None
1093 1093 self._readheader()
1094 1094 self._mandatory = None
1095 1095 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1096 1096 self._pos = 0
1097 1097
1098 1098 def _fromheader(self, size):
1099 1099 """return the next <size> bytes from the header"""
1100 1100 offset = self._headeroffset
1101 1101 data = self._headerdata[offset:(offset + size)]
1102 1102 self._headeroffset = offset + size
1103 1103 return data
1104 1104
1105 1105 def _unpackheader(self, format):
1106 1106 """read given format from header
1107 1107
1108 1108 This automatically computes the size of the format to read."""
1109 1109 data = self._fromheader(struct.calcsize(format))
1110 1110 return _unpack(format, data)
1111 1111
1112 1112 def _initparams(self, mandatoryparams, advisoryparams):
1113 1113 """internal function to set up all logic-related parameters"""
1114 1114 # make it read only to prevent people touching it by mistake.
1115 1115 self.mandatoryparams = tuple(mandatoryparams)
1116 1116 self.advisoryparams = tuple(advisoryparams)
1117 1117 # user friendly UI
1118 1118 self.params = dict(self.mandatoryparams)
1119 1119 self.params.update(dict(self.advisoryparams))
1120 1120 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1121 1121
1122 1122 def _payloadchunks(self, chunknum=0):
1123 1123 '''seek to specified chunk and start yielding data'''
1124 1124 if len(self._chunkindex) == 0:
1125 1125 assert chunknum == 0, 'Must start with chunk 0'
1126 1126 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1127 1127 else:
1128 1128 assert chunknum < len(self._chunkindex), \
1129 1129 'Unknown chunk %d' % chunknum
1130 1130 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1131 1131
1132 1132 pos = self._chunkindex[chunknum][0]
1133 1133 payloadsize = self._unpack(_fpayloadsize)[0]
1134 1134 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1135 1135 while payloadsize:
1136 1136 if payloadsize == flaginterrupt:
1137 1137 # interruption detection, the handler will now read a
1138 1138 # single part and process it.
1139 1139 interrupthandler(self.ui, self._fp)()
1140 1140 elif payloadsize < 0:
1141 1141 msg = 'negative payload chunk size: %i' % payloadsize
1142 1142 raise error.BundleValueError(msg)
1143 1143 else:
1144 1144 result = self._readexact(payloadsize)
1145 1145 chunknum += 1
1146 1146 pos += payloadsize
1147 1147 if chunknum == len(self._chunkindex):
1148 1148 self._chunkindex.append((pos,
1149 1149 super(unbundlepart, self).tell()))
1150 1150 yield result
1151 1151 payloadsize = self._unpack(_fpayloadsize)[0]
1152 1152 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1153 1153
1154 1154 def _findchunk(self, pos):
1155 1155 '''for a given payload position, return a chunk number and offset'''
1156 1156 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1157 1157 if ppos == pos:
1158 1158 return chunk, 0
1159 1159 elif ppos > pos:
1160 1160 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1161 1161 raise ValueError('Unknown chunk')
1162 1162
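A standalone sketch of `_findchunk` above: the chunk index stores `(payload offset, file offset)` pairs for each chunk start, and a payload position maps back to a chunk number plus an offset inside that chunk. The index values below are illustrative.

```python
def find_chunk(chunkindex, pos):
    """For a payload position, return (chunk number, offset within chunk)."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            # position is exactly at a chunk boundary
            return chunk, 0
        elif ppos > pos:
            # position falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# chunks start at payload offsets 0, 5 and 9 (file offsets are illustrative)
index = [(0, 100), (5, 130), (9, 160)]
```

Position 7 lands two bytes into the second chunk, so `find_chunk(index, 7)` yields `(1, 2)`.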
1163 1163 def _readheader(self):
1164 1164 """read the header and setup the object"""
1165 1165 typesize = self._unpackheader(_fparttypesize)[0]
1166 1166 self.type = self._fromheader(typesize)
1167 1167 indebug(self.ui, 'part type: "%s"' % self.type)
1168 1168 self.id = self._unpackheader(_fpartid)[0]
1169 1169 indebug(self.ui, 'part id: "%s"' % self.id)
1170 1170 # extract mandatory bit from type
1171 1171 self.mandatory = (self.type != self.type.lower())
1172 1172 self.type = self.type.lower()
1173 1173 ## reading parameters
1174 1174 # param count
1175 1175 mancount, advcount = self._unpackheader(_fpartparamcount)
1176 1176 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1177 1177 # param size
1178 1178 fparamsizes = _makefpartparamsizes(mancount + advcount)
1179 1179 paramsizes = self._unpackheader(fparamsizes)
1180 1180 # make it a list of couples again
1181 1181 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1182 1182 # split mandatory from advisory
1183 1183 mansizes = paramsizes[:mancount]
1184 1184 advsizes = paramsizes[mancount:]
1185 1185 # retrieve param value
1186 1186 manparams = []
1187 1187 for key, value in mansizes:
1188 1188 manparams.append((self._fromheader(key), self._fromheader(value)))
1189 1189 advparams = []
1190 1190 for key, value in advsizes:
1191 1191 advparams.append((self._fromheader(key), self._fromheader(value)))
1192 1192 self._initparams(manparams, advparams)
1193 1193 ## part payload
1194 1194 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1195 1195 # we read the data, tell it
1196 1196 self._initialized = True
1197 1197
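How `_readheader` re-pairs the flat parameter-size list (toy values below; the real sizes come from the unpacked header). Sizes arrive flattened as `[keysize, valsize, keysize, valsize, ...]`; `zip` over the even and odd slices rebuilds the couples, and the mandatory count then splits them.

```python
# flat sizes for three (key size, value size) couples; values are illustrative
paramsizes = [4, 2, 3, 5, 6, 1]
mancount = 2  # first two couples are mandatory

# pair even-indexed (key) sizes with odd-indexed (value) sizes
pairs = list(zip(paramsizes[::2], paramsizes[1::2]))
# split mandatory from advisory
mansizes = pairs[:mancount]
advsizes = pairs[mancount:]
```

This is the same `zip(paramsizes[::2], paramsizes[1::2])` idiom used in the method above.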
1198 1198 def read(self, size=None):
1199 1199 """read payload data"""
1200 1200 if not self._initialized:
1201 1201 self._readheader()
1202 1202 if size is None:
1203 1203 data = self._payloadstream.read()
1204 1204 else:
1205 1205 data = self._payloadstream.read(size)
1206 1206 self._pos += len(data)
1207 1207 if size is None or len(data) < size:
1208 1208 if not self.consumed and self._pos:
1209 1209 self.ui.debug('bundle2-input-part: total payload size %i\n'
1210 1210 % self._pos)
1211 1211 self.consumed = True
1212 1212 return data
1213 1213
1214 1214 def tell(self):
1215 1215 return self._pos
1216 1216
1217 1217 def seek(self, offset, whence=0):
1218 1218 if whence == 0:
1219 1219 newpos = offset
1220 1220 elif whence == 1:
1221 1221 newpos = self._pos + offset
1222 1222 elif whence == 2:
1223 1223 if not self.consumed:
1224 1224 self.read()
1225 1225 newpos = self._chunkindex[-1][0] - offset
1226 1226 else:
1227 1227 raise ValueError('Unknown whence value: %r' % (whence,))
1228 1228
1229 1229 if newpos > self._chunkindex[-1][0] and not self.consumed:
1230 1230 self.read()
1231 1231 if not 0 <= newpos <= self._chunkindex[-1][0]:
1232 1232 raise ValueError('Offset out of range')
1233 1233
1234 1234 if self._pos != newpos:
1235 1235 chunk, internaloffset = self._findchunk(newpos)
1236 1236 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1237 1237 adjust = self.read(internaloffset)
1238 1238 if len(adjust) != internaloffset:
1239 1239 raise error.Abort(_('Seek failed\n'))
1240 1240 self._pos = newpos
1241 1241
1242 1242 # These are only the static capabilities.
1243 1243 # Check the 'getrepocaps' function for the rest.
1244 1244 capabilities = {'HG20': (),
1245 1245 'error': ('abort', 'unsupportedcontent', 'pushraced',
1246 1246 'pushkey'),
1247 1247 'listkeys': (),
1248 1248 'pushkey': (),
1249 1249 'digests': tuple(sorted(util.DIGESTS.keys())),
1250 1250 'remote-changegroup': ('http', 'https'),
1251 1251 'hgtagsfnodes': (),
1252 1252 }
1253 1253
1254 1254 def getrepocaps(repo, allowpushback=False):
1255 1255 """return the bundle2 capabilities for a given repo
1256 1256
1257 1257 Exists to allow extensions (like evolution) to mutate the capabilities.
1258 1258 """
1259 1259 caps = capabilities.copy()
1260 1260 caps['changegroup'] = tuple(sorted(
1261 1261 changegroup.supportedincomingversions(repo)))
1262 1262 if obsolete.isenabled(repo, obsolete.exchangeopt):
1263 1263 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1264 1264 caps['obsmarkers'] = supportedformat
1265 1265 if allowpushback:
1266 1266 caps['pushback'] = ()
1267 1267 return caps
1268 1268
1269 1269 def bundle2caps(remote):
1270 1270 """return the bundle capabilities of a peer as dict"""
1271 1271 raw = remote.capable('bundle2')
1272 1272 if not raw and raw != '':
1273 1273 return {}
1274 1274 capsblob = urlreq.unquote(remote.capable('bundle2'))
1275 1275 return decodecaps(capsblob)
1276 1276
1277 1277 def obsmarkersversion(caps):
1278 1278 """extract the list of supported obsmarkers versions from a bundle2caps dict
1279 1279 """
1280 1280 obscaps = caps.get('obsmarkers', ())
1281 1281 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1282 1282
1283 1283 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1284 1284 """Write a bundle file and return its filename.
1285 1285
1286 1286 Existing files will not be overwritten.
1287 1287 If no filename is specified, a temporary file is created.
1288 1288 bz2 compression can be turned off.
1289 1289 The bundle file will be deleted in case of errors.
1290 1290 """
1291 1291
1292 1292 if bundletype == "HG20":
1293 1293 bundle = bundle20(ui)
1294 1294 bundle.setcompression(compression)
1295 1295 part = bundle.newpart('changegroup', data=cg.getchunks())
1296 1296 part.addparam('version', cg.version)
1297 1297 chunkiter = bundle.getchunks()
1298 1298 else:
1299 1299 # compression argument is only for the bundle2 case
1300 1300 assert compression is None
1301 1301 if cg.version != '01':
1302 1302 raise error.Abort(_('old bundle types only support v1 '
1303 1303 'changegroups'))
1304 1304 header, comp = bundletypes[bundletype]
1305 1305 if comp not in util.compressors:
1306 1306 raise error.Abort(_('unknown stream compression type: %s')
1307 1307 % comp)
1308 1308 z = util.compressors[comp]()
1309 1309 subchunkiter = cg.getchunks()
1310 1310 def chunkiter():
1311 1311 yield header
1312 1312 for chunk in subchunkiter:
1313 1313 yield z.compress(chunk)
1314 1314 yield z.flush()
1315 1315 chunkiter = chunkiter()
1316 1316
1317 1317 # parse the changegroup data, otherwise we will block
1318 1318 # in case of sshrepo because we don't know the end of the stream
1319 1319 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1320 1320
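A sketch of the legacy-bundle path above: a fixed header followed by a compressed changegroup stream. `zlib` stands in here for Mercurial's pluggable `util.compressors` table, and the payload bytes are illustrative.

```python
import zlib

def compressed_chunks(header, chunks):
    """Yield the bundle header, then the compressed chunk stream."""
    z = zlib.compressobj()
    yield header
    for chunk in chunks:
        yield z.compress(chunk)
    yield z.flush()  # emit whatever the compressor still buffers

bundle = b''.join(compressed_chunks(b'HG10GZ', [b'changegroup ', b'payload']))
```

Decompressing everything after the 6-byte header recovers the original chunk stream.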
1321 1321 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1322 1322 def handlechangegroup(op, inpart):
1323 1323 """apply a changegroup part on the repo
1324 1324
1325 1325 This is a very early implementation that will see massive rework before being
1326 1326 inflicted on any end-user.
1327 1327 """
1328 1328 # Make sure we trigger a transaction creation
1329 1329 #
1330 1330 # The addchangegroup function will get a transaction object by itself, but
1331 1331 # we need to make sure we trigger the creation of a transaction object used
1332 1332 # for the whole processing scope.
1333 1333 op.gettransaction()
1334 1334 unpackerversion = inpart.params.get('version', '01')
1335 1335 # We should raise an appropriate exception here
1336 1336 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1337 1337 # the source and url passed here are overwritten by the one contained in
1338 1338 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1339 1339 nbchangesets = None
1340 1340 if 'nbchanges' in inpart.params:
1341 1341 nbchangesets = int(inpart.params.get('nbchanges'))
1342 1342 if ('treemanifest' in inpart.params and
1343 1343 'treemanifest' not in op.repo.requirements):
1344 1344 if len(op.repo.changelog) != 0:
1345 1345 raise error.Abort(_(
1346 1346 "bundle contains tree manifests, but local repo is "
1347 1347 "non-empty and does not use tree manifests"))
1348 1348 op.repo.requirements.add('treemanifest')
1349 1349 op.repo._applyopenerreqs()
1350 1350 op.repo._writerequirements()
1351 1351 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1352 1352 op.records.add('changegroup', {'return': ret})
1353 1353 if op.reply is not None:
1354 1354 # This is definitely not the final form of this
1355 1355 # return. But one needs to start somewhere.
1356 1356 part = op.reply.newpart('reply:changegroup', mandatory=False)
1357 1357 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1358 1358 part.addparam('return', '%i' % ret, mandatory=False)
1359 1359 assert not inpart.read()
1360 1360
1361 1361 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1362 1362 ['digest:%s' % k for k in util.DIGESTS.keys()])
1363 1363 @parthandler('remote-changegroup', _remotechangegroupparams)
1364 1364 def handleremotechangegroup(op, inpart):
1365 1365 """apply a bundle10 on the repo, given an url and validation information
1366 1366
1367 1367 All the information about the remote bundle to import is given as
1368 1368 parameters. The parameters include:
1369 1369 - url: the url to the bundle10.
1370 1370 - size: the bundle10 file size. It is used to validate what was
1371 1371 retrieved by the client matches the server knowledge about the bundle.
1372 1372 - digests: a space separated list of the digest types provided as
1373 1373 parameters.
1374 1374 - digest:<digest-type>: the hexadecimal representation of the digest with
1375 1375 that name. Like the size, it is used to validate what was retrieved by
1376 1376 the client matches what the server knows about the bundle.
1377 1377
1378 1378 When multiple digest types are given, all of them are checked.
1379 1379 """
1380 1380 try:
1381 1381 raw_url = inpart.params['url']
1382 1382 except KeyError:
1383 1383 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1384 1384 parsed_url = util.url(raw_url)
1385 1385 if parsed_url.scheme not in capabilities['remote-changegroup']:
1386 1386 raise error.Abort(_('remote-changegroup does not support %s urls') %
1387 1387 parsed_url.scheme)
1388 1388
1389 1389 try:
1390 1390 size = int(inpart.params['size'])
1391 1391 except ValueError:
1392 1392 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1393 1393 % 'size')
1394 1394 except KeyError:
1395 1395 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1396 1396
1397 1397 digests = {}
1398 1398 for typ in inpart.params.get('digests', '').split():
1399 1399 param = 'digest:%s' % typ
1400 1400 try:
1401 1401 value = inpart.params[param]
1402 1402 except KeyError:
1403 1403 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1404 1404 param)
1405 1405 digests[typ] = value
1406 1406
1407 1407 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1408 1408
1409 1409 # Make sure we trigger a transaction creation
1410 1410 #
1411 1411 # The addchangegroup function will get a transaction object by itself, but
1412 1412 # we need to make sure we trigger the creation of a transaction object used
1413 1413 # for the whole processing scope.
1414 1414 op.gettransaction()
1415 1415 from . import exchange
1416 1416 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1417 1417 if not isinstance(cg, changegroup.cg1unpacker):
1418 1418 raise error.Abort(_('%s: not a bundle version 1.0') %
1419 1419 util.hidepassword(raw_url))
1420 1420 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1421 1421 op.records.add('changegroup', {'return': ret})
1422 1422 if op.reply is not None:
1423 1423 # This is definitely not the final form of this
1424 1424 # return. But one needs to start somewhere.
1425 1425 part = op.reply.newpart('reply:changegroup')
1426 1426 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1427 1427 part.addparam('return', '%i' % ret, mandatory=False)
1428 1428 try:
1429 1429 real_part.validate()
1430 1430 except error.Abort as e:
1431 1431 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1432 1432 (util.hidepassword(raw_url), str(e)))
1433 1433 assert not inpart.read()
1434 1434
1435 1435 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1436 1436 def handlereplychangegroup(op, inpart):
1437 1437 ret = int(inpart.params['return'])
1438 1438 replyto = int(inpart.params['in-reply-to'])
1439 1439 op.records.add('changegroup', {'return': ret}, replyto)
1440 1440
1441 1441 @parthandler('check:heads')
1442 1442 def handlecheckheads(op, inpart):
1443 1443 """check that the heads of the repo did not change
1444 1444
1445 1445 This is used to detect a push race when using unbundle.
1446 1446 This replaces the "heads" argument of unbundle."""
1447 1447 h = inpart.read(20)
1448 1448 heads = []
1449 1449 while len(h) == 20:
1450 1450 heads.append(h)
1451 1451 h = inpart.read(20)
1452 1452 assert not h
1453 1453 # Trigger a transaction so that we are guaranteed to have the lock now.
1454 1454 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1455 1455 op.gettransaction()
1456 if heads != op.repo.heads():
1456 if sorted(heads) != sorted(op.repo.heads()):
1457 1457 raise error.PushRaced('repository changed while pushing - '
1458 1458 'please try again')
1459 1459
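Why the fix in this changeset compares sorted lists: the pushing client and the server may enumerate the same set of heads in a different order, and a plain list comparison would then report a push race that did not happen. Short byte strings stand in below for the real 20-byte binary nodes.

```python
# same two heads, enumerated in a different order on each side
pushed_heads = [b'node-b', b'node-a']
repo_heads = [b'node-a', b'node-b']

# the old check: order-sensitive, flags a race that did not happen
naive_race = pushed_heads != repo_heads
# the fixed check: order-insensitive set-equality via sorting
real_race = sorted(pushed_heads) != sorted(repo_heads)
```

The old `heads != op.repo.heads()` corresponds to `naive_race`; the corrected line corresponds to `real_race`.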
1460 1460 @parthandler('output')
1461 1461 def handleoutput(op, inpart):
1462 1462 """forward output captured on the server to the client"""
1463 1463 for line in inpart.read().splitlines():
1464 1464 op.ui.status(('remote: %s\n' % line))
1465 1465
1466 1466 @parthandler('replycaps')
1467 1467 def handlereplycaps(op, inpart):
1468 1468 """Notify that a reply bundle should be created
1469 1469
1470 1470 The payload contains the capabilities information for the reply"""
1471 1471 caps = decodecaps(inpart.read())
1472 1472 if op.reply is None:
1473 1473 op.reply = bundle20(op.ui, caps)
1474 1474
1475 1475 class AbortFromPart(error.Abort):
1476 1476 """Sub-class of Abort that denotes an error from a bundle2 part."""
1477 1477
1478 1478 @parthandler('error:abort', ('message', 'hint'))
1479 1479 def handleerrorabort(op, inpart):
1480 1480 """Used to transmit abort error over the wire"""
1481 1481 raise AbortFromPart(inpart.params['message'],
1482 1482 hint=inpart.params.get('hint'))
1483 1483
1484 1484 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1485 1485 'in-reply-to'))
1486 1486 def handleerrorpushkey(op, inpart):
1487 1487 """Used to transmit failure of a mandatory pushkey over the wire"""
1488 1488 kwargs = {}
1489 1489 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1490 1490 value = inpart.params.get(name)
1491 1491 if value is not None:
1492 1492 kwargs[name] = value
1493 1493 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1494 1494
1495 1495 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1496 1496 def handleerrorunsupportedcontent(op, inpart):
1497 1497 """Used to transmit unknown content error over the wire"""
1498 1498 kwargs = {}
1499 1499 parttype = inpart.params.get('parttype')
1500 1500 if parttype is not None:
1501 1501 kwargs['parttype'] = parttype
1502 1502 params = inpart.params.get('params')
1503 1503 if params is not None:
1504 1504 kwargs['params'] = params.split('\0')
1505 1505
1506 1506 raise error.BundleUnknownFeatureError(**kwargs)
1507 1507
1508 1508 @parthandler('error:pushraced', ('message',))
1509 1509 def handleerrorpushraced(op, inpart):
1510 1510 """Used to transmit push race error over the wire"""
1511 1511 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1512 1512
1513 1513 @parthandler('listkeys', ('namespace',))
1514 1514 def handlelistkeys(op, inpart):
1515 1515 """retrieve pushkey namespace content stored in a bundle2"""
1516 1516 namespace = inpart.params['namespace']
1517 1517 r = pushkey.decodekeys(inpart.read())
1518 1518 op.records.add('listkeys', (namespace, r))
1519 1519
1520 1520 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1521 1521 def handlepushkey(op, inpart):
1522 1522 """process a pushkey request"""
1523 1523 dec = pushkey.decode
1524 1524 namespace = dec(inpart.params['namespace'])
1525 1525 key = dec(inpart.params['key'])
1526 1526 old = dec(inpart.params['old'])
1527 1527 new = dec(inpart.params['new'])
1528 1528 # Grab the transaction to ensure that we have the lock before performing the
1529 1529 # pushkey.
1530 1530 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1531 1531 op.gettransaction()
1532 1532 ret = op.repo.pushkey(namespace, key, old, new)
1533 1533 record = {'namespace': namespace,
1534 1534 'key': key,
1535 1535 'old': old,
1536 1536 'new': new}
1537 1537 op.records.add('pushkey', record)
1538 1538 if op.reply is not None:
1539 1539 rpart = op.reply.newpart('reply:pushkey')
1540 1540 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1541 1541 rpart.addparam('return', '%i' % ret, mandatory=False)
1542 1542 if inpart.mandatory and not ret:
1543 1543 kwargs = {}
1544 1544 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1545 1545 if key in inpart.params:
1546 1546 kwargs[key] = inpart.params[key]
1547 1547 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1548 1548
1549 1549 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1550 1550 def handlepushkeyreply(op, inpart):
1551 1551 """retrieve the result of a pushkey request"""
1552 1552 ret = int(inpart.params['return'])
1553 1553 partid = int(inpart.params['in-reply-to'])
1554 1554 op.records.add('pushkey', {'return': ret}, partid)
1555 1555
1556 1556 @parthandler('obsmarkers')
1557 1557 def handleobsmarker(op, inpart):
1558 1558 """add a stream of obsmarkers to the repo"""
1559 1559 tr = op.gettransaction()
1560 1560 markerdata = inpart.read()
1561 1561 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1562 1562 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1563 1563 % len(markerdata))
1564 1564 # The mergemarkers call will crash if marker creation is not enabled.
1565 1565 # we want to avoid this if the part is advisory.
1566 1566 if not inpart.mandatory and op.repo.obsstore.readonly:
1567 1567 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1568 1568 return
1569 1569 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1570 1570 if new:
1571 1571 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1572 1572 op.records.add('obsmarkers', {'new': new})
1573 1573 if op.reply is not None:
1574 1574 rpart = op.reply.newpart('reply:obsmarkers')
1575 1575 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1576 1576 rpart.addparam('new', '%i' % new, mandatory=False)
1577 1577
1578 1578
1579 1579 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1580 1580 def handleobsmarkerreply(op, inpart):
1581 1581 """retrieve the result of an obsmarkers request"""
1582 1582 ret = int(inpart.params['new'])
1583 1583 partid = int(inpart.params['in-reply-to'])
1584 1584 op.records.add('obsmarkers', {'new': ret}, partid)
1585 1585
1586 1586 @parthandler('hgtagsfnodes')
1587 1587 def handlehgtagsfnodes(op, inpart):
1588 1588 """Applies .hgtags fnodes cache entries to the local repo.
1589 1589
1590 1590 Payload is pairs of 20 byte changeset nodes and filenodes.
1591 1591 """
1592 1592 # Grab the transaction so we ensure that we have the lock at this point.
1593 1593 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1594 1594 op.gettransaction()
1595 1595 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1596 1596
1597 1597 count = 0
1598 1598 while True:
1599 1599 node = inpart.read(20)
1600 1600 fnode = inpart.read(20)
1601 1601 if len(node) < 20 or len(fnode) < 20:
1602 1602 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1603 1603 break
1604 1604 cache.setfnode(node, fnode)
1605 1605 count += 1
1606 1606
1607 1607 cache.write()
1608 1608 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
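A standalone sketch of the read loop in `handlehgtagsfnodes` above: the payload is a sequence of 20-byte changeset-node/filenode pairs, and a short read signals the end (discarding any incomplete trailing record). `io.BytesIO` stands in for the bundle2 part stream.

```python
import io

def read_fnode_pairs(fp):
    """Yield (node, fnode) 20-byte pairs until the stream runs short."""
    while True:
        node = fp.read(20)
        fnode = fp.read(20)
        if len(node) < 20 or len(fnode) < 20:
            # incomplete record: stop, mirroring the handler's break
            break
        yield node, fnode

# one complete pair followed by a dangling half-record
payload = b'\x01' * 20 + b'\x02' * 20 + b'\x03' * 20
pairs = list(read_fnode_pairs(io.BytesIO(payload)))
```

Only the complete pair survives; the trailing 20 bytes are ignored, just as the handler logs and skips incomplete data.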