bundle2: make unbundle.compressed return True when compressed...
Pierre-Yves David -
r26802:42f705f2 default
@@ -1,1536 +1,1540
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
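The stream-level parameter rules above can be illustrated with a small standalone round-trip. This is a Python 3 sketch (the module itself targets Python 2); `encodeparams` and `decodeparams` are hypothetical helpers, not part of bundle2.py:

```python
import urllib.parse

def encodeparams(params):
    # params: list of (name, value-or-None); names must start with a letter
    blocks = []
    for name, value in params:
        assert name and name[0].isalpha(), 'invalid parameter name'
        block = urllib.parse.quote(name)
        if value is not None:
            block = '%s=%s' % (block, urllib.parse.quote(value))
        blocks.append(block)
    # parameters are space separated inside the params blob
    return ' '.join(blocks)

def decodeparams(blob):
    params = {}
    for block in blob.split(' '):
        parts = block.split('=', 1)
        name = urllib.parse.unquote(parts[0])
        value = urllib.parse.unquote(parts[1]) if len(parts) > 1 else None
        # lower case first letter: advisory; upper case: mandatory
        params[name] = (value, name[0].isupper())
    return params
```

For example, `Compression=BZ flag` decodes to one mandatory parameter with a value and one advisory parameter without one.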
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
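Putting the `:parameters:` and `:payload:` layouts together, a minimal standalone reader can be sketched as follows (Python 3; the function names are hypothetical, and negative/special chunk sizes are deliberately not handled):

```python
import io
import struct

def parsepartparams(data):
    # layout: <mandatory-count><advisory-count><param-sizes><param-data>
    mancount, advcount = struct.unpack('>BB', data[:2])
    total = mancount + advcount
    fmt = '>' + 'BB' * total  # one (key-size, value-size) byte pair per param
    offset = 2 + struct.calcsize(fmt)
    sizes = struct.unpack(fmt, data[2:offset])
    mandatory, advisory = [], []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        # mandatory parameters come first, then the advisory ones
        (mandatory if i < mancount else advisory).append((key, value))
    return mandatory, advisory

def iterchunks(fp):
    # payload is a series of <chunksize><chunkdata>, ended by a zero-size chunk
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('special chunk-size processing not sketched here')
        yield fp.read(size)
```

For instance, a block declaring one mandatory and one advisory parameter, followed by a single five-byte chunk and the zero terminator, decodes as expected.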
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care
145 145 to preserve.
146 146 """
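The dispatch rules in the paragraph above can be sketched with a toy handler table (standalone Python 3; the real module uses `parthandlermapping` and raises `error.BundleUnknownFeatureError` rather than `LookupError`):

```python
# hypothetical handler table; the real one is parthandlermapping
handlers = {'changegroup': lambda parttype: 'applied'}

def dispatch(parttype):
    # matching is case insensitive; any uppercase char makes the part mandatory
    mandatory = parttype != parttype.lower()
    handler = handlers.get(parttype.lower())
    if handler is None:
        if mandatory:
            raise LookupError('unknown mandatory part: %s' % parttype)
        return 'ignored'  # advisory part with no known handler
    return handler(parttype)
```

An uppercase `CHANGEGROUP` part is handled, an unknown lowercase part is silently skipped, and an unknown part with any uppercase character aborts.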
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable, so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
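As a quick illustration of the dynamically built format (a standalone re-sketch of the function above; the sample sizes are hypothetical):

```python
import struct

def makefpartparamsizes(nbparams):
    # mirrors _makefpartparamsizes: one (key-size, value-size) pair per param
    return '>' + ('BB' * nbparams)

# two parameters -> four unsigned bytes to read
fmt = makefpartparamsizes(2)
sizes = struct.unpack(fmt, bytes([3, 2, 4, 0]))
```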
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that registers a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during an unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`, where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
306 306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 307 tr.hookargs['bundle2'] = '1'
308 308 if source is not None and 'source' not in tr.hookargs:
309 309 tr.hookargs['source'] = source
310 310 if url is not None and 'url' not in tr.hookargs:
311 311 tr.hookargs['url'] = url
312 312 return processbundle(repo, unbundler, lambda: tr, op=op)
313 313
314 314 def processbundle(repo, unbundler, transactiongetter=None, op=None):
315 315 """This function processes a bundle, applying effects to/from a repo
316 316
317 317 It iterates over each part then searches for and uses the proper handling
318 318 code to process the part. Parts are processed in order.
319 319
320 320 This is a very early version of this function that will be strongly reworked
321 321 before final usage.
322 322
323 323 An unknown mandatory part will abort the process.
324 324
325 325 It is temporarily possible to provide a prebuilt bundleoperation to the
326 326 function. This is used to ensure output is properly propagated in case of
327 327 an error during the unbundling. This output capturing part will likely be
328 328 reworked and this ability will probably go away in the process.
329 329 """
330 330 if op is None:
331 331 if transactiongetter is None:
332 332 transactiongetter = _notransaction
333 333 op = bundleoperation(repo, transactiongetter)
334 334 # todo:
335 335 # - replace this with an init function soon.
336 336 # - exception catching
337 337 unbundler.params
338 338 if repo.ui.debugflag:
339 339 msg = ['bundle2-input-bundle:']
340 340 if unbundler.params:
341 341 msg.append(' %i params' % len(unbundler.params))
342 342 if op.gettransaction is None:
343 343 msg.append(' no-transaction')
344 344 else:
345 345 msg.append(' with-transaction')
346 346 msg.append('\n')
347 347 repo.ui.debug(''.join(msg))
348 348 iterparts = enumerate(unbundler.iterparts())
349 349 part = None
350 350 nbpart = 0
351 351 try:
352 352 for nbpart, part in iterparts:
353 353 _processpart(op, part)
354 354 except BaseException as exc:
355 355 for nbpart, part in iterparts:
356 356 # consume the bundle content
357 357 part.seek(0, 2)
358 358 # Small hack to let caller code distinguish exceptions raised during
359 359 # bundle2 processing from those raised while processing the old format.
360 360 # This is mostly needed to handle different return codes to unbundle
361 361 # according to the type of bundle. We should probably clean up or drop
362 362 # this return code craziness in a future version.
363 363 exc.duringunbundle2 = True
364 364 salvaged = []
365 365 replycaps = None
366 366 if op.reply is not None:
367 367 salvaged = op.reply.salvageoutput()
368 368 replycaps = op.reply.capabilities
369 369 exc._replycaps = replycaps
370 370 exc._bundle2salvagedoutput = salvaged
371 371 raise
372 372 finally:
373 373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
374 374
375 375 return op
376 376
377 377 def _processpart(op, part):
378 378 """process a single part from a bundle
379 379
380 380 The part is guaranteed to have been fully consumed when the function exits
381 381 (even if an exception is raised)."""
382 382 status = 'unknown' # used by debug output
383 383 try:
384 384 try:
385 385 handler = parthandlermapping.get(part.type)
386 386 if handler is None:
387 387 status = 'unsupported-type'
388 388 raise error.BundleUnknownFeatureError(parttype=part.type)
389 389 indebug(op.ui, 'found a handler for part %r' % part.type)
390 390 unknownparams = part.mandatorykeys - handler.params
391 391 if unknownparams:
392 392 unknownparams = list(unknownparams)
393 393 unknownparams.sort()
394 394 status = 'unsupported-params (%s)' % unknownparams
395 395 raise error.BundleUnknownFeatureError(parttype=part.type,
396 396 params=unknownparams)
397 397 status = 'supported'
398 398 except error.BundleUnknownFeatureError as exc:
399 399 if part.mandatory: # mandatory parts
400 400 raise
401 401 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
402 402 return # skip to part processing
403 403 finally:
404 404 if op.ui.debugflag:
405 405 msg = ['bundle2-input-part: "%s"' % part.type]
406 406 if not part.mandatory:
407 407 msg.append(' (advisory)')
408 408 nbmp = len(part.mandatorykeys)
409 409 nbap = len(part.params) - nbmp
410 410 if nbmp or nbap:
411 411 msg.append(' (params:')
412 412 if nbmp:
413 413 msg.append(' %i mandatory' % nbmp)
414 414 if nbap:
415 415 msg.append(' %i advisory' % nbap)
416 416 msg.append(')')
417 417 msg.append(' %s\n' % status)
418 418 op.ui.debug(''.join(msg))
419 419
420 420 # handler is called outside the above try block so that we don't
421 421 # risk catching KeyErrors from anything other than the
422 422 # parthandlermapping lookup (any KeyError raised by handler()
423 423 # itself represents a defect of a different variety).
424 424 output = None
425 425 if op.captureoutput and op.reply is not None:
426 426 op.ui.pushbuffer(error=True, subproc=True)
427 427 output = ''
428 428 try:
429 429 handler(op, part)
430 430 finally:
431 431 if output is not None:
432 432 output = op.ui.popbuffer()
433 433 if output:
434 434 outpart = op.reply.newpart('output', data=output,
435 435 mandatory=False)
436 436 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
437 437 finally:
438 438 # consume the part content to not corrupt the stream.
439 439 part.seek(0, 2)
440 440
441 441
442 442 def decodecaps(blob):
443 443 """decode a bundle2 caps bytes blob into a dictionary
444 444
445 445 The blob is a list of capabilities (one per line)
446 446 Capabilities may have values using a line of the form::
447 447
448 448 capability=value1,value2,value3
449 449
450 450 The values are always a list."""
451 451 caps = {}
452 452 for line in blob.splitlines():
453 453 if not line:
454 454 continue
455 455 if '=' not in line:
456 456 key, vals = line, ()
457 457 else:
458 458 key, vals = line.split('=', 1)
459 459 vals = vals.split(',')
460 460 key = urllib.unquote(key)
461 461 vals = [urllib.unquote(v) for v in vals]
462 462 caps[key] = vals
463 463 return caps
464 464
465 465 def encodecaps(caps):
466 466 """encode a bundle2 caps dictionary into a bytes blob"""
467 467 chunks = []
468 468 for ca in sorted(caps):
469 469 vals = caps[ca]
470 470 ca = urllib.quote(ca)
471 471 vals = [urllib.quote(v) for v in vals]
472 472 if vals:
473 473 ca = "%s=%s" % (ca, ','.join(vals))
474 474 chunks.append(ca)
475 475 return '\n'.join(chunks)
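Since `decodecaps` and `encodecaps` are inverses, a Python 3 re-sketch (using `urllib.parse` in place of the Python 2 `urllib` calls above, and plain lists for values) round-trips cleanly:

```python
import urllib.parse

def encodecaps(caps):
    # one capability per line; values are comma-joined, everything urlquoted
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[urllib.parse.unquote(key)] = [urllib.parse.unquote(v) for v in vals]
    return caps
```

A capability with no value encodes to a bare line; one with several values encodes to `capability=value1,value2`.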
476 476
477 477 class bundle20(object):
478 478 """represent an outgoing bundle2 container
479 479
480 480 Use the `addparam` method to add stream level parameters and `newpart` to
481 481 populate it. Then call `getchunks` to retrieve all the binary chunks of
482 482 data that compose the bundle2 container."""
483 483
484 484 _magicstring = 'HG20'
485 485
486 486 def __init__(self, ui, capabilities=()):
487 487 self.ui = ui
488 488 self._params = []
489 489 self._parts = []
490 490 self.capabilities = dict(capabilities)
491 491 self._compressor = util.compressors[None]()
492 492
493 493 def setcompression(self, alg):
494 494 """setup core part compression to <alg>"""
495 495 if alg is None:
496 496 return
497 497 assert not any(n.lower() == 'compression' for n, v in self._params)
498 498 self.addparam('Compression', alg)
499 499 self._compressor = util.compressors[alg]()
500 500
501 501 @property
502 502 def nbparts(self):
503 503 """total number of parts added to the bundler"""
504 504 return len(self._parts)
505 505
506 506 # methods used to define the bundle2 content
507 507 def addparam(self, name, value=None):
508 508 """add a stream level parameter"""
509 509 if not name:
510 510 raise ValueError('empty parameter name')
511 511 if name[0] not in string.letters:
512 512 raise ValueError('non letter first character: %r' % name)
513 513 self._params.append((name, value))
514 514
515 515 def addpart(self, part):
516 516 """add a new part to the bundle2 container
517 517
518 518 Parts contains the actual applicative payload."""
519 519 assert part.id is None
520 520 part.id = len(self._parts) # very cheap counter
521 521 self._parts.append(part)
522 522
523 523 def newpart(self, typeid, *args, **kwargs):
524 524 """create a new part and add it to the container
525 525
526 526 The part is directly added to the container. For now, this means
527 527 that any failure to properly initialize the part after calling
528 528 ``newpart`` should result in a failure of the whole bundling process.
529 529
530 530 You can still fall back to manually create and add if you need better
531 531 control."""
532 532 part = bundlepart(typeid, *args, **kwargs)
533 533 self.addpart(part)
534 534 return part
535 535
536 536 # methods used to generate the bundle2 stream
537 537 def getchunks(self):
538 538 if self.ui.debugflag:
539 539 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
540 540 if self._params:
541 541 msg.append(' (%i params)' % len(self._params))
542 542 msg.append(' %i parts total\n' % len(self._parts))
543 543 self.ui.debug(''.join(msg))
544 544 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
545 545 yield self._magicstring
546 546 param = self._paramchunk()
547 547 outdebug(self.ui, 'bundle parameter: %s' % param)
548 548 yield _pack(_fstreamparamsize, len(param))
549 549 if param:
550 550 yield param
551 551 # starting compression
552 552 for chunk in self._getcorechunk():
553 553 yield self._compressor.compress(chunk)
554 554 yield self._compressor.flush()
555 555
556 556 def _paramchunk(self):
557 557 """return an encoded version of all stream parameters"""
558 558 blocks = []
559 559 for par, value in self._params:
560 560 par = urllib.quote(par)
561 561 if value is not None:
562 562 value = urllib.quote(value)
563 563 par = '%s=%s' % (par, value)
564 564 blocks.append(par)
565 565 return ' '.join(blocks)
566 566
567 567 def _getcorechunk(self):
568 568 """yield chunk for the core part of the bundle
569 569
570 570 (all but headers and parameters)"""
571 571 outdebug(self.ui, 'start of parts')
572 572 for part in self._parts:
573 573 outdebug(self.ui, 'bundle part: "%s"' % part.type)
574 574 for chunk in part.getchunks(ui=self.ui):
575 575 yield chunk
576 576 outdebug(self.ui, 'end of bundle')
577 577 yield _pack(_fpartheadersize, 0)
578 578
579 579
580 580 def salvageoutput(self):
581 581 """return a list with a copy of all output parts in the bundle
582 582
583 583 This is meant to be used during error handling to make sure we preserve
584 584 server output"""
585 585 salvaged = []
586 586 for part in self._parts:
587 587 if part.type.startswith('output'):
588 588 salvaged.append(part.copy())
589 589 return salvaged
590 590
591 591
592 592 class unpackermixin(object):
593 593 """A mixin to extract bytes and struct data from a stream"""
594 594
595 595 def __init__(self, fp):
596 596 self._fp = fp
597 597 self._seekable = (util.safehasattr(fp, 'seek') and
598 598 util.safehasattr(fp, 'tell'))
599 599
600 600 def _unpack(self, format):
601 601 """unpack this struct format from the stream"""
602 602 data = self._readexact(struct.calcsize(format))
603 603 return _unpack(format, data)
604 604
605 605 def _readexact(self, size):
606 606 """read exactly <size> bytes from the stream"""
607 607 return changegroup.readexactly(self._fp, size)
608 608
609 609 def seek(self, offset, whence=0):
610 610 """move the underlying file pointer"""
611 611 if self._seekable:
612 612 return self._fp.seek(offset, whence)
613 613 else:
614 614 raise NotImplementedError(_('File pointer is not seekable'))
615 615
616 616 def tell(self):
617 617 """return the file offset, or None if file is not seekable"""
618 618 if self._seekable:
619 619 try:
620 620 return self._fp.tell()
621 621 except IOError as e:
622 622 if e.errno == errno.ESPIPE:
623 623 self._seekable = False
624 624 else:
625 625 raise
626 626 return None
627 627
628 628 def close(self):
629 629 """close underlying file"""
630 630 if util.safehasattr(self._fp, 'close'):
631 631 return self._fp.close()
632 632
633 633 def getunbundler(ui, fp, magicstring=None):
634 634 """return a valid unbundler object for a given magicstring"""
635 635 if magicstring is None:
636 636 magicstring = changegroup.readexactly(fp, 4)
637 637 magic, version = magicstring[0:2], magicstring[2:4]
638 638 if magic != 'HG':
639 639 raise error.Abort(_('not a Mercurial bundle'))
640 640 unbundlerclass = formatmap.get(version)
641 641 if unbundlerclass is None:
642 642 raise error.Abort(_('unknown bundle version %s') % version)
643 643 unbundler = unbundlerclass(ui, fp)
644 644 indebug(ui, 'start processing of %s stream' % magicstring)
645 645 return unbundler
646 646
647 647 class unbundle20(unpackermixin):
648 648 """interpret a bundle2 stream
649 649
650 650 This class is fed with a binary stream and yields parts through its
651 651 `iterparts` methods."""
652 652
653 653 _magicstring = 'HG20'
654 654
655 655 def __init__(self, ui, fp):
656 656 """If header is specified, we do not read it out of the stream."""
657 657 self.ui = ui
658 658 self._decompressor = util.decompressors[None]
659 self._compressed = None
659 660 super(unbundle20, self).__init__(fp)
660 661
661 662 @util.propertycache
662 663 def params(self):
663 664 """dictionary of stream level parameters"""
664 665 indebug(self.ui, 'reading bundle2 stream parameters')
665 666 params = {}
666 667 paramssize = self._unpack(_fstreamparamsize)[0]
667 668 if paramssize < 0:
668 669 raise error.BundleValueError('negative bundle param size: %i'
669 670 % paramssize)
670 671 if paramssize:
671 672 params = self._readexact(paramssize)
672 673 params = self._processallparams(params)
673 674 return params
674 675
675 676 def _processallparams(self, paramsblock):
676 677 """process a block of stream level parameters and return them as a dict"""
677 678 params = {}
678 679 for p in paramsblock.split(' '):
679 680 p = p.split('=', 1)
680 681 p = [urllib.unquote(i) for i in p]
681 682 if len(p) < 2:
682 683 p.append(None)
683 684 self._processparam(*p)
684 685 params[p[0]] = p[1]
685 686 return params
686 687
687 688
688 689 def _processparam(self, name, value):
689 690 """process a parameter, applying its effect if needed
690 691
691 692 Parameters starting with a lower case letter are advisory and will be
692 693 ignored when unknown. Those starting with an upper case letter are
693 694 mandatory, and this function will raise when they are unknown.
694 695
695 696 Note: no options are currently supported. Any input will either be
696 697 ignored or fail.
697 698 """
698 699 if not name:
699 700 raise ValueError('empty parameter name')
700 701 if name[0] not in string.letters:
701 702 raise ValueError('non letter first character: %r' % name)
702 703 try:
703 704 handler = b2streamparamsmap[name.lower()]
704 705 except KeyError:
705 706 if name[0].islower():
706 707 indebug(self.ui, "ignoring unknown parameter %r" % name)
707 708 else:
708 709 raise error.BundleUnknownFeatureError(params=(name,))
709 710 else:
710 711 handler(self, name, value)
711 712
712 713 def _forwardchunks(self):
713 714 """utility to transfer a bundle2 as binary
714 715
715 716 This is made necessary by the fact that the 'getbundle' command over 'ssh'
716 717 has no way to know when the reply ends, relying on the bundle being
717 718 interpreted to know its end. This is terrible and we are sorry, but we
718 719 needed to move forward to get general delta enabled.
719 720 """
720 721 yield self._magicstring
721 722 assert 'params' not in vars(self)
722 723 paramssize = self._unpack(_fstreamparamsize)[0]
723 724 if paramssize < 0:
724 725 raise error.BundleValueError('negative bundle param size: %i'
725 726 % paramssize)
726 727 yield _pack(_fstreamparamsize, paramssize)
727 728 if paramssize:
728 729 params = self._readexact(paramssize)
729 730 self._processallparams(params)
730 731 yield params
731 732 assert self._decompressor is util.decompressors[None]
732 733 # From there, payload might need to be decompressed
733 734 self._fp = self._decompressor(self._fp)
734 735 emptycount = 0
735 736 while emptycount < 2:
736 737 # so we can brainlessly loop
737 738 assert _fpartheadersize == _fpayloadsize
738 739 size = self._unpack(_fpartheadersize)[0]
739 740 yield _pack(_fpartheadersize, size)
740 741 if size:
741 742 emptycount = 0
742 743 else:
743 744 emptycount += 1
744 745 continue
745 746 if size == flaginterrupt:
746 747 continue
747 748 elif size < 0:
748 749 raise error.BundleValueError('negative chunk size: %i' % size)
749 750 yield self._readexact(size)
750 751
751 752
752 753 def iterparts(self):
753 754 """yield all parts contained in the stream"""
755 756 # make sure params have been loaded
756 757 self.params
757 758 # From there, payload needs to be decompressed
757 758 self._fp = self._decompressor(self._fp)
758 759 indebug(self.ui, 'start extraction of bundle2 parts')
759 760 headerblock = self._readpartheader()
760 761 while headerblock is not None:
761 762 part = unbundlepart(self.ui, headerblock, self._fp)
762 763 yield part
763 764 part.seek(0, 2)
764 765 headerblock = self._readpartheader()
765 766 indebug(self.ui, 'end of bundle2 stream')
766 767
767 768 def _readpartheader(self):
768 769 """read a part header size and return the header bytes blob
769 770
770 771 returns None if empty"""
771 772 headersize = self._unpack(_fpartheadersize)[0]
772 773 if headersize < 0:
773 774 raise error.BundleValueError('negative part header size: %i'
774 775 % headersize)
775 776 indebug(self.ui, 'part header size: %i' % headersize)
776 777 if headersize:
777 778 return self._readexact(headersize)
778 779 return None
779 780
780 781 def compressed(self):
781 return False
782 self.params # load params
783 return self._compressed
782 784
783 785 formatmap = {'20': unbundle20}
784 786
785 787 b2streamparamsmap = {}
786 788
787 789 def b2streamparamhandler(name):
788 790 """register a handler for a stream level parameter"""
789 791 def decorator(func):
790 792 assert name not in b2streamparamsmap
791 793 b2streamparamsmap[name] = func
792 794 return func
793 795 return decorator
794 796
795 797 @b2streamparamhandler('compression')
796 798 def processcompression(unbundler, param, value):
797 799 """read compression parameter and install payload decompression"""
798 800 if value not in util.decompressors:
799 801 raise error.BundleUnknownFeatureError(params=(param,),
800 802 values=(value,))
801 803 unbundler._decompressor = util.decompressors[value]
804 if value is not None:
805 unbundler._compressed = True
802 806
803 807 class bundlepart(object):
804 808 """A bundle2 part contains application level payload
805 809
806 810 The part `type` is used to route the part to the application level
807 811 handler.
808 812
809 813 The part payload is contained in ``part.data``. It could be raw bytes or a
810 814 generator of byte chunks.
811 815
812 816 You can add parameters to the part using the ``addparam`` method.
813 817 Parameters can be either mandatory (default) or advisory. Remote side
814 818 should be able to safely ignore the advisory ones.
815 819
816 820 Neither data nor parameters can be modified after the generation has begun.
817 821 """
818 822
819 823 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
820 824 data='', mandatory=True):
821 825 validateparttype(parttype)
822 826 self.id = None
823 827 self.type = parttype
824 828 self._data = data
825 829 self._mandatoryparams = list(mandatoryparams)
826 830 self._advisoryparams = list(advisoryparams)
827 831 # checking for duplicated entries
828 832 self._seenparams = set()
829 833 for pname, __ in self._mandatoryparams + self._advisoryparams:
830 834 if pname in self._seenparams:
831 835 raise RuntimeError('duplicated params: %s' % pname)
832 836 self._seenparams.add(pname)
833 837 # status of the part's generation:
834 838 # - None: not started,
835 839 # - False: currently generated,
836 840 # - True: generation done.
837 841 self._generated = None
838 842 self.mandatory = mandatory
839 843
840 844 def copy(self):
841 845 """return a copy of the part
842 846
843 847 The new part has the very same content but no partid assigned yet.
844 848 Parts with generated data cannot be copied."""
845 849 assert not util.safehasattr(self.data, 'next')
846 850 return self.__class__(self.type, self._mandatoryparams,
847 851 self._advisoryparams, self._data, self.mandatory)
848 852
849 853 # methods used to define the part content
    def __setdata(self, data):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        self._data = data
    def __getdata(self):
        return self._data
    data = property(__getdata, __setdata)

    @property
    def mandatoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._mandatoryparams)

    @property
    def advisoryparams(self):
        # make it an immutable tuple to force people through ``addparam``
        return tuple(self._advisoryparams)

    def addparam(self, name, value='', mandatory=True):
        if self._generated is not None:
            raise error.ReadOnlyPartError('part is being generated')
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

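For illustration, the parameter bookkeeping in ``addparam`` can be sketched standalone. ``PartSketch`` is a hypothetical stand-in, not the real ``bundlepart`` API; it only mimics the duplicate check and the mandatory/advisory routing shown above.

```python
class PartSketch(object):
    """Minimal stand-in mimicking bundlepart's param bookkeeping (sketch)."""
    def __init__(self):
        self._mandatoryparams = []
        self._advisoryparams = []
        self._seenparams = set()

    def addparam(self, name, value='', mandatory=True):
        # reject duplicates across both param lists, like bundlepart does
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        # route to the advisory list unless the param is mandatory
        params = self._advisoryparams
        if mandatory:
            params = self._mandatoryparams
        params.append((name, value))

p = PartSketch()
p.addparam('version', '02')               # mandatory by default
p.addparam('hint', 'x', mandatory=False)  # explicitly advisory
print(p._mandatoryparams)  # [('version', '02')]
print(p._advisoryparams)   # [('hint', 'x')]
```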
    # methods used to generate the bundle2 stream
    def getchunks(self, ui):
        if self._generated is not None:
            raise RuntimeError('part can only be consumed once')
        self._generated = False

        if ui.debugflag:
            msg = ['bundle2-output-part: "%s"' % self.type]
            if not self.mandatory:
                msg.append(' (advisory)')
            nbmp = len(self.mandatoryparams)
            nbap = len(self.advisoryparams)
            if nbmp or nbap:
                msg.append(' (params:')
                if nbmp:
                    msg.append(' %i mandatory' % nbmp)
                if nbap:
                    msg.append(' %i advisory' % nbap)
                msg.append(')')
            if not self.data:
                msg.append(' empty payload')
            elif util.safehasattr(self.data, 'next'):
                msg.append(' streamed payload')
            else:
                msg.append(' %i bytes payload' % len(self.data))
            msg.append('\n')
            ui.debug(''.join(msg))

        #### header
        if self.mandatory:
            parttype = self.type.upper()
        else:
            parttype = self.type.lower()
        outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
        ## parttype
        header = [_pack(_fparttypesize, len(parttype)),
                  parttype, _pack(_fpartid, self.id),
                 ]
        ## parameters
        # count
        manpar = self.mandatoryparams
        advpar = self.advisoryparams
        header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
        # size
        parsizes = []
        for key, value in manpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        for key, value in advpar:
            parsizes.append(len(key))
            parsizes.append(len(value))
        paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
        header.append(paramsizes)
        # key, value
        for key, value in manpar:
            header.append(key)
            header.append(value)
        for key, value in advpar:
            header.append(key)
            header.append(value)
        ## finalize header
        headerchunk = ''.join(header)
        outdebug(ui, 'header chunk size: %i' % len(headerchunk))
        yield _pack(_fpartheadersize, len(headerchunk))
        yield headerchunk
        ## payload
        try:
            for chunk in self._payloadchunks():
                outdebug(ui, 'payload chunk size: %i' % len(chunk))
                yield _pack(_fpayloadsize, len(chunk))
                yield chunk
        except GeneratorExit:
            # GeneratorExit means that nobody is listening for our
            # results anyway, so just bail quickly rather than trying
            # to produce an error part.
            ui.debug('bundle2-generatorexit\n')
            raise
        except BaseException as exc:
            # backup exception data for later
            ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
                     % exc)
            exc_info = sys.exc_info()
            msg = 'unexpected error: %s' % exc
            interpart = bundlepart('error:abort', [('message', msg)],
                                   mandatory=False)
            interpart.id = 0
            yield _pack(_fpayloadsize, -1)
            for chunk in interpart.getchunks(ui=ui):
                yield chunk
            outdebug(ui, 'closing payload chunk')
            # abort current part payload
            yield _pack(_fpayloadsize, 0)
            raise exc_info[0], exc_info[1], exc_info[2]
        # end of payload
        outdebug(ui, 'closing payload chunk')
        yield _pack(_fpayloadsize, 0)
        self._generated = True

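As a rough standalone sketch, the header framing above amounts to the following. The struct formats (`>B` for the part-type size, `>I` for the part id, `>BB` for the parameter counts, `>i` for the header-size prefix) are assumptions matching the usual bundle2 `_f*` constants, and `frame_part_header` is a hypothetical helper, not part of this module.

```python
import struct

def frame_part_header(parttype, partid, manpar, advpar):
    """Sketch of bundle2 part-header framing (not the real implementation)."""
    header = [struct.pack('>B', len(parttype)), parttype,
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    pairs = manpar + advpar
    # one size byte per key and per value, in declaration order
    sizes = []
    for key, value in pairs:
        sizes.append(len(key))
        sizes.append(len(value))
    header.append(struct.pack('>' + 'B' * len(sizes), *sizes))
    # then the raw key/value bytes themselves
    for key, value in pairs:
        header.append(key)
        header.append(value)
    headerchunk = b''.join(header)
    # the header blob is preceded on the wire by its own size
    return struct.pack('>i', len(headerchunk)) + headerchunk

frame = frame_part_header(b'CHANGEGROUP', 0, [(b'version', b'02')], [])
print(len(frame))  # 33
```

The upper-cased part type mirrors how ``getchunks`` encodes the mandatory bit.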
    def _payloadchunks(self):
        """yield chunks of the part payload

        Exists to handle the different methods to provide data to a part."""
        # we only support fixed size data now.
        # This will be improved in the future.
        if util.safehasattr(self.data, 'next'):
            buff = util.chunkbuffer(self.data)
            chunk = buff.read(preferedchunksize)
            while chunk:
                yield chunk
                chunk = buff.read(preferedchunksize)
        elif len(self.data):
            yield self.data


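The fixed-size chunking above can be sketched in isolation; the chunk size here is an arbitrary stand-in for ``preferedchunksize``:

```python
def iter_chunks(data, chunksize=4):
    """Yield successive fixed-size chunks of a byte string (sketch)."""
    for start in range(0, len(data), chunksize):
        yield data[start:start + chunksize]

print(list(iter_chunks(b'abcdefghij')))  # [b'abcd', b'efgh', b'ij']
```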
flaginterrupt = -1

class interrupthandler(unpackermixin):
    """read one part and process it with restricted capability

    This allows transmitting exceptions raised on the producer side during
    part iteration while the consumer is reading a part.

    Parts processed in this manner only have access to a ui object."""

    def __init__(self, ui, fp):
        super(interrupthandler, self).__init__(fp)
        self.ui = ui

    def _readpartheader(self):
        """reads a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i\n' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def __call__(self):

        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' opening out of band context\n')
        indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
        headerblock = self._readpartheader()
        if headerblock is None:
            indebug(self.ui, 'no part found during interruption.')
            return
        part = unbundlepart(self.ui, headerblock, self._fp)
        op = interruptoperation(self.ui)
        _processpart(op, part)
        self.ui.debug('bundle2-input-stream-interrupt:'
                      ' closing out of band context\n')

class interruptoperation(object):
    """A limited operation to be used by part handlers during interruption

    It only has access to a ui object.
    """

    def __init__(self, ui):
        self.ui = ui
        self.reply = None
        self.captureoutput = False

    @property
    def repo(self):
        raise RuntimeError('no repo access from stream interruption')

    def gettransaction(self):
        raise TransactionUnavailable('no repo access from stream interruption')

class unbundlepart(unpackermixin):
    """a bundle part read from a bundle"""

    def __init__(self, ui, header, fp):
        super(unbundlepart, self).__init__(fp)
        self.ui = ui
        # unbundle state attr
        self._headerdata = header
        self._headeroffset = 0
        self._initialized = False
        self.consumed = False
        # part data
        self.id = None
        self.type = None
        self.mandatoryparams = None
        self.advisoryparams = None
        self.params = None
        self.mandatorykeys = ()
        self._payloadstream = None
        self._readheader()
        self._mandatory = None
        self._chunkindex = [] # (payload, file) position tuples for chunk starts
        self._pos = 0

    def _fromheader(self, size):
        """return the next <size> bytes from the header"""
        offset = self._headeroffset
        data = self._headerdata[offset:(offset + size)]
        self._headeroffset = offset + size
        return data

    def _unpackheader(self, format):
        """read given format from header

        This automatically computes the size of the format to read."""
        data = self._fromheader(struct.calcsize(format))
        return _unpack(format, data)

    def _initparams(self, mandatoryparams, advisoryparams):
        """internal function to set up all logic-related parameters"""
        # make it read only to prevent people touching it by mistake.
        self.mandatoryparams = tuple(mandatoryparams)
        self.advisoryparams = tuple(advisoryparams)
        # user friendly UI
        self.params = dict(self.mandatoryparams)
        self.params.update(dict(self.advisoryparams))
        self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)

    def _payloadchunks(self, chunknum=0):
        '''seek to specified chunk and start yielding data'''
        if len(self._chunkindex) == 0:
            assert chunknum == 0, 'Must start with chunk 0'
            self._chunkindex.append((0, super(unbundlepart, self).tell()))
        else:
            assert chunknum < len(self._chunkindex), \
                   'Unknown chunk %d' % chunknum
            super(unbundlepart, self).seek(self._chunkindex[chunknum][1])

        pos = self._chunkindex[chunknum][0]
        payloadsize = self._unpack(_fpayloadsize)[0]
        indebug(self.ui, 'payload chunk size: %i' % payloadsize)
        while payloadsize:
            if payloadsize == flaginterrupt:
                # interruption detection, the handler will now read a
                # single part and process it.
                interrupthandler(self.ui, self._fp)()
            elif payloadsize < 0:
                msg = 'negative payload chunk size: %i' % payloadsize
                raise error.BundleValueError(msg)
            else:
                result = self._readexact(payloadsize)
                chunknum += 1
                pos += payloadsize
                if chunknum == len(self._chunkindex):
                    self._chunkindex.append((pos,
                                             super(unbundlepart, self).tell()))
                yield result
            payloadsize = self._unpack(_fpayloadsize)[0]
            indebug(self.ui, 'payload chunk size: %i' % payloadsize)

    def _findchunk(self, pos):
        '''for a given payload position, return a chunk number and offset'''
        for chunk, (ppos, fpos) in enumerate(self._chunkindex):
            if ppos == pos:
                return chunk, 0
            elif ppos > pos:
                return chunk - 1, pos - self._chunkindex[chunk - 1][0]
        raise ValueError('Unknown chunk')
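
The index lookup can be exercised standalone; ``findchunk`` below is a hypothetical free-function copy of the method above, applied to a hand-built chunk index.

```python
def findchunk(chunkindex, pos):
    """Map a payload offset to (chunk number, offset within that chunk)."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos falls inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# payload offsets 0, 10, 30 mark chunk starts (file offsets are irrelevant here)
index = [(0, 100), (10, 114), (30, 138)]
print(findchunk(index, 10))  # (1, 0)
print(findchunk(index, 17))  # (1, 7)
```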

    def _readheader(self):
        """read the header and set up the object"""
        typesize = self._unpackheader(_fparttypesize)[0]
        self.type = self._fromheader(typesize)
        indebug(self.ui, 'part type: "%s"' % self.type)
        self.id = self._unpackheader(_fpartid)[0]
        indebug(self.ui, 'part id: "%s"' % self.id)
        # extract mandatory bit from type
        self.mandatory = (self.type != self.type.lower())
        self.type = self.type.lower()
        ## reading parameters
        # param count
        mancount, advcount = self._unpackheader(_fpartparamcount)
        indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
        # param size
        fparamsizes = _makefpartparamsizes(mancount + advcount)
        paramsizes = self._unpackheader(fparamsizes)
        # make it a list of couples again
        paramsizes = zip(paramsizes[::2], paramsizes[1::2])
        # split mandatory from advisory
        mansizes = paramsizes[:mancount]
        advsizes = paramsizes[mancount:]
        # retrieve param values
        manparams = []
        for key, value in mansizes:
            manparams.append((self._fromheader(key), self._fromheader(value)))
        advparams = []
        for key, value in advsizes:
            advparams.append((self._fromheader(key), self._fromheader(value)))
        self._initparams(manparams, advparams)
        ## part payload
        self._payloadstream = util.chunkbuffer(self._payloadchunks())
        # we read the data, tell it
        self._initialized = True

    def read(self, size=None):
        """read payload data"""
        if not self._initialized:
            self._readheader()
        if size is None:
            data = self._payloadstream.read()
        else:
            data = self._payloadstream.read(size)
        self._pos += len(data)
        if size is None or len(data) < size:
            if not self.consumed and self._pos:
                self.ui.debug('bundle2-input-part: total payload size %i\n'
                              % self._pos)
            self.consumed = True
        return data

    def tell(self):
        return self._pos

    def seek(self, offset, whence=0):
        if whence == 0:
            newpos = offset
        elif whence == 1:
            newpos = self._pos + offset
        elif whence == 2:
            if not self.consumed:
                self.read()
            newpos = self._chunkindex[-1][0] - offset
        else:
            raise ValueError('Unknown whence value: %r' % (whence,))

        if newpos > self._chunkindex[-1][0] and not self.consumed:
            self.read()
        if not 0 <= newpos <= self._chunkindex[-1][0]:
            raise ValueError('Offset out of range')

        if self._pos != newpos:
            chunk, internaloffset = self._findchunk(newpos)
            self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
            adjust = self.read(internaloffset)
            if len(adjust) != internaloffset:
                raise error.Abort(_('Seek failed\n'))
            self._pos = newpos
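
The whence handling mirrors the standard file protocol, except that at SEEK_END the offset is *subtracted* (as in the method above). A standalone sketch of the offset resolution, with ``end`` standing in for ``self._chunkindex[-1][0]`` (the total payload size); ``resolve_seek`` is a hypothetical helper, not part of this module:

```python
import os

def resolve_seek(pos, offset, whence, end):
    """Resolve a seek target (SEEK_SET/CUR/END), bounds-checked (sketch)."""
    if whence == os.SEEK_SET:
        newpos = offset
    elif whence == os.SEEK_CUR:
        newpos = pos + offset
    elif whence == os.SEEK_END:
        # note: offset is subtracted, matching unbundlepart.seek
        newpos = end - offset
    else:
        raise ValueError('Unknown whence value: %r' % (whence,))
    if not 0 <= newpos <= end:
        raise ValueError('Offset out of range')
    return newpos

print(resolve_seek(5, 3, os.SEEK_CUR, end=20))  # 8
print(resolve_seek(5, 4, os.SEEK_END, end=20))  # 16
```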

# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
capabilities = {'HG20': (),
                'error': ('abort', 'unsupportedcontent', 'pushraced',
                          'pushkey'),
                'listkeys': (),
                'pushkey': (),
                'digests': tuple(sorted(util.DIGESTS.keys())),
                'remote-changegroup': ('http', 'https'),
                'hgtagsfnodes': (),
               }

def getrepocaps(repo, allowpushback=False):
    """return the bundle2 capabilities for a given repo

    Exists to allow extensions (like evolution) to mutate the capabilities.
    """
    caps = capabilities.copy()
    caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
    if obsolete.isenabled(repo, obsolete.exchangeopt):
        supportedformat = tuple('V%i' % v for v in obsolete.formats)
        caps['obsmarkers'] = supportedformat
    if allowpushback:
        caps['pushback'] = ()
    return caps

def bundle2caps(remote):
    """return the bundle capabilities of a peer as a dict"""
    raw = remote.capable('bundle2')
    if not raw and raw != '':
        return {}
    capsblob = urllib.unquote(remote.capable('bundle2'))
    return decodecaps(capsblob)

def obsmarkersversion(caps):
    """extract the list of supported obsmarkers versions from a bundle2caps
    dict"""
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
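
The version extraction is easy to check in isolation; this is a standalone copy of the comprehension above, fed a hand-built caps dict:

```python
def obsmarkersversion(caps):
    """Extract supported obsmarkers versions from a bundle2 caps dict."""
    obscaps = caps.get('obsmarkers', ())
    # entries look like 'V0', 'V1', ...; keep the numeric part only
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

print(obsmarkersversion({'obsmarkers': ('V0', 'V1')}))  # [0, 1]
print(obsmarkersversion({}))                            # []
```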

@parthandler('changegroup', ('version', 'nbchanges'))
def handlechangegroup(op, inpart):
    """apply a changegroup part on the repo

    This is a very early implementation that will see massive rework before
    being inflicted on any end user.
    """
    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    unpackerversion = inpart.params.get('version', '01')
    # We should raise an appropriate exception here
    unpacker = changegroup.packermap[unpackerversion][1]
    cg = unpacker(inpart, None)
    # the source and url passed here are overwritten by the one contained in
    # the transaction.hookargs argument. So 'bundle2' is a placeholder
    nbchangesets = None
    if 'nbchanges' in inpart.params:
        nbchangesets = int(inpart.params.get('nbchanges'))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup', mandatory=False)
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    assert not inpart.read()

_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
                                 ['digest:%s' % k for k in util.DIGESTS.keys()])
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
    """apply a bundle10 on the repo, given an url and validation information

    All the information about the remote bundle to import is given as
    parameters. The parameters include:
      - url: the url to the bundle10.
      - size: the bundle10 file size. It is used to validate what was
        retrieved by the client matches the server knowledge about the bundle.
      - digests: a space separated list of the digest types provided as
        parameters.
      - digest:<digest-type>: the hexadecimal representation of the digest with
        that name. Like the size, it is used to validate what was retrieved by
        the client matches what the server knows about the bundle.

    When multiple digest types are given, all of them are checked.
    """
    try:
        raw_url = inpart.params['url']
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
    parsed_url = util.url(raw_url)
    if parsed_url.scheme not in capabilities['remote-changegroup']:
        raise error.Abort(_('remote-changegroup does not support %s urls') %
                          parsed_url.scheme)

    try:
        size = int(inpart.params['size'])
    except ValueError:
        raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
                          % 'size')
    except KeyError:
        raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')

    digests = {}
    for typ in inpart.params.get('digests', '').split():
        param = 'digest:%s' % typ
        try:
            value = inpart.params[param]
        except KeyError:
            raise error.Abort(_('remote-changegroup: missing "%s" param') %
                              param)
        digests[typ] = value

    real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)

    # Make sure we trigger a transaction creation
    #
    # The addchangegroup function will get a transaction object by itself, but
    # we need to make sure we trigger the creation of a transaction object used
    # for the whole processing scope.
    op.gettransaction()
    from . import exchange
    cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
    if not isinstance(cg, changegroup.cg1unpacker):
        raise error.Abort(_('%s: not a bundle version 1.0') %
                          util.hidepassword(raw_url))
    ret = cg.apply(op.repo, 'bundle2', 'bundle2')
    op.records.add('changegroup', {'return': ret})
    if op.reply is not None:
        # This is definitely not the final form of this
        # return. But one needs to start somewhere.
        part = op.reply.newpart('reply:changegroup')
        part.addparam('in-reply-to', str(inpart.id), mandatory=False)
        part.addparam('return', '%i' % ret, mandatory=False)
    try:
        real_part.validate()
    except error.Abort as e:
        raise error.Abort(_('bundle at %s is corrupted:\n%s') %
                          (util.hidepassword(raw_url), str(e)))
    assert not inpart.read()
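
The size/digest validation performed by ``util.digestchecker`` can be sketched standalone. ``validate_payload`` is a simplified hypothetical stand-in that checks a fully downloaded blob against an expected size and hexadecimal digests, as described in the docstring above:

```python
import hashlib

def validate_payload(data, expected_size, digests):
    """Check a blob against an expected size and hex digests (sketch)."""
    if len(data) != expected_size:
        raise ValueError('size mismatch: expected %d, read %d'
                         % (expected_size, len(data)))
    # when multiple digest types are given, all of them are checked
    for typ, expected in digests.items():
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))

payload = b'example-bundle10-bytes'
digests = {'sha1': hashlib.sha1(payload).hexdigest()}
validate_payload(payload, len(payload), digests)  # passes silently
```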

@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
    ret = int(inpart.params['return'])
    replyto = int(inpart.params['in-reply-to'])
    op.records.add('changegroup', {'return': ret}, replyto)

@parthandler('check:heads')
def handlecheckheads(op, inpart):
    """check that the heads of the repo did not change

    This is used to detect a push race when using unbundle.
    This replaces the "heads" argument of unbundle."""
    h = inpart.read(20)
    heads = []
    while len(h) == 20:
        heads.append(h)
        h = inpart.read(20)
    assert not h
    # Trigger a transaction so that we are guaranteed to have the lock now.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    if heads != op.repo.heads():
        raise error.PushRaced('repository changed while pushing - '
                              'please try again')

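Reading fixed 20-byte node records until the stream runs dry, as ``handlecheckheads`` does, can be sketched with any file-like object (``read_nodes`` is a hypothetical helper):

```python
import io

def read_nodes(fp):
    """Collect 20-byte node records until the stream is exhausted (sketch)."""
    nodes = []
    h = fp.read(20)
    while len(h) == 20:
        nodes.append(h)
        h = fp.read(20)
    # a well-formed payload is an exact multiple of 20 bytes
    assert not h, 'payload size must be a multiple of 20'
    return nodes

stream = io.BytesIO(b'a' * 20 + b'b' * 20)
print(len(read_nodes(stream)))  # 2
```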
@parthandler('output')
def handleoutput(op, inpart):
    """forward output captured on the server to the client"""
    for line in inpart.read().splitlines():
        op.ui.status(('remote: %s\n' % line))

@parthandler('replycaps')
def handlereplycaps(op, inpart):
    """Notify that a reply bundle should be created

    The payload contains the capabilities information for the reply"""
    caps = decodecaps(inpart.read())
    if op.reply is None:
        op.reply = bundle20(op.ui, caps)

@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
    """Used to transmit abort error over the wire"""
    raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))

@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
                               'in-reply-to'))
def handleerrorpushkey(op, inpart):
    """Used to transmit failure of a mandatory pushkey over the wire"""
    kwargs = {}
    for name in ('namespace', 'key', 'new', 'old', 'ret'):
        value = inpart.params.get(name)
        if value is not None:
            kwargs[name] = value
    raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)

@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
    """Used to transmit unknown content error over the wire"""
    kwargs = {}
    parttype = inpart.params.get('parttype')
    if parttype is not None:
        kwargs['parttype'] = parttype
    params = inpart.params.get('params')
    if params is not None:
        kwargs['params'] = params.split('\0')

    raise error.BundleUnknownFeatureError(**kwargs)

@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
    """Used to transmit push race error over the wire"""
    raise error.ResponseError(_('push failed:'), inpart.params['message'])

@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
    """retrieve pushkey namespace content stored in a bundle2"""
    namespace = inpart.params['namespace']
    r = pushkey.decodekeys(inpart.read())
    op.records.add('listkeys', (namespace, r))

@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
    """process a pushkey request"""
    dec = pushkey.decode
    namespace = dec(inpart.params['namespace'])
    key = dec(inpart.params['key'])
    old = dec(inpart.params['old'])
    new = dec(inpart.params['new'])
    # Grab the transaction to ensure that we have the lock before performing
    # the pushkey.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    ret = op.repo.pushkey(namespace, key, old, new)
    record = {'namespace': namespace,
              'key': key,
              'old': old,
              'new': new}
    op.records.add('pushkey', record)
    if op.reply is not None:
        rpart = op.reply.newpart('reply:pushkey')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('return', '%i' % ret, mandatory=False)
    if inpart.mandatory and not ret:
        kwargs = {}
        for key in ('namespace', 'key', 'new', 'old', 'ret'):
            if key in inpart.params:
                kwargs[key] = inpart.params[key]
        raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)

@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
    """retrieve the result of a pushkey request"""
    ret = int(inpart.params['return'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('pushkey', {'return': ret}, partid)

@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
    """add a stream of obsmarkers to the repo"""
    tr = op.gettransaction()
    markerdata = inpart.read()
    if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
        op.ui.write(('obsmarker-exchange: %i bytes received\n')
                    % len(markerdata))
    # The mergemarkers call will crash if marker creation is not enabled.
    # we want to avoid this if the part is advisory.
    if not inpart.mandatory and op.repo.obsstore.readonly:
        op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
        return
    new = op.repo.obsstore.mergemarkers(tr, markerdata)
    if new:
        op.repo.ui.status(_('%i new obsolescence markers\n') % new)
    op.records.add('obsmarkers', {'new': new})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:obsmarkers')
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
        rpart.addparam('new', '%i' % new, mandatory=False)


@parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
def handleobsmarkerreply(op, inpart):
    """retrieve the result of an obsmarkers request"""
    ret = int(inpart.params['new'])
    partid = int(inpart.params['in-reply-to'])
    op.records.add('obsmarkers', {'new': ret}, partid)

@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
    """Applies .hgtags fnodes cache entries to the local repo.

    Payload is pairs of 20 byte changeset nodes and filenodes.
    """
    # Grab the transaction so we ensure that we have the lock at this point.
    if op.ui.configbool('experimental', 'bundle2lazylocking'):
        op.gettransaction()
    cache = tags.hgtagsfnodescache(op.repo.unfiltered())

    count = 0
    while True:
        node = inpart.read(20)
        fnode = inpart.read(20)
        if len(node) < 20 or len(fnode) < 20:
            op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
            break
        cache.setfnode(node, fnode)
        count += 1

    cache.write()
    op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)