bundle2: use compressstream compression engine API...
Gregory Szorc -
r30357:5925bda4 default
@@ -1,1628 +1,1618 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. However, none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
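Reviewer note: the part parameter layout described in the docstring above (`<mandatory-count><advisory-count><param-sizes><param-data>`) can be sketched roughly as follows. This is a minimal illustration written for Python 3 bytes, not code from bundle2.py; the helper names `encode_part_params` and `decode_part_params` are hypothetical.

```python
import struct

def encode_part_params(mandatory, advisory):
    """Pack lists of (key, value) byte pairs as <counts><sizes><data>."""
    allparams = list(mandatory) + list(advisory)
    # one byte each: number of mandatory, then number of advisory parameters
    header = struct.pack('>BB', len(mandatory), len(advisory))
    # one (size-of-key, size-of-value) byte pair per parameter
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    sizeblock = struct.pack('>' + 'BB' * len(allparams), *sizes)
    # keys and values concatenated, mandatory parameters first
    data = b''.join(key + value for key, value in allparams)
    return header + sizeblock + data

def decode_part_params(blob):
    """Inverse of encode_part_params; returns (mandatory, advisory)."""
    mancount, advcount = struct.unpack_from('>BB', blob, 0)
    total = mancount + advcount
    sizes = struct.unpack_from('>' + 'BB' * total, blob, 2)
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    return params[:mancount], params[mancount:]
```

Because every size is stored in a single unsigned byte, this sketch (like the format it mirrors) cannot represent a key or value longer than 255 bytes.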
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains an invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that registers a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
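Reviewer note: the registration and lookup rules (case-insensitive matching, with any uppercase character marking the part as mandatory) can be sketched in isolation as follows. The `handlers` dict, the `lookup` helper and the `Demo:Part` type are hypothetical stand-ins for illustration; only the decorator shape follows the code above.

```python
handlers = {}  # hypothetical stand-in for parthandlermapping

def parthandler(parttype, params=()):
    """Register a function under the lower-cased part type."""
    def _decorator(func):
        key = parttype.lower()  # enforce lower case matching
        assert key not in handlers
        handlers[key] = func
        func.params = frozenset(params)
        return func
    return _decorator

@parthandler('Demo:Part', ('version',))
def handledemo(op, part):
    return 'handled %s' % part

def lookup(parttype):
    """Return (handler or None, mandatory flag) for a part type."""
    # a part type containing any uppercase char is considered mandatory
    mandatory = parttype != parttype.lower()
    return handlers.get(parttype.lower()), mandatory
```

A caller would abort when `lookup` returns `(None, True)` and silently skip the part when it returns `(None, False)`.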
228 228
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during an unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 class bundleoperation(object):
275 275 """an object that represents a single bundling process
276 276
277 277 Its purpose is to carry unbundle-related objects and states.
278 278
279 279 A new object should be created at the beginning of each bundle processing.
280 280 The object is to be returned by the processing function.
281 281
282 282 The object has very little content now; it will ultimately contain:
283 283 * an access to the repo the bundle is applied to,
284 284 * a ui object,
285 285 * a way to retrieve a transaction to add changes to the repo,
286 286 * a way to record the result of processing each part,
287 287 * a way to construct a bundle response when applicable.
288 288 """
289 289
290 290 def __init__(self, repo, transactiongetter, captureoutput=True):
291 291 self.repo = repo
292 292 self.ui = repo.ui
293 293 self.records = unbundlerecords()
294 294 self.gettransaction = transactiongetter
295 295 self.reply = None
296 296 self.captureoutput = captureoutput
297 297
298 298 class TransactionUnavailable(RuntimeError):
299 299 pass
300 300
301 301 def _notransaction():
302 302 """default method to get a transaction while processing a bundle
303 303
304 304 Raise an exception to highlight the fact that no transaction was expected
305 305 to be created"""
306 306 raise TransactionUnavailable()
307 307
308 308 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
309 309 # transform me into unbundler.apply() as soon as the freeze is lifted
310 310 tr.hookargs['bundle2'] = '1'
311 311 if source is not None and 'source' not in tr.hookargs:
312 312 tr.hookargs['source'] = source
313 313 if url is not None and 'url' not in tr.hookargs:
314 314 tr.hookargs['url'] = url
315 315 return processbundle(repo, unbundler, lambda: tr, op=op)
316 316
317 317 def processbundle(repo, unbundler, transactiongetter=None, op=None):
318 318 """This function processes a bundle and applies its effects to/from a repo
319 319
320 320 It iterates over each part then searches for and uses the proper handling
321 321 code to process the part. Parts are processed in order.
322 322
323 323 This is a very early version of this function that will be strongly reworked
324 324 before final usage.
325 325
326 326 An unknown mandatory part will abort the process.
327 327
328 328 It is temporarily possible to provide a prebuilt bundleoperation to the
329 329 function. This is used to ensure output is properly propagated in case of
330 330 an error during the unbundling. This output capturing part will likely be
331 331 reworked and this ability will probably go away in the process.
332 332 """
333 333 if op is None:
334 334 if transactiongetter is None:
335 335 transactiongetter = _notransaction
336 336 op = bundleoperation(repo, transactiongetter)
337 337 # todo:
338 338 # - replace this with an init function soon.
339 339 # - exception catching
340 340 unbundler.params
341 341 if repo.ui.debugflag:
342 342 msg = ['bundle2-input-bundle:']
343 343 if unbundler.params:
344 344 msg.append(' %i params' % len(unbundler.params))
345 345 if op.gettransaction is None:
346 346 msg.append(' no-transaction')
347 347 else:
348 348 msg.append(' with-transaction')
349 349 msg.append('\n')
350 350 repo.ui.debug(''.join(msg))
351 351 iterparts = enumerate(unbundler.iterparts())
352 352 part = None
353 353 nbpart = 0
354 354 try:
355 355 for nbpart, part in iterparts:
356 356 _processpart(op, part)
357 357 except Exception as exc:
358 358 for nbpart, part in iterparts:
359 359 # consume the bundle content
360 360 part.seek(0, 2)
361 361 # Small hack to let caller code distinguish exceptions from bundle2
362 362 # processing from processing the old format. This is mostly
363 363 # needed to handle different return codes to unbundle according to the
364 364 # type of bundle. We should probably clean up or drop this return code
365 365 # craziness in a future version.
366 366 exc.duringunbundle2 = True
367 367 salvaged = []
368 368 replycaps = None
369 369 if op.reply is not None:
370 370 salvaged = op.reply.salvageoutput()
371 371 replycaps = op.reply.capabilities
372 372 exc._replycaps = replycaps
373 373 exc._bundle2salvagedoutput = salvaged
374 374 raise
375 375 finally:
376 376 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
377 377
378 378 return op
379 379
380 380 def _processpart(op, part):
381 381 """process a single part from a bundle
382 382
383 383 The part is guaranteed to have been fully consumed when the function exits
384 384 (even if an exception is raised)."""
385 385 status = 'unknown' # used by debug output
386 386 hardabort = False
387 387 try:
388 388 try:
389 389 handler = parthandlermapping.get(part.type)
390 390 if handler is None:
391 391 status = 'unsupported-type'
392 392 raise error.BundleUnknownFeatureError(parttype=part.type)
393 393 indebug(op.ui, 'found a handler for part %r' % part.type)
394 394 unknownparams = part.mandatorykeys - handler.params
395 395 if unknownparams:
396 396 unknownparams = list(unknownparams)
397 397 unknownparams.sort()
398 398 status = 'unsupported-params (%s)' % unknownparams
399 399 raise error.BundleUnknownFeatureError(parttype=part.type,
400 400 params=unknownparams)
401 401 status = 'supported'
402 402 except error.BundleUnknownFeatureError as exc:
403 403 if part.mandatory: # mandatory parts
404 404 raise
405 405 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
406 406 return # skip to part processing
407 407 finally:
408 408 if op.ui.debugflag:
409 409 msg = ['bundle2-input-part: "%s"' % part.type]
410 410 if not part.mandatory:
411 411 msg.append(' (advisory)')
412 412 nbmp = len(part.mandatorykeys)
413 413 nbap = len(part.params) - nbmp
414 414 if nbmp or nbap:
415 415 msg.append(' (params:')
416 416 if nbmp:
417 417 msg.append(' %i mandatory' % nbmp)
418 418 if nbap:
419 419 msg.append(' %i advisory' % nbap)
420 420 msg.append(')')
421 421 msg.append(' %s\n' % status)
422 422 op.ui.debug(''.join(msg))
423 423
424 424 # handler is called outside the above try block so that we don't
425 425 # risk catching KeyErrors from anything other than the
426 426 # parthandlermapping lookup (any KeyError raised by handler()
427 427 # itself represents a defect of a different variety).
428 428 output = None
429 429 if op.captureoutput and op.reply is not None:
430 430 op.ui.pushbuffer(error=True, subproc=True)
431 431 output = ''
432 432 try:
433 433 handler(op, part)
434 434 finally:
435 435 if output is not None:
436 436 output = op.ui.popbuffer()
437 437 if output:
438 438 outpart = op.reply.newpart('output', data=output,
439 439 mandatory=False)
440 440 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
441 441 # If exiting or interrupted, do not attempt to seek the stream in the
442 442 # finally block below. This makes abort faster.
443 443 except (SystemExit, KeyboardInterrupt):
444 444 hardabort = True
445 445 raise
446 446 finally:
447 447 # consume the part content to not corrupt the stream.
448 448 if not hardabort:
449 449 part.seek(0, 2)
450 450
451 451
452 452 def decodecaps(blob):
453 453 """decode a bundle2 caps bytes blob into a dictionary
454 454
455 455 The blob is a list of capabilities (one per line)
456 456 Capabilities may have values using a line of the form::
457 457
458 458 capability=value1,value2,value3
459 459
460 460 The values are always a list."""
461 461 caps = {}
462 462 for line in blob.splitlines():
463 463 if not line:
464 464 continue
465 465 if '=' not in line:
466 466 key, vals = line, ()
467 467 else:
468 468 key, vals = line.split('=', 1)
469 469 vals = vals.split(',')
470 470 key = urlreq.unquote(key)
471 471 vals = [urlreq.unquote(v) for v in vals]
472 472 caps[key] = vals
473 473 return caps
474 474
475 475 def encodecaps(caps):
476 476 """encode a bundle2 caps dictionary into a bytes blob"""
477 477 chunks = []
478 478 for ca in sorted(caps):
479 479 vals = caps[ca]
480 480 ca = urlreq.quote(ca)
481 481 vals = [urlreq.quote(v) for v in vals]
482 482 if vals:
483 483 ca = "%s=%s" % (ca, ','.join(vals))
484 484 chunks.append(ca)
485 485 return '\n'.join(chunks)
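Reviewer note: a quick round-trip of the capabilities blob format handled by `decodecaps` and `encodecaps`. This sketch assumes Python 3 and uses `urllib.parse` directly in place of the `urlreq` wrapper; the capability names in the usage note are made up for illustration.

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    """Encode {name: [values]} as one url-quoted capability per line."""
    chunks = []
    for name in sorted(caps):
        vals = [quote(v) for v in caps[name]]
        name = quote(name)
        if vals:
            name = '%s=%s' % (name, ','.join(vals))
        chunks.append(name)
    return '\n'.join(chunks)

def decodecaps(blob):
    """Decode the blob back into a dictionary; values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

Quoting is what lets a value such as `a,b` survive the comma-separated encoding: it is written as `a%2Cb` and restored on decode.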
486 486
487 487 bundletypes = {
488 488 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
489 489 # since the unification ssh accepts a header but there
490 490 # is no capability signaling it.
491 491 "HG20": (), # special-cased below
492 492 "HG10UN": ("HG10UN", 'UN'),
493 493 "HG10BZ": ("HG10", 'BZ'),
494 494 "HG10GZ": ("HG10GZ", 'GZ'),
495 495 }
496 496
497 497 # hgweb uses this list to communicate its preferred type
498 498 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
499 499
500 500 class bundle20(object):
501 501 """represent an outgoing bundle2 container
502 502
503 503 Use the `addparam` method to add stream level parameters and `newpart` to
504 504 populate it. Then call `getchunks` to retrieve all the binary chunks of
505 505 data that compose the bundle2 container."""
506 506
507 507 _magicstring = 'HG20'
508 508
509 509 def __init__(self, ui, capabilities=()):
510 510 self.ui = ui
511 511 self._params = []
512 512 self._parts = []
513 513 self.capabilities = dict(capabilities)
514 514 self._compengine = util.compengines.forbundletype('UN')
515 515
516 516 def setcompression(self, alg):
517 517 """setup core part compression to <alg>"""
518 518 if alg is None:
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compengine = util.compengines.forbundletype(alg)
523 523
524 524 @property
525 525 def nbparts(self):
526 526 """total number of parts added to the bundler"""
527 527 return len(self._parts)
528 528
529 529 # methods used to define the bundle2 content
530 530 def addparam(self, name, value=None):
531 531 """add a stream level parameter"""
532 532 if not name:
533 533 raise ValueError('empty parameter name')
534 534 if name[0] not in string.letters:
535 535 raise ValueError('non letter first character: %r' % name)
536 536 self._params.append((name, value))
537 537
538 538 def addpart(self, part):
539 539 """add a new part to the bundle2 container
540 540
541 541 Parts contain the actual application payload."""
542 542 assert part.id is None
543 543 part.id = len(self._parts) # very cheap counter
544 544 self._parts.append(part)
545 545
546 546 def newpart(self, typeid, *args, **kwargs):
547 547 """create a new part and add it to the container
548 548
549 549 The part is directly added to the container. For now, this means
550 550 that any failure to properly initialize the part after calling
551 551 ``newpart`` should result in a failure of the whole bundling process.
552 552
553 553 You can still fall back to creating and adding the part manually if you
554 554 need better control."""
555 555 part = bundlepart(typeid, *args, **kwargs)
556 556 self.addpart(part)
557 557 return part
558 558
559 559 # methods used to generate the bundle2 stream
560 560 def getchunks(self):
561 561 if self.ui.debugflag:
562 562 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
563 563 if self._params:
564 564 msg.append(' (%i params)' % len(self._params))
565 565 msg.append(' %i parts total\n' % len(self._parts))
566 566 self.ui.debug(''.join(msg))
567 567 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
568 568 yield self._magicstring
569 569 param = self._paramchunk()
570 570 outdebug(self.ui, 'bundle parameter: %s' % param)
571 571 yield _pack(_fstreamparamsize, len(param))
572 572 if param:
573 573 yield param
574 # starting compression
575 compressor = self._compengine.compressorobj()
576 for chunk in self._getcorechunk():
577 data = compressor.compress(chunk)
578 if data:
579 yield data
580 yield compressor.flush()
574 for chunk in self._compengine.compressstream(self._getcorechunk()):
575 yield chunk
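Reviewer note: the change above swaps a hand-written compress/flush loop for a single generator call. A minimal sketch of why the two shapes should yield the same bytes, assuming a zlib-backed engine. The `zlibengine` class here is hypothetical and only mimics the two call styles visible in this diff; it is not the actual `util.compengines` implementation.

```python
import zlib

class zlibengine(object):
    """Hypothetical engine exposing both call styles seen in the diff."""

    def compressorobj(self):
        return zlib.compressobj()

    def compressstream(self, it):
        z = zlib.compressobj()
        for chunk in it:
            data = z.compress(chunk)
            # the compressor may buffer input and return nothing yet
            if data:
                yield data
        yield z.flush()

def oldstyle(engine, chunks):
    """The removed loop: the caller drives compress() and flush()."""
    compressor = engine.compressorobj()
    for chunk in chunks:
        data = compressor.compress(chunk)
        if data:
            yield data
    yield compressor.flush()

def newstyle(engine, chunks):
    """The added loop: the engine owns the compress/flush protocol."""
    for chunk in engine.compressstream(chunks):
        yield chunk
```

With the generator style, the caller no longer needs to know that a final `flush()` is required, which is presumably the point of moving that detail behind the engine API.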
581 576
582 577 def _paramchunk(self):
583 578 """return an encoded version of all stream parameters"""
584 579 blocks = []
585 580 for par, value in self._params:
586 581 par = urlreq.quote(par)
587 582 if value is not None:
588 583 value = urlreq.quote(value)
589 584 par = '%s=%s' % (par, value)
590 585 blocks.append(par)
591 586 return ' '.join(blocks)
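Reviewer note: a small round-trip of the stream level parameter block (space separated, url-quoted `<name>=<value>` entries). This is a sketch under the assumption of Python 3 and `urllib.parse`; the function names `encodestreamparams` and `decodestreamparams` are hypothetical, and the decoder assumes a non-empty blob, as the reader code only parses the block when its size is non-zero.

```python
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Encode [(name, value-or-None)] as a space separated blob."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    return ' '.join(blocks)

def decodestreamparams(blob):
    """Return [(name, value, mandatory)] from a non-empty blob."""
    params = []
    for block in blob.split(' '):
        fields = [unquote(f) for f in block.split('=', 1)]
        if len(fields) < 2:
            fields.append(None)  # parameter without a value
        name, value = fields
        # capital first letter: mandatory; lower case: advisory
        params.append((name, value, name[0].isupper()))
    return params
```

Since both spaces and `=` are quoted inside names and values, splitting on a space and then on the first `=` is unambiguous.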
592 587
593 588 def _getcorechunk(self):
594 589 """yield chunk for the core part of the bundle
595 590
596 591 (all but headers and parameters)"""
597 592 outdebug(self.ui, 'start of parts')
598 593 for part in self._parts:
599 594 outdebug(self.ui, 'bundle part: "%s"' % part.type)
600 595 for chunk in part.getchunks(ui=self.ui):
601 596 yield chunk
602 597 outdebug(self.ui, 'end of bundle')
603 598 yield _pack(_fpartheadersize, 0)
604 599
605 600
606 601 def salvageoutput(self):
607 602 """return a list with a copy of all output parts in the bundle
608 603
609 604 This is meant to be used during error handling to make sure we preserve
610 605 server output"""
611 606 salvaged = []
612 607 for part in self._parts:
613 608 if part.type.startswith('output'):
614 609 salvaged.append(part.copy())
615 610 return salvaged
616 611
617 612
618 613 class unpackermixin(object):
619 614 """A mixin to extract bytes and struct data from a stream"""
620 615
621 616 def __init__(self, fp):
622 617 self._fp = fp
623 618 self._seekable = (util.safehasattr(fp, 'seek') and
624 619 util.safehasattr(fp, 'tell'))
625 620
626 621 def _unpack(self, format):
627 622 """unpack this struct format from the stream"""
628 623 data = self._readexact(struct.calcsize(format))
629 624 return _unpack(format, data)
630 625
631 626 def _readexact(self, size):
632 627 """read exactly <size> bytes from the stream"""
633 628 return changegroup.readexactly(self._fp, size)
634 629
635 630 def seek(self, offset, whence=0):
636 631 """move the underlying file pointer"""
637 632 if self._seekable:
638 633 return self._fp.seek(offset, whence)
639 634 else:
640 635 raise NotImplementedError(_('File pointer is not seekable'))
641 636
642 637 def tell(self):
643 638 """return the file offset, or None if file is not seekable"""
644 639 if self._seekable:
645 640 try:
646 641 return self._fp.tell()
647 642 except IOError as e:
648 643 if e.errno == errno.ESPIPE:
649 644 self._seekable = False
650 645 else:
651 646 raise
652 647 return None
653 648
654 649 def close(self):
655 650 """close underlying file"""
656 651 if util.safehasattr(self._fp, 'close'):
657 652 return self._fp.close()
658 653
659 654 def getunbundler(ui, fp, magicstring=None):
660 655 """return a valid unbundler object for a given magicstring"""
661 656 if magicstring is None:
662 657 magicstring = changegroup.readexactly(fp, 4)
663 658 magic, version = magicstring[0:2], magicstring[2:4]
664 659 if magic != 'HG':
665 660 raise error.Abort(_('not a Mercurial bundle'))
666 661 unbundlerclass = formatmap.get(version)
667 662 if unbundlerclass is None:
668 663 raise error.Abort(_('unknown bundle version %s') % version)
669 664 unbundler = unbundlerclass(ui, fp)
670 665 indebug(ui, 'start processing of %s stream' % magicstring)
671 666 return unbundler
672 667
673 668 class unbundle20(unpackermixin):
674 669 """interpret a bundle2 stream
675 670
676 671 This class is fed with a binary stream and yields parts through its
677 672 `iterparts` methods."""
678 673
679 674 _magicstring = 'HG20'
680 675
681 676 def __init__(self, ui, fp):
682 677 """If header is specified, we do not read it out of the stream."""
683 678 self.ui = ui
684 679 self._compengine = util.compengines.forbundletype('UN')
685 680 self._compressed = None
686 681 super(unbundle20, self).__init__(fp)
687 682
688 683 @util.propertycache
689 684 def params(self):
690 685 """dictionary of stream level parameters"""
691 686 indebug(self.ui, 'reading bundle2 stream parameters')
692 687 params = {}
693 688 paramssize = self._unpack(_fstreamparamsize)[0]
694 689 if paramssize < 0:
695 690 raise error.BundleValueError('negative bundle param size: %i'
696 691 % paramssize)
697 692 if paramssize:
698 693 params = self._readexact(paramssize)
699 694 params = self._processallparams(params)
700 695 return params
701 696
702 697 def _processallparams(self, paramsblock):
703 698 """"""
704 699 params = util.sortdict()
705 700 for p in paramsblock.split(' '):
706 701 p = p.split('=', 1)
707 702 p = [urlreq.unquote(i) for i in p]
708 703 if len(p) < 2:
709 704 p.append(None)
710 705 self._processparam(*p)
711 706 params[p[0]] = p[1]
712 707 return params
713 708
714 709
715 710 def _processparam(self, name, value):
716 711 """process a parameter, applying its effect if needed
717 712
718 713 Parameters starting with a lower case letter are advisory and will be
719 714 ignored when unknown. Those starting with an upper case letter are
720 715 mandatory; this function raises BundleUnknownFeatureError when unknown.
721 716
722 717 Note: no options are currently supported. Any input will either be
723 718 ignored or fail.
724 719 """
725 720 if not name:
726 721 raise ValueError('empty parameter name')
727 722 if name[0] not in string.letters:
728 723 raise ValueError('non letter first character: %r' % name)
729 724 try:
730 725 handler = b2streamparamsmap[name.lower()]
731 726 except KeyError:
732 727 if name[0].islower():
733 728 indebug(self.ui, "ignoring unknown parameter %r" % name)
734 729 else:
735 730 raise error.BundleUnknownFeatureError(params=(name,))
736 731 else:
737 732 handler(self, name, value)
738 733
739 734 def _forwardchunks(self):
740 735 """utility to transfer a bundle2 as binary
741 736
742 737 This is made necessary by the fact that the 'getbundle' command over 'ssh'
743 738 has no way to know when the reply ends, relying on the bundle to be
744 739 interpreted to know its end. This is terrible and we are sorry, but we
745 740 needed to move forward to get general delta enabled.
746 741 """
747 742 yield self._magicstring
748 743 assert 'params' not in vars(self)
749 744 paramssize = self._unpack(_fstreamparamsize)[0]
750 745 if paramssize < 0:
751 746 raise error.BundleValueError('negative bundle param size: %i'
752 747 % paramssize)
753 748 yield _pack(_fstreamparamsize, paramssize)
754 749 if paramssize:
755 750 params = self._readexact(paramssize)
756 751 self._processallparams(params)
757 752 yield params
758 753 assert self._compengine.bundletype == 'UN'
759 754 # From there, payload might need to be decompressed
760 755 self._fp = self._compengine.decompressorreader(self._fp)
761 756 emptycount = 0
762 757 while emptycount < 2:
763 758 # so we can brainlessly loop
764 759 assert _fpartheadersize == _fpayloadsize
765 760 size = self._unpack(_fpartheadersize)[0]
766 761 yield _pack(_fpartheadersize, size)
767 762 if size:
768 763 emptycount = 0
769 764 else:
770 765 emptycount += 1
771 766 continue
772 767 if size == flaginterrupt:
773 768 continue
774 769 elif size < 0:
775 770 raise error.BundleValueError('negative chunk size: %i' % size)
776 771 yield self._readexact(size)
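Reviewer note: the payload framing that `_forwardchunks` walks over (a series of `<chunksize><chunkdata>` closed by a zero size chunk) can be sketched on its own as follows. The helpers `framepayload` and `readpayload` are hypothetical; this sketch rejects every negative size and ignores the special-case processing that negative sizes can trigger in the real format.

```python
import io
import struct

def framepayload(chunks):
    """Yield <int32 size><data> frames, then the zero size terminator."""
    for chunk in chunks:
        if chunk:  # an empty chunk would be read as the end of the payload
            yield struct.pack('>i', len(chunk))
            yield chunk
    yield struct.pack('>i', 0)

def readpayload(fp):
    """Yield chunk data from a file-like object until the terminator."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('negative chunk size: %i' % size)
        yield fp.read(size)
```

A reader built this way never needs to know the total payload length in advance, which is what lets a part stream data of unknown size.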
777 772
778 773
779 774 def iterparts(self):
780 775 """yield all parts contained in the stream"""
781 776 # make sure params have been loaded
782 777 self.params
783 778 # From there, payload needs to be decompressed
784 779 self._fp = self._compengine.decompressorreader(self._fp)
785 780 indebug(self.ui, 'start extraction of bundle2 parts')
786 781 headerblock = self._readpartheader()
787 782 while headerblock is not None:
788 783 part = unbundlepart(self.ui, headerblock, self._fp)
789 784 yield part
790 785 part.seek(0, 2)
791 786 headerblock = self._readpartheader()
792 787 indebug(self.ui, 'end of bundle2 stream')
793 788
794 789 def _readpartheader(self):
795 790 """read a part header size and return the bytes blob
796 791
797 792 returns None if empty"""
798 793 headersize = self._unpack(_fpartheadersize)[0]
799 794 if headersize < 0:
800 795 raise error.BundleValueError('negative part header size: %i'
801 796 % headersize)
802 797 indebug(self.ui, 'part header size: %i' % headersize)
803 798 if headersize:
804 799 return self._readexact(headersize)
805 800 return None
806 801
807 802 def compressed(self):
808 803 self.params # load params
809 804 return self._compressed
810 805
811 806 formatmap = {'20': unbundle20}
812 807
813 808 b2streamparamsmap = {}
814 809
815 810 def b2streamparamhandler(name):
816 811 """register a handler for a stream level parameter"""
817 812 def decorator(func):
818 813 assert name not in b2streamparamsmap
819 814 b2streamparamsmap[name] = func
820 815 return func
821 816 return decorator
822 817
823 818 @b2streamparamhandler('compression')
824 819 def processcompression(unbundler, param, value):
825 820 """read compression parameter and install payload decompression"""
826 821 if value not in util.compengines.supportedbundletypes:
827 822 raise error.BundleUnknownFeatureError(params=(param,),
828 823 values=(value,))
829 824 unbundler._compengine = util.compengines.forbundletype(value)
830 825 if value is not None:
831 826 unbundler._compressed = True
832 827
833 828 class bundlepart(object):
834 829 """A bundle2 part contains application level payload
835 830
836 831 The part `type` is used to route the part to the application level
837 832 handler.
838 833
839 834 The part payload is contained in ``part.data``. It could be raw bytes or a
840 835 generator of byte chunks.
841 836
842 837 You can add parameters to the part using the ``addparam`` method.
843 838 Parameters can be either mandatory (default) or advisory. Remote side
844 839 should be able to safely ignore the advisory ones.
845 840
846 841 Both data and parameters cannot be modified after the generation has begun.
847 842 """
848 843
849 844 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
850 845 data='', mandatory=True):
851 846 validateparttype(parttype)
852 847 self.id = None
853 848 self.type = parttype
854 849 self._data = data
855 850 self._mandatoryparams = list(mandatoryparams)
856 851 self._advisoryparams = list(advisoryparams)
857 852 # checking for duplicated entries
858 853 self._seenparams = set()
859 854 for pname, __ in self._mandatoryparams + self._advisoryparams:
860 855 if pname in self._seenparams:
861 856 raise RuntimeError('duplicated params: %s' % pname)
862 857 self._seenparams.add(pname)
863 858 # status of the part's generation:
864 859 # - None: not started,
865 860 # - False: currently generated,
866 861 # - True: generation done.
867 862 self._generated = None
868 863 self.mandatory = mandatory
869 864
870 865 def copy(self):
871 866 """return a copy of the part
872 867
873 868 The new part has the very same content but no partid assigned yet.
874 869 Parts with generated data cannot be copied."""
875 870 assert not util.safehasattr(self.data, 'next')
876 871 return self.__class__(self.type, self._mandatoryparams,
877 872 self._advisoryparams, self._data, self.mandatory)
878 873
879 874 # methods used to define the part content
880 875 @property
881 876 def data(self):
882 877 return self._data
883 878
884 879 @data.setter
885 880 def data(self, data):
886 881 if self._generated is not None:
887 882 raise error.ReadOnlyPartError('part is being generated')
888 883 self._data = data
889 884
890 885 @property
891 886 def mandatoryparams(self):
892 887 # make it an immutable tuple to force people through ``addparam``
893 888 return tuple(self._mandatoryparams)
894 889
895 890 @property
896 891 def advisoryparams(self):
897 892 # make it an immutable tuple to force people through ``addparam``
898 893 return tuple(self._advisoryparams)
899 894
900 895 def addparam(self, name, value='', mandatory=True):
901 896 if self._generated is not None:
902 897 raise error.ReadOnlyPartError('part is being generated')
903 898 if name in self._seenparams:
904 899 raise ValueError('duplicated params: %s' % name)
905 900 self._seenparams.add(name)
906 901 params = self._advisoryparams
907 902 if mandatory:
908 903 params = self._mandatoryparams
909 904 params.append((name, value))
910 905
911 906 # methods used to generate the bundle2 stream
912 907 def getchunks(self, ui):
913 908 if self._generated is not None:
914 909 raise RuntimeError('part can only be consumed once')
915 910 self._generated = False
916 911
917 912 if ui.debugflag:
918 913 msg = ['bundle2-output-part: "%s"' % self.type]
919 914 if not self.mandatory:
920 915 msg.append(' (advisory)')
921 916 nbmp = len(self.mandatoryparams)
922 917 nbap = len(self.advisoryparams)
923 918 if nbmp or nbap:
924 919 msg.append(' (params:')
925 920 if nbmp:
926 921 msg.append(' %i mandatory' % nbmp)
927 922 if nbap:
928 923 msg.append(' %i advisory' % nbap)
929 924 msg.append(')')
930 925 if not self.data:
931 926 msg.append(' empty payload')
932 927 elif util.safehasattr(self.data, 'next'):
933 928 msg.append(' streamed payload')
934 929 else:
935 930 msg.append(' %i bytes payload' % len(self.data))
936 931 msg.append('\n')
937 932 ui.debug(''.join(msg))
938 933
939 934 #### header
940 935 if self.mandatory:
941 936 parttype = self.type.upper()
942 937 else:
943 938 parttype = self.type.lower()
944 939 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
945 940 ## parttype
946 941 header = [_pack(_fparttypesize, len(parttype)),
947 942 parttype, _pack(_fpartid, self.id),
948 943 ]
949 944 ## parameters
950 945 # count
951 946 manpar = self.mandatoryparams
952 947 advpar = self.advisoryparams
953 948 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
954 949 # size
955 950 parsizes = []
956 951 for key, value in manpar:
957 952 parsizes.append(len(key))
958 953 parsizes.append(len(value))
959 954 for key, value in advpar:
960 955 parsizes.append(len(key))
961 956 parsizes.append(len(value))
962 957 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
963 958 header.append(paramsizes)
964 959 # key, value
965 960 for key, value in manpar:
966 961 header.append(key)
967 962 header.append(value)
968 963 for key, value in advpar:
969 964 header.append(key)
970 965 header.append(value)
971 966 ## finalize header
972 967 headerchunk = ''.join(header)
973 968 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
974 969 yield _pack(_fpartheadersize, len(headerchunk))
975 970 yield headerchunk
976 971 ## payload
977 972 try:
978 973 for chunk in self._payloadchunks():
979 974 outdebug(ui, 'payload chunk size: %i' % len(chunk))
980 975 yield _pack(_fpayloadsize, len(chunk))
981 976 yield chunk
982 977 except GeneratorExit:
983 978 # GeneratorExit means that nobody is listening for our
984 979 # results anyway, so just bail quickly rather than trying
985 980 # to produce an error part.
986 981 ui.debug('bundle2-generatorexit\n')
987 982 raise
988 983 except BaseException as exc:
989 984 # backup exception data for later
990 985 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
991 986 % exc)
992 987 exc_info = sys.exc_info()
993 988 msg = 'unexpected error: %s' % exc
994 989 interpart = bundlepart('error:abort', [('message', msg)],
995 990 mandatory=False)
996 991 interpart.id = 0
997 992 yield _pack(_fpayloadsize, -1)
998 993 for chunk in interpart.getchunks(ui=ui):
999 994 yield chunk
1000 995 outdebug(ui, 'closing payload chunk')
1001 996 # abort current part payload
1002 997 yield _pack(_fpayloadsize, 0)
1003 998 if pycompat.ispy3:
1004 999 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1005 1000 else:
1006 1001 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1007 1002 # end of payload
1008 1003 outdebug(ui, 'closing payload chunk')
1009 1004 yield _pack(_fpayloadsize, 0)
1010 1005 self._generated = True
1011 1006
1012 1007 def _payloadchunks(self):
1013 1008 """yield chunks of the part payload
1014 1009
1015 1010 Exists to handle the different methods to provide data to a part."""
1016 1011 # we only support fixed size data now.
1017 1012 # This will be improved in the future.
1018 1013 if util.safehasattr(self.data, 'next'):
1019 1014 buff = util.chunkbuffer(self.data)
1020 1015 chunk = buff.read(preferedchunksize)
1021 1016 while chunk:
1022 1017 yield chunk
1023 1018 chunk = buff.read(preferedchunksize)
1024 1019 elif len(self.data):
1025 1020 yield self.data
1026 1021
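The payload framing produced by ``getchunks`` and ``_payloadchunks`` above can be sketched in isolation. This is a minimal illustration, not the module's code: the names ``frame`` and ``unframe`` are hypothetical, and the ``'>i'`` format (signed big-endian int32) is an assumption about ``_fpayloadsize``, which is defined outside this section. Each non-empty chunk is preceded by its size, and a size of 0 closes the payload.

```python
import struct

# Assumption: payload sizes are signed big-endian int32 values.
PAYLOADSIZE = '>i'

def frame(chunks):
    """Yield size-prefixed chunks followed by a zero-size terminator."""
    for chunk in chunks:
        if chunk:  # empty chunks are skipped, 0 is reserved for "end"
            yield struct.pack(PAYLOADSIZE, len(chunk))
            yield chunk
    yield struct.pack(PAYLOADSIZE, 0)

def unframe(data):
    """Yield the chunks stored in a framed payload (bytes)."""
    offset = 0
    headersize = struct.calcsize(PAYLOADSIZE)
    while True:
        (length,) = struct.unpack_from(PAYLOADSIZE, data, offset)
        offset += headersize
        if length == 0:
            return
        if length < 0:
            # a real reader treats -1 as an interruption flag; not handled here
            raise ValueError('negative payload chunk size: %i' % length)
        yield data[offset:offset + length]
        offset += length

framed = b''.join(frame([b'abc', b'', b'defgh']))
chunks = list(unframe(framed))
```

The sketch reads from an in-memory byte string for simplicity; the real reader pulls from a file object and also dispatches the interruption flag.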
1027 1022
1028 1023 flaginterrupt = -1
1029 1024
1030 1025 class interrupthandler(unpackermixin):
1031 1026 """read one part and process it with restricted capability
1032 1027
1033 1028 This allows transmitting an exception raised on the producer side during
1034 1029 part iteration while the consumer is reading a part.
1035 1030
1036 1031 Parts processed in this manner only have access to a ui object."""
1037 1032
1038 1033 def __init__(self, ui, fp):
1039 1034 super(interrupthandler, self).__init__(fp)
1040 1035 self.ui = ui
1041 1036
1042 1037 def _readpartheader(self):
1043 1038 """reads a part header size and return the bytes blob
1044 1039
1045 1040 returns None if empty"""
1046 1041 headersize = self._unpack(_fpartheadersize)[0]
1047 1042 if headersize < 0:
1048 1043 raise error.BundleValueError('negative part header size: %i'
1049 1044 % headersize)
1050 1045 indebug(self.ui, 'part header size: %i\n' % headersize)
1051 1046 if headersize:
1052 1047 return self._readexact(headersize)
1053 1048 return None
1054 1049
1055 1050 def __call__(self):
1056 1051
1057 1052 self.ui.debug('bundle2-input-stream-interrupt:'
1058 1053 ' opening out of band context\n')
1059 1054 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1060 1055 headerblock = self._readpartheader()
1061 1056 if headerblock is None:
1062 1057 indebug(self.ui, 'no part found during interruption.')
1063 1058 return
1064 1059 part = unbundlepart(self.ui, headerblock, self._fp)
1065 1060 op = interruptoperation(self.ui)
1066 1061 _processpart(op, part)
1067 1062 self.ui.debug('bundle2-input-stream-interrupt:'
1068 1063 ' closing out of band context\n')
1069 1064
1070 1065 class interruptoperation(object):
1071 1066 """A limited operation to be used by part handlers during interruption
1072 1067
1073 1068 It only has access to a ui object.
1074 1069 """
1075 1070
1076 1071 def __init__(self, ui):
1077 1072 self.ui = ui
1078 1073 self.reply = None
1079 1074 self.captureoutput = False
1080 1075
1081 1076 @property
1082 1077 def repo(self):
1083 1078 raise RuntimeError('no repo access from stream interruption')
1084 1079
1085 1080 def gettransaction(self):
1086 1081 raise TransactionUnavailable('no repo access from stream interruption')
1087 1082
1088 1083 class unbundlepart(unpackermixin):
1089 1084 """a bundle part read from a bundle"""
1090 1085
1091 1086 def __init__(self, ui, header, fp):
1092 1087 super(unbundlepart, self).__init__(fp)
1093 1088 self.ui = ui
1094 1089 # unbundle state attr
1095 1090 self._headerdata = header
1096 1091 self._headeroffset = 0
1097 1092 self._initialized = False
1098 1093 self.consumed = False
1099 1094 # part data
1100 1095 self.id = None
1101 1096 self.type = None
1102 1097 self.mandatoryparams = None
1103 1098 self.advisoryparams = None
1104 1099 self.params = None
1105 1100 self.mandatorykeys = ()
1106 1101 self._payloadstream = None
1107 1102 self._readheader()
1108 1103 self._mandatory = None
1109 1104 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1110 1105 self._pos = 0
1111 1106
1112 1107 def _fromheader(self, size):
1113 1108 """return the next <size> bytes from the header"""
1114 1109 offset = self._headeroffset
1115 1110 data = self._headerdata[offset:(offset + size)]
1116 1111 self._headeroffset = offset + size
1117 1112 return data
1118 1113
1119 1114 def _unpackheader(self, format):
1120 1115 """read given format from header
1121 1116
1122 1117 This automatically computes the size of the format to read."""
1123 1118 data = self._fromheader(struct.calcsize(format))
1124 1119 return _unpack(format, data)
1125 1120
1126 1121 def _initparams(self, mandatoryparams, advisoryparams):
1127 1122 """internal function to setup all logic related parameters"""
1128 1123 # make it read only to prevent people touching it by mistake.
1129 1124 self.mandatoryparams = tuple(mandatoryparams)
1130 1125 self.advisoryparams = tuple(advisoryparams)
1131 1126 # user friendly UI
1132 1127 self.params = util.sortdict(self.mandatoryparams)
1133 1128 self.params.update(self.advisoryparams)
1134 1129 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1135 1130
1136 1131 def _payloadchunks(self, chunknum=0):
1137 1132 '''seek to specified chunk and start yielding data'''
1138 1133 if len(self._chunkindex) == 0:
1139 1134 assert chunknum == 0, 'Must start with chunk 0'
1140 1135 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1141 1136 else:
1142 1137 assert chunknum < len(self._chunkindex), \
1143 1138 'Unknown chunk %d' % chunknum
1144 1139 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1145 1140
1146 1141 pos = self._chunkindex[chunknum][0]
1147 1142 payloadsize = self._unpack(_fpayloadsize)[0]
1148 1143 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1149 1144 while payloadsize:
1150 1145 if payloadsize == flaginterrupt:
1151 1146 # interruption detection, the handler will now read a
1152 1147 # single part and process it.
1153 1148 interrupthandler(self.ui, self._fp)()
1154 1149 elif payloadsize < 0:
1155 1150 msg = 'negative payload chunk size: %i' % payloadsize
1156 1151 raise error.BundleValueError(msg)
1157 1152 else:
1158 1153 result = self._readexact(payloadsize)
1159 1154 chunknum += 1
1160 1155 pos += payloadsize
1161 1156 if chunknum == len(self._chunkindex):
1162 1157 self._chunkindex.append((pos,
1163 1158 super(unbundlepart, self).tell()))
1164 1159 yield result
1165 1160 payloadsize = self._unpack(_fpayloadsize)[0]
1166 1161 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1167 1162
1168 1163 def _findchunk(self, pos):
1169 1164 '''for a given payload position, return a chunk number and offset'''
1170 1165 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1171 1166 if ppos == pos:
1172 1167 return chunk, 0
1173 1168 elif ppos > pos:
1174 1169 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1175 1170 raise ValueError('Unknown chunk')
1176 1171
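The lookup performed by ``_findchunk`` above can be tried on a hand-built index. This is a sketch under stated assumptions: ``findchunk`` is a standalone copy of the method's logic, and the index values are made up, assuming 4096-byte chunks each preceded by a 4-byte size field. Each entry is a ``(payload position, file position)`` pair marking the start of a chunk.

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for a payload position."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# Hypothetical index: (payload offset, file offset) for each chunk start.
index = [(0, 100), (4096, 4200), (8192, 8300)]

exact = findchunk(index, 4096)
inside = findchunk(index, 5000)
first = findchunk(index, 10)
```

A position beyond the last indexed chunk start raises ``ValueError``, which is why ``seek`` reads the remaining payload before looking up such a position.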
1177 1172 def _readheader(self):
1178 1173 """read the header and setup the object"""
1179 1174 typesize = self._unpackheader(_fparttypesize)[0]
1180 1175 self.type = self._fromheader(typesize)
1181 1176 indebug(self.ui, 'part type: "%s"' % self.type)
1182 1177 self.id = self._unpackheader(_fpartid)[0]
1183 1178 indebug(self.ui, 'part id: "%s"' % self.id)
1184 1179 # extract mandatory bit from type
1185 1180 self.mandatory = (self.type != self.type.lower())
1186 1181 self.type = self.type.lower()
1187 1182 ## reading parameters
1188 1183 # param count
1189 1184 mancount, advcount = self._unpackheader(_fpartparamcount)
1190 1185 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1191 1186 # param size
1192 1187 fparamsizes = _makefpartparamsizes(mancount + advcount)
1193 1188 paramsizes = self._unpackheader(fparamsizes)
1194 1189 # make it a list of (key size, value size) pairs again
1195 1190 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1196 1191 # split mandatory from advisory
1197 1192 mansizes = paramsizes[:mancount]
1198 1193 advsizes = paramsizes[mancount:]
1199 1194 # retrieve param value
1200 1195 manparams = []
1201 1196 for key, value in mansizes:
1202 1197 manparams.append((self._fromheader(key), self._fromheader(value)))
1203 1198 advparams = []
1204 1199 for key, value in advsizes:
1205 1200 advparams.append((self._fromheader(key), self._fromheader(value)))
1206 1201 self._initparams(manparams, advparams)
1207 1202 ## part payload
1208 1203 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1209 1204 # the header has been parsed; mark the part as initialized
1210 1205 self._initialized = True
1211 1206
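The parameter section of a part header, written in ``getchunks`` and parsed in ``_readheader``, can be illustrated with a round trip. This is a rough sketch, not the module's code: ``encodeparams`` and ``decodeparams`` are hypothetical helpers, and the formats ``'>BB'`` (two counts) and ``'>' + 'BB' * n`` (one size pair per parameter) are assumptions about ``_fpartparamcount`` and ``_makefpartparamsizes``, which are defined outside this section.

```python
import struct

def encodeparams(mandatory, advisory):
    """Serialize counts, then all sizes, then all keys and values."""
    params = list(mandatory) + list(advisory)
    out = [struct.pack('>BB', len(mandatory), len(advisory))]
    sizes = []
    for key, value in params:
        sizes.extend((len(key), len(value)))
    out.append(struct.pack('>' + 'BB' * len(params), *sizes))
    for key, value in params:
        out.extend((key, value))
    return b''.join(out)

def decodeparams(data):
    """Inverse of encodeparams: return (mandatory, advisory) lists."""
    mancount, advcount = struct.unpack_from('>BB', data, 0)
    total = mancount + advcount
    offset = 2
    sizes = struct.unpack_from('>' + 'BB' * total, data, offset)
    offset += 2 * total
    params = []
    # regroup the flat size list into (key size, value size) pairs
    for ksize, vsize in zip(sizes[::2], sizes[1::2]):
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, advisory ones follow
    return params[:mancount], params[mancount:]

blob = encodeparams([(b'version', b'02')], [(b'nbchanges', b'3')])
decoded = decodeparams(blob)
```

Storing all sizes before any key or value lets the reader compute every slice boundary from a single fixed-format unpack.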
1212 1207 def read(self, size=None):
1213 1208 """read payload data"""
1214 1209 if not self._initialized:
1215 1210 self._readheader()
1216 1211 if size is None:
1217 1212 data = self._payloadstream.read()
1218 1213 else:
1219 1214 data = self._payloadstream.read(size)
1220 1215 self._pos += len(data)
1221 1216 if size is None or len(data) < size:
1222 1217 if not self.consumed and self._pos:
1223 1218 self.ui.debug('bundle2-input-part: total payload size %i\n'
1224 1219 % self._pos)
1225 1220 self.consumed = True
1226 1221 return data
1227 1222
1228 1223 def tell(self):
1229 1224 return self._pos
1230 1225
1231 1226 def seek(self, offset, whence=0):
1232 1227 if whence == 0:
1233 1228 newpos = offset
1234 1229 elif whence == 1:
1235 1230 newpos = self._pos + offset
1236 1231 elif whence == 2:
1237 1232 if not self.consumed:
1238 1233 self.read()
1239 1234 newpos = self._chunkindex[-1][0] - offset
1240 1235 else:
1241 1236 raise ValueError('Unknown whence value: %r' % (whence,))
1242 1237
1243 1238 if newpos > self._chunkindex[-1][0] and not self.consumed:
1244 1239 self.read()
1245 1240 if not 0 <= newpos <= self._chunkindex[-1][0]:
1246 1241 raise ValueError('Offset out of range')
1247 1242
1248 1243 if self._pos != newpos:
1249 1244 chunk, internaloffset = self._findchunk(newpos)
1250 1245 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1251 1246 adjust = self.read(internaloffset)
1252 1247 if len(adjust) != internaloffset:
1253 1248 raise error.Abort(_('Seek failed\n'))
1254 1249 self._pos = newpos
1255 1250
1256 1251 # These are only the static capabilities.
1257 1252 # Check the 'getrepocaps' function for the rest.
1258 1253 capabilities = {'HG20': (),
1259 1254 'error': ('abort', 'unsupportedcontent', 'pushraced',
1260 1255 'pushkey'),
1261 1256 'listkeys': (),
1262 1257 'pushkey': (),
1263 1258 'digests': tuple(sorted(util.DIGESTS.keys())),
1264 1259 'remote-changegroup': ('http', 'https'),
1265 1260 'hgtagsfnodes': (),
1266 1261 }
1267 1262
1268 1263 def getrepocaps(repo, allowpushback=False):
1269 1264 """return the bundle2 capabilities for a given repo
1270 1265
1271 1266 Exists to allow extensions (like evolution) to mutate the capabilities.
1272 1267 """
1273 1268 caps = capabilities.copy()
1274 1269 caps['changegroup'] = tuple(sorted(
1275 1270 changegroup.supportedincomingversions(repo)))
1276 1271 if obsolete.isenabled(repo, obsolete.exchangeopt):
1277 1272 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1278 1273 caps['obsmarkers'] = supportedformat
1279 1274 if allowpushback:
1280 1275 caps['pushback'] = ()
1281 1276 return caps
1282 1277
1283 1278 def bundle2caps(remote):
1284 1279 """return the bundle capabilities of a peer as dict"""
1285 1280 raw = remote.capable('bundle2')
1286 1281 if not raw and raw != '':
1287 1282 return {}
1288 1283 capsblob = urlreq.unquote(remote.capable('bundle2'))
1289 1284 return decodecaps(capsblob)
1290 1285
1291 1286 def obsmarkersversion(caps):
1292 1287 """extract the list of supported obsmarkers versions from a bundle2caps dict
1293 1288 """
1294 1289 obscaps = caps.get('obsmarkers', ())
1295 1290 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1296 1291
1297 1292 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1298 1293 """Write a bundle file and return its filename.
1299 1294
1300 1295 Existing files will not be overwritten.
1301 1296 If no filename is specified, a temporary file is created.
1302 1297 bz2 compression can be turned off.
1303 1298 The bundle file will be deleted in case of errors.
1304 1299 """
1305 1300
1306 1301 if bundletype == "HG20":
1307 1302 bundle = bundle20(ui)
1308 1303 bundle.setcompression(compression)
1309 1304 part = bundle.newpart('changegroup', data=cg.getchunks())
1310 1305 part.addparam('version', cg.version)
1311 1306 if 'clcount' in cg.extras:
1312 1307 part.addparam('nbchanges', str(cg.extras['clcount']),
1313 1308 mandatory=False)
1314 1309 chunkiter = bundle.getchunks()
1315 1310 else:
1316 1311 # compression argument is only for the bundle2 case
1317 1312 assert compression is None
1318 1313 if cg.version != '01':
1319 1314 raise error.Abort(_('old bundle types only supports v1 '
1320 1315 'changegroups'))
1321 1316 header, comp = bundletypes[bundletype]
1322 1317 if comp not in util.compengines.supportedbundletypes:
1323 1318 raise error.Abort(_('unknown stream compression type: %s')
1324 1319 % comp)
1325 1320 compengine = util.compengines.forbundletype(comp)
1326 compressor = compengine.compressorobj()
1327 subchunkiter = cg.getchunks()
1328 1321 def chunkiter():
1329 1322 yield header
1330 for chunk in subchunkiter:
1331 data = compressor.compress(chunk)
1332 if data:
1333 yield data
1334 yield compressor.flush()
1323 for chunk in compengine.compressstream(cg.getchunks()):
1324 yield chunk
1335 1325 chunkiter = chunkiter()
1336 1326
1337 1327 # parse the changegroup data, otherwise we will block
1338 1328 # in case of sshrepo because we don't know the end of the stream
1339 1329 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1340 1330
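The change to ``writebundle`` above replaces an inline compress/flush loop with a single ``compressstream`` call. A minimal sketch of what such a method wraps, assuming a zlib-based engine: the ``compressstream`` function below is a hypothetical stand-in written for illustration, not Mercurial's implementation, and the ``HG10GZ`` header and sample data are used only as an example.

```python
import zlib

def compressstream(chunks, level=-1):
    """Yield compressed data for an iterable of byte chunks."""
    compressor = zlib.compressobj(level)
    for chunk in chunks:
        data = compressor.compress(chunk)
        # compress() may buffer its input and return nothing yet
        if data:
            yield data
    # emit whatever is still buffered
    yield compressor.flush()

def chunkiter(header, chunks):
    """Uncompressed header first, then the compressed stream."""
    yield header
    for chunk in compressstream(chunks):
        yield chunk

raw = [b'changegroup-data-' * 100, b'', b'more-data' * 50]
out = b''.join(chunkiter(b'HG10GZ', raw))
```

Moving the loop behind one generator means the caller no longer needs to know about compressor objects or remember the final flush.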
1341 1331 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1342 1332 def handlechangegroup(op, inpart):
1343 1333 """apply a changegroup part on the repo
1344 1334
1345 1335 This is a very early implementation that will need massive rework before
1346 1336 being inflicted on any end user.
1347 1337 """
1348 1338 # Make sure we trigger a transaction creation
1349 1339 #
1350 1340 # The addchangegroup function will get a transaction object by itself, but
1351 1341 # we need to make sure we trigger the creation of a transaction object used
1352 1342 # for the whole processing scope.
1353 1343 op.gettransaction()
1354 1344 unpackerversion = inpart.params.get('version', '01')
1355 1345 # We should raise an appropriate exception here
1356 1346 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1357 1347 # the source and url passed here are overwritten by the one contained in
1358 1348 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1359 1349 nbchangesets = None
1360 1350 if 'nbchanges' in inpart.params:
1361 1351 nbchangesets = int(inpart.params.get('nbchanges'))
1362 1352 if ('treemanifest' in inpart.params and
1363 1353 'treemanifest' not in op.repo.requirements):
1364 1354 if len(op.repo.changelog) != 0:
1365 1355 raise error.Abort(_(
1366 1356 "bundle contains tree manifests, but local repo is "
1367 1357 "non-empty and does not use tree manifests"))
1368 1358 op.repo.requirements.add('treemanifest')
1369 1359 op.repo._applyopenerreqs()
1370 1360 op.repo._writerequirements()
1371 1361 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1372 1362 op.records.add('changegroup', {'return': ret})
1373 1363 if op.reply is not None:
1374 1364 # This is definitely not the final form of this
1375 1365 # return. But one needs to start somewhere.
1376 1366 part = op.reply.newpart('reply:changegroup', mandatory=False)
1377 1367 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1378 1368 part.addparam('return', '%i' % ret, mandatory=False)
1379 1369 assert not inpart.read()
1380 1370
1381 1371 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1382 1372 ['digest:%s' % k for k in util.DIGESTS.keys()])
1383 1373 @parthandler('remote-changegroup', _remotechangegroupparams)
1384 1374 def handleremotechangegroup(op, inpart):
1385 1375 """apply a bundle10 on the repo, given an url and validation information
1386 1376
1387 1377 All the information about the remote bundle to import is given as
1388 1378 parameters. The parameters include:
1389 1379 - url: the url to the bundle10.
1390 1380 - size: the bundle10 file size. It is used to validate that what was
1391 1381 retrieved by the client matches the server's knowledge about the bundle.
1392 1382 - digests: a space separated list of the digest types provided as
1393 1383 parameters.
1394 1384 - digest:<digest-type>: the hexadecimal representation of the digest with
1395 1385 that name. Like the size, it is used to validate that what was retrieved
1396 1386 by the client matches what the server knows about the bundle.
1397 1387
1398 1388 When multiple digest types are given, all of them are checked.
1399 1389 """
1400 1390 try:
1401 1391 raw_url = inpart.params['url']
1402 1392 except KeyError:
1403 1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1404 1394 parsed_url = util.url(raw_url)
1405 1395 if parsed_url.scheme not in capabilities['remote-changegroup']:
1406 1396 raise error.Abort(_('remote-changegroup does not support %s urls') %
1407 1397 parsed_url.scheme)
1408 1398
1409 1399 try:
1410 1400 size = int(inpart.params['size'])
1411 1401 except ValueError:
1412 1402 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1413 1403 % 'size')
1414 1404 except KeyError:
1415 1405 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1416 1406
1417 1407 digests = {}
1418 1408 for typ in inpart.params.get('digests', '').split():
1419 1409 param = 'digest:%s' % typ
1420 1410 try:
1421 1411 value = inpart.params[param]
1422 1412 except KeyError:
1423 1413 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1424 1414 param)
1425 1415 digests[typ] = value
1426 1416
1427 1417 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1428 1418
1429 1419 # Make sure we trigger a transaction creation
1430 1420 #
1431 1421 # The addchangegroup function will get a transaction object by itself, but
1432 1422 # we need to make sure we trigger the creation of a transaction object used
1433 1423 # for the whole processing scope.
1434 1424 op.gettransaction()
1435 1425 from . import exchange
1436 1426 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1437 1427 if not isinstance(cg, changegroup.cg1unpacker):
1438 1428 raise error.Abort(_('%s: not a bundle version 1.0') %
1439 1429 util.hidepassword(raw_url))
1440 1430 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1441 1431 op.records.add('changegroup', {'return': ret})
1442 1432 if op.reply is not None:
1443 1433 # This is definitely not the final form of this
1444 1434 # return. But one needs to start somewhere.
1445 1435 part = op.reply.newpart('reply:changegroup')
1446 1436 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1447 1437 part.addparam('return', '%i' % ret, mandatory=False)
1448 1438 try:
1449 1439 real_part.validate()
1450 1440 except error.Abort as e:
1451 1441 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1452 1442 (util.hidepassword(raw_url), str(e)))
1453 1443 assert not inpart.read()
1454 1444
1455 1445 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1456 1446 def handlereplychangegroup(op, inpart):
1457 1447 ret = int(inpart.params['return'])
1458 1448 replyto = int(inpart.params['in-reply-to'])
1459 1449 op.records.add('changegroup', {'return': ret}, replyto)
1460 1450
1461 1451 @parthandler('check:heads')
1462 1452 def handlecheckheads(op, inpart):
1463 1453 """check that the heads of the repo did not change
1464 1454
1465 1455 This is used to detect a push race when using unbundle.
1466 1456 This replaces the "heads" argument of unbundle."""
1467 1457 h = inpart.read(20)
1468 1458 heads = []
1469 1459 while len(h) == 20:
1470 1460 heads.append(h)
1471 1461 h = inpart.read(20)
1472 1462 assert not h
1473 1463 # Trigger a transaction so that we are guaranteed to have the lock now.
1474 1464 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1475 1465 op.gettransaction()
1476 1466 if sorted(heads) != sorted(op.repo.heads()):
1477 1467 raise error.PushRaced('repository changed while pushing - '
1478 1468 'please try again')
1479 1469
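The payload parsing in ``handlecheckheads`` above reads fixed-size 20-byte nodes until the stream is exhausted. A small sketch of that loop against an in-memory stream: ``readheads`` is a hypothetical helper, ``io.BytesIO`` stands in for the part object, and the node values are made up.

```python
import io

def readheads(part):
    """Read consecutive 20-byte nodes from a file-like object."""
    heads = []
    h = part.read(20)
    while len(h) == 20:
        heads.append(h)
        h = part.read(20)
    if h:
        # a short trailing read means the payload was truncated
        raise ValueError('trailing partial node: %d bytes' % len(h))
    return heads

payload = b'\x22' * 20 + b'\x11' * 20
heads = readheads(io.BytesIO(payload))
# order is irrelevant for the race check, so compare sorted lists
unchanged = sorted(heads) == sorted([b'\x11' * 20, b'\x22' * 20])
```

The handler itself uses ``assert not h`` for the truncation case; the sketch raises ``ValueError`` so the check survives running with assertions disabled.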
1480 1470 @parthandler('output')
1481 1471 def handleoutput(op, inpart):
1482 1472 """forward output captured on the server to the client"""
1483 1473 for line in inpart.read().splitlines():
1484 1474 op.ui.status(_('remote: %s\n') % line)
1485 1475
1486 1476 @parthandler('replycaps')
1487 1477 def handlereplycaps(op, inpart):
1488 1478 """Notify that a reply bundle should be created
1489 1479
1490 1480 The payload contains the capabilities information for the reply"""
1491 1481 caps = decodecaps(inpart.read())
1492 1482 if op.reply is None:
1493 1483 op.reply = bundle20(op.ui, caps)
1494 1484
1495 1485 class AbortFromPart(error.Abort):
1496 1486 """Sub-class of Abort that denotes an error from a bundle2 part."""
1497 1487
1498 1488 @parthandler('error:abort', ('message', 'hint'))
1499 1489 def handleerrorabort(op, inpart):
1500 1490 """Used to transmit abort error over the wire"""
1501 1491 raise AbortFromPart(inpart.params['message'],
1502 1492 hint=inpart.params.get('hint'))
1503 1493
1504 1494 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1505 1495 'in-reply-to'))
1506 1496 def handleerrorpushkey(op, inpart):
1507 1497 """Used to transmit failure of a mandatory pushkey over the wire"""
1508 1498 kwargs = {}
1509 1499 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1510 1500 value = inpart.params.get(name)
1511 1501 if value is not None:
1512 1502 kwargs[name] = value
1513 1503 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1514 1504
1515 1505 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1516 1506 def handleerrorunsupportedcontent(op, inpart):
1517 1507 """Used to transmit unknown content error over the wire"""
1518 1508 kwargs = {}
1519 1509 parttype = inpart.params.get('parttype')
1520 1510 if parttype is not None:
1521 1511 kwargs['parttype'] = parttype
1522 1512 params = inpart.params.get('params')
1523 1513 if params is not None:
1524 1514 kwargs['params'] = params.split('\0')
1525 1515
1526 1516 raise error.BundleUnknownFeatureError(**kwargs)
1527 1517
1528 1518 @parthandler('error:pushraced', ('message',))
1529 1519 def handleerrorpushraced(op, inpart):
1530 1520 """Used to transmit push race error over the wire"""
1531 1521 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1532 1522
1533 1523 @parthandler('listkeys', ('namespace',))
1534 1524 def handlelistkeys(op, inpart):
1535 1525 """retrieve pushkey namespace content stored in a bundle2"""
1536 1526 namespace = inpart.params['namespace']
1537 1527 r = pushkey.decodekeys(inpart.read())
1538 1528 op.records.add('listkeys', (namespace, r))
1539 1529
1540 1530 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1541 1531 def handlepushkey(op, inpart):
1542 1532 """process a pushkey request"""
1543 1533 dec = pushkey.decode
1544 1534 namespace = dec(inpart.params['namespace'])
1545 1535 key = dec(inpart.params['key'])
1546 1536 old = dec(inpart.params['old'])
1547 1537 new = dec(inpart.params['new'])
1548 1538 # Grab the transaction to ensure that we have the lock before performing the
1549 1539 # pushkey.
1550 1540 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1551 1541 op.gettransaction()
1552 1542 ret = op.repo.pushkey(namespace, key, old, new)
1553 1543 record = {'namespace': namespace,
1554 1544 'key': key,
1555 1545 'old': old,
1556 1546 'new': new}
1557 1547 op.records.add('pushkey', record)
1558 1548 if op.reply is not None:
1559 1549 rpart = op.reply.newpart('reply:pushkey')
1560 1550 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1561 1551 rpart.addparam('return', '%i' % ret, mandatory=False)
1562 1552 if inpart.mandatory and not ret:
1563 1553 kwargs = {}
1564 1554 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1565 1555 if key in inpart.params:
1566 1556 kwargs[key] = inpart.params[key]
1567 1557 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1568 1558
1569 1559 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1570 1560 def handlepushkeyreply(op, inpart):
1571 1561 """retrieve the result of a pushkey request"""
1572 1562 ret = int(inpart.params['return'])
1573 1563 partid = int(inpart.params['in-reply-to'])
1574 1564 op.records.add('pushkey', {'return': ret}, partid)
1575 1565
1576 1566 @parthandler('obsmarkers')
1577 1567 def handleobsmarker(op, inpart):
1578 1568 """add a stream of obsmarkers to the repo"""
1579 1569 tr = op.gettransaction()
1580 1570 markerdata = inpart.read()
1581 1571 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1582 1572 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1583 1573 % len(markerdata))
1584 1574 # The mergemarkers call will crash if marker creation is not enabled.
1585 1575 # we want to avoid this if the part is advisory.
1586 1576 if not inpart.mandatory and op.repo.obsstore.readonly:
1587 1577 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1588 1578 return
1589 1579 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1590 1580 if new:
1591 1581 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1592 1582 op.records.add('obsmarkers', {'new': new})
1593 1583 if op.reply is not None:
1594 1584 rpart = op.reply.newpart('reply:obsmarkers')
1595 1585 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1596 1586 rpart.addparam('new', '%i' % new, mandatory=False)
1597 1587
1598 1588
1599 1589 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1600 1590 def handleobsmarkerreply(op, inpart):
1601 1591 """retrieve the result of an obsmarkers request"""
1602 1592 ret = int(inpart.params['new'])
1603 1593 partid = int(inpart.params['in-reply-to'])
1604 1594 op.records.add('obsmarkers', {'new': ret}, partid)
1605 1595
1606 1596 @parthandler('hgtagsfnodes')
1607 1597 def handlehgtagsfnodes(op, inpart):
1608 1598 """Applies .hgtags fnodes cache entries to the local repo.
1609 1599
1610 1600 Payload is pairs of 20 byte changeset nodes and filenodes.
1611 1601 """
1612 1602 # Grab the transaction so we ensure that we have the lock at this point.
1613 1603 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1614 1604 op.gettransaction()
1615 1605 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1616 1606
1617 1607 count = 0
1618 1608 while True:
1619 1609 node = inpart.read(20)
1620 1610 fnode = inpart.read(20)
1621 1611 if len(node) < 20 or len(fnode) < 20:
1622 1612 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1623 1613 break
1624 1614 cache.setfnode(node, fnode)
1625 1615 count += 1
1626 1616
1627 1617 cache.write()
1628 1618 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)