applybundle: set 'bundle2=1' env for all transaction...
Pierre-Yves David -
r26792:a84e0cac default
@@ -1,1531 +1,1532 @@
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37 
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is unable to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
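For illustration, the stream-level parameter block described above can be produced with a standalone Python 3 sketch (`encodestreamparams` is a hypothetical helper, not part of this module, which targets Python 2's `urllib`):

```python
import struct
import urllib.parse

def encodestreamparams(params):
    """Encode stream-level parameters as described above: a space
    separated list of urlquoted `<name>` or `<name>=<value>` entries,
    prefixed by the int32 byte count of the blob.
    (Hypothetical helper; the real encoder is bundle20._paramchunk.)"""
    blocks = []
    for name, value in params:
        if not name or not name[0].isalpha():
            raise ValueError('invalid parameter name: %r' % name)
        entry = urllib.parse.quote(name)
        if value is not None:
            entry += '=' + urllib.parse.quote(value)
        blocks.append(entry)
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

# 'Compression' is capitalized, so the receiver MUST understand it;
# 'advisory' starts lower case and may be safely ignored.
data = encodestreamparams([('Compression', 'BZ'), ('advisory', None)])
size = struct.unpack('>i', data[:4])[0]
```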
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
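The header layout above (type size, type name, part id, then the parameter block) can be sketched as a standalone Python 3 helper (`packpartheader` is hypothetical; the production encoder is `bundlepart.getchunks` later in this module):

```python
import struct

def packpartheader(parttype, partid, mandatory, advisory):
    """Pack a part header per the field list above: a one-byte type
    size, the type name, a 32-bit part id, then the parameter block
    <mandatory-count><advisory-count><param-sizes><param-data>.
    Mandatory parameters come first. (Sketch of the wire format only.)"""
    header = struct.pack('>B', len(parttype)) + parttype
    header += struct.pack('>I', partid)
    allparams = list(mandatory) + list(advisory)
    header += struct.pack('>BB', len(mandatory), len(advisory))
    sizes = []
    for key, value in allparams:
        sizes.extend((len(key), len(value)))
    header += struct.pack('>' + 'BB' * len(allparams), *sizes)
    for key, value in allparams:
        header += key + value
    return header

hdr = packpartheader(b'changegroup', 0, [(b'version', b'02')], [])
```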
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
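A minimal reader for this chunk framing might look like the following sketch (it rejects negative sizes, since the special-case processing they would trigger does not exist yet):

```python
import io
import struct

def iterchunks(fp):
    """Yield payload chunks framed as `<chunksize><chunkdata>`,
    stopping at the zero-size chunk that concludes the payload."""
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return
        if size < 0:
            raise ValueError('special chunk sizes not supported: %i' % size)
        yield fp.read(size)

# two data chunks followed by the zero-size terminator
payload = (struct.pack('>i', 5) + b'hello' +
           struct.pack('>i', 3) + b'abc' +
           struct.pack('>i', 0))
chunks = list(iterchunks(io.BytesIO(payload)))
```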
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
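For example, with two parameters the generated format reads two (key-size, value-size) byte couples (the helper is restated here so the sketch is self-contained):

```python
import struct

# same shape as the helper above: one (key-size, value-size)
# byte couple per parameter, all big-endian
def _makefpartparamsizes(nbparams):
    return '>' + ('BB' * nbparams)

fmt = _makefpartparamsizes(2)
# two parameters whose keys/values are 5, 3, 6 and 4 bytes long
sizes = struct.unpack(fmt, b'\x05\x03\x06\x04')
```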
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that registers a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during an unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`, where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def applybundle(repo, unbundler, tr, op=None):
306 306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 tr.hookargs['bundle2'] = '1'
307 308 return processbundle(repo, unbundler, lambda: tr, op=op)
308 309
309 310 def processbundle(repo, unbundler, transactiongetter=None, op=None):
310 311 """This function processes a bundle, applying its effects to/from a repo
311 312
312 313 It iterates over each part then searches for and uses the proper handling
313 314 code to process the part. Parts are processed in order.
314 315
315 316 This is a very early version of this function that will be strongly reworked
316 317 before final usage.
317 318
318 319 An unknown Mandatory part will abort the process.
319 320
320 321 It is temporarily possible to provide a prebuilt bundleoperation to the
321 322 function. This is used to ensure output is properly propagated in case of
322 323 an error during the unbundling. This output capturing part will likely be
323 324 reworked and this ability will probably go away in the process.
324 325 """
325 326 if op is None:
326 327 if transactiongetter is None:
327 328 transactiongetter = _notransaction
328 329 op = bundleoperation(repo, transactiongetter)
329 330 # todo:
330 331 # - replace this with an init function soon.
331 332 # - exception catching
332 333 unbundler.params # force the stream-level parameters to be read
333 334 if repo.ui.debugflag:
334 335 msg = ['bundle2-input-bundle:']
335 336 if unbundler.params:
336 337 msg.append(' %i params' % len(unbundler.params))
337 338 if op.gettransaction is None:
338 339 msg.append(' no-transaction')
339 340 else:
340 341 msg.append(' with-transaction')
341 342 msg.append('\n')
342 343 repo.ui.debug(''.join(msg))
343 344 iterparts = enumerate(unbundler.iterparts())
344 345 part = None
345 346 nbpart = 0
346 347 try:
347 348 for nbpart, part in iterparts:
348 349 _processpart(op, part)
349 350 except BaseException as exc:
350 351 for nbpart, part in iterparts:
351 352 # consume the bundle content
352 353 part.seek(0, 2)
353 354 # Small hack to let caller code distinguish exceptions from bundle2
354 355 # processing from processing the old format. This is mostly
355 356 # needed to handle different return codes to unbundle according to the
356 357 # type of bundle. We should probably clean up or drop this return code
357 358 # craziness in a future version.
358 359 exc.duringunbundle2 = True
359 360 salvaged = []
360 361 replycaps = None
361 362 if op.reply is not None:
362 363 salvaged = op.reply.salvageoutput()
363 364 replycaps = op.reply.capabilities
364 365 exc._replycaps = replycaps
365 366 exc._bundle2salvagedoutput = salvaged
366 367 raise
367 368 finally:
368 369 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
369 370
370 371 return op
371 372
372 373 def _processpart(op, part):
373 374 """process a single part from a bundle
374 375
375 376 The part is guaranteed to have been fully consumed when the function exits
376 377 (even if an exception is raised)."""
377 378 status = 'unknown' # used by debug output
378 379 try:
379 380 try:
380 381 handler = parthandlermapping.get(part.type)
381 382 if handler is None:
382 383 status = 'unsupported-type'
383 384 raise error.BundleUnknownFeatureError(parttype=part.type)
384 385 indebug(op.ui, 'found a handler for part %r' % part.type)
385 386 unknownparams = part.mandatorykeys - handler.params
386 387 if unknownparams:
387 388 unknownparams = list(unknownparams)
388 389 unknownparams.sort()
389 390 status = 'unsupported-params (%s)' % unknownparams
390 391 raise error.BundleUnknownFeatureError(parttype=part.type,
391 392 params=unknownparams)
392 393 status = 'supported'
393 394 except error.BundleUnknownFeatureError as exc:
394 395 if part.mandatory: # mandatory parts
395 396 raise
396 397 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
397 398 return # skip to part processing
398 399 finally:
399 400 if op.ui.debugflag:
400 401 msg = ['bundle2-input-part: "%s"' % part.type]
401 402 if not part.mandatory:
402 403 msg.append(' (advisory)')
403 404 nbmp = len(part.mandatorykeys)
404 405 nbap = len(part.params) - nbmp
405 406 if nbmp or nbap:
406 407 msg.append(' (params:')
407 408 if nbmp:
408 409 msg.append(' %i mandatory' % nbmp)
409 410 if nbap:
410 411 msg.append(' %i advisory' % nbap)
411 412 msg.append(')')
412 413 msg.append(' %s\n' % status)
413 414 op.ui.debug(''.join(msg))
414 415
415 416 # handler is called outside the above try block so that we don't
416 417 # risk catching KeyErrors from anything other than the
417 418 # parthandlermapping lookup (any KeyError raised by handler()
418 419 # itself represents a defect of a different variety).
419 420 output = None
420 421 if op.captureoutput and op.reply is not None:
421 422 op.ui.pushbuffer(error=True, subproc=True)
422 423 output = ''
423 424 try:
424 425 handler(op, part)
425 426 finally:
426 427 if output is not None:
427 428 output = op.ui.popbuffer()
428 429 if output:
429 430 outpart = op.reply.newpart('output', data=output,
430 431 mandatory=False)
431 432 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
432 433 finally:
433 434 # consume the part content to not corrupt the stream.
434 435 part.seek(0, 2)
435 436
436 437
437 438 def decodecaps(blob):
438 439 """decode a bundle2 caps bytes blob into a dictionary
439 440
440 441 The blob is a list of capabilities (one per line)
441 442 Capabilities may have values using a line of the form::
442 443
443 444 capability=value1,value2,value3
444 445
445 446 The values are always a list."""
446 447 caps = {}
447 448 for line in blob.splitlines():
448 449 if not line:
449 450 continue
450 451 if '=' not in line:
451 452 key, vals = line, ()
452 453 else:
453 454 key, vals = line.split('=', 1)
454 455 vals = vals.split(',')
455 456 key = urllib.unquote(key)
456 457 vals = [urllib.unquote(v) for v in vals]
457 458 caps[key] = vals
458 459 return caps
459 460
460 461 def encodecaps(caps):
461 462 """encode a bundle2 caps dictionary into a bytes blob"""
462 463 chunks = []
463 464 for ca in sorted(caps):
464 465 vals = caps[ca]
465 466 ca = urllib.quote(ca)
466 467 vals = [urllib.quote(v) for v in vals]
467 468 if vals:
468 469 ca = "%s=%s" % (ca, ','.join(vals))
469 470 chunks.append(ca)
470 471 return '\n'.join(chunks)
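The pair of functions above round-trips. A Python 3 rendering behaves like this (the module code above uses Python 2's `urllib.quote`/`unquote`; valueless capabilities are normalized to empty lists here so the round trip compares equal):

```python
import urllib.parse

def encodecaps(caps):
    # mirror of the encoder above, ported to Python 3's urllib.parse
    chunks = []
    for ca in sorted(caps):
        vals = [urllib.parse.quote(v) for v in caps[ca]]
        ca = urllib.parse.quote(ca)
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # mirror of the decoder above; values always come back as a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.parse.unquote(key)
        caps[key] = [urllib.parse.unquote(v) for v in vals]
    return caps

caps = {'HG20': [], 'changegroup': ['01', '02']}
blob = encodecaps(caps)
```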
471 472
472 473 class bundle20(object):
473 474 """represent an outgoing bundle2 container
474 475
475 476 Use the `addparam` method to add a stream level parameter, and `newpart` to
476 477 populate it. Then call `getchunks` to retrieve all the binary chunks of
477 478 data that compose the bundle2 container."""
478 479
479 480 _magicstring = 'HG20'
480 481
481 482 def __init__(self, ui, capabilities=()):
482 483 self.ui = ui
483 484 self._params = []
484 485 self._parts = []
485 486 self.capabilities = dict(capabilities)
486 487 self._compressor = util.compressors[None]()
487 488
488 489 def setcompression(self, alg):
489 490 """setup core part compression to <alg>"""
490 491 if alg is None:
491 492 return
492 493 assert not any(n.lower() == 'compression' for n, v in self._params)
493 494 self.addparam('Compression', alg)
494 495 self._compressor = util.compressors[alg]()
495 496
496 497 @property
497 498 def nbparts(self):
498 499 """total number of parts added to the bundler"""
499 500 return len(self._parts)
500 501
501 502 # methods used to define the bundle2 content
502 503 def addparam(self, name, value=None):
503 504 """add a stream level parameter"""
504 505 if not name:
505 506 raise ValueError('empty parameter name')
506 507 if name[0] not in string.letters:
507 508 raise ValueError('non letter first character: %r' % name)
508 509 self._params.append((name, value))
509 510
510 511 def addpart(self, part):
511 512 """add a new part to the bundle2 container
512 513
513 514 Parts contain the actual application payload.
514 515 assert part.id is None
515 516 part.id = len(self._parts) # very cheap counter
516 517 self._parts.append(part)
517 518
518 519 def newpart(self, typeid, *args, **kwargs):
519 520 """create a new part and add it to the container
520 521 
521 522 The part is directly added to the container. For now, this means
522 523 that any failure to properly initialize the part after calling
523 524 ``newpart`` results in a failure of the whole bundling process.
524 525 
525 526 You can still fall back to manually creating and adding one if you need
526 527 better control."""
527 528 part = bundlepart(typeid, *args, **kwargs)
528 529 self.addpart(part)
529 530 return part
530 531
531 532 # methods used to generate the bundle2 stream
532 533 def getchunks(self):
533 534 if self.ui.debugflag:
534 535 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
535 536 if self._params:
536 537 msg.append(' (%i params)' % len(self._params))
537 538 msg.append(' %i parts total\n' % len(self._parts))
538 539 self.ui.debug(''.join(msg))
539 540 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
540 541 yield self._magicstring
541 542 param = self._paramchunk()
542 543 outdebug(self.ui, 'bundle parameter: %s' % param)
543 544 yield _pack(_fstreamparamsize, len(param))
544 545 if param:
545 546 yield param
546 547 # starting compression
547 548 for chunk in self._getcorechunk():
548 549 yield self._compressor.compress(chunk)
549 550 yield self._compressor.flush()
550 551
551 552 def _paramchunk(self):
552 553 """return an encoded version of all stream parameters"""
553 554 blocks = []
554 555 for par, value in self._params:
555 556 par = urllib.quote(par)
556 557 if value is not None:
557 558 value = urllib.quote(value)
558 559 par = '%s=%s' % (par, value)
559 560 blocks.append(par)
560 561 return ' '.join(blocks)
561 562
562 563 def _getcorechunk(self):
563 564 """yield chunk for the core part of the bundle
564 565
565 566 (all but headers and parameters)"""
566 567 outdebug(self.ui, 'start of parts')
567 568 for part in self._parts:
568 569 outdebug(self.ui, 'bundle part: "%s"' % part.type)
569 570 for chunk in part.getchunks(ui=self.ui):
570 571 yield chunk
571 572 outdebug(self.ui, 'end of bundle')
572 573 yield _pack(_fpartheadersize, 0)
573 574
574 575
575 576 def salvageoutput(self):
576 577 """return a list with a copy of all output parts in the bundle
577 578
578 579 This is meant to be used during error handling to make sure we preserve
579 580 server output"""
580 581 salvaged = []
581 582 for part in self._parts:
582 583 if part.type.startswith('output'):
583 584 salvaged.append(part.copy())
584 585 return salvaged
585 586
586 587
587 588 class unpackermixin(object):
588 589 """A mixin to extract bytes and struct data from a stream"""
589 590
590 591 def __init__(self, fp):
591 592 self._fp = fp
592 593 self._seekable = (util.safehasattr(fp, 'seek') and
593 594 util.safehasattr(fp, 'tell'))
594 595
595 596 def _unpack(self, format):
596 597 """unpack this struct format from the stream"""
597 598 data = self._readexact(struct.calcsize(format))
598 599 return _unpack(format, data)
599 600
600 601 def _readexact(self, size):
601 602 """read exactly <size> bytes from the stream"""
602 603 return changegroup.readexactly(self._fp, size)
603 604
604 605 def seek(self, offset, whence=0):
605 606 """move the underlying file pointer"""
606 607 if self._seekable:
607 608 return self._fp.seek(offset, whence)
608 609 else:
609 610 raise NotImplementedError(_('File pointer is not seekable'))
610 611
611 612 def tell(self):
612 613 """return the file offset, or None if file is not seekable"""
613 614 if self._seekable:
614 615 try:
615 616 return self._fp.tell()
616 617 except IOError as e:
617 618 if e.errno == errno.ESPIPE:
618 619 self._seekable = False
619 620 else:
620 621 raise
621 622 return None
622 623
623 624 def close(self):
624 625 """close underlying file"""
625 626 if util.safehasattr(self._fp, 'close'):
626 627 return self._fp.close()
627 628
628 629 def getunbundler(ui, fp, magicstring=None):
629 630 """return a valid unbundler object for a given magicstring"""
630 631 if magicstring is None:
631 632 magicstring = changegroup.readexactly(fp, 4)
632 633 magic, version = magicstring[0:2], magicstring[2:4]
633 634 if magic != 'HG':
634 635 raise error.Abort(_('not a Mercurial bundle'))
635 636 unbundlerclass = formatmap.get(version)
636 637 if unbundlerclass is None:
637 638 raise error.Abort(_('unknown bundle version %s') % version)
638 639 unbundler = unbundlerclass(ui, fp)
639 640 indebug(ui, 'start processing of %s stream' % magicstring)
640 641 return unbundler
641 642
642 643 class unbundle20(unpackermixin):
643 644 """interpret a bundle2 stream
644 645
645 646 This class is fed with a binary stream and yields parts through its
646 647 `iterparts` method."""
647 648
648 649 _magicstring = 'HG20'
649 650
650 651 def __init__(self, ui, fp):
651 652 """If header is specified, we do not read it out of the stream."""
652 653 self.ui = ui
653 654 self._decompressor = util.decompressors[None]
654 655 super(unbundle20, self).__init__(fp)
655 656
656 657 @util.propertycache
657 658 def params(self):
658 659 """dictionary of stream level parameters"""
659 660 indebug(self.ui, 'reading bundle2 stream parameters')
660 661 params = {}
661 662 paramssize = self._unpack(_fstreamparamsize)[0]
662 663 if paramssize < 0:
663 664 raise error.BundleValueError('negative bundle param size: %i'
664 665 % paramssize)
665 666 if paramssize:
666 667 params = self._readexact(paramssize)
667 668 params = self._processallparams(params)
668 669 return params
669 670
670 671 def _processallparams(self, paramsblock):
671 672 """split a parameter block into a dict and apply each parameter"""
672 673 params = {}
673 674 for p in paramsblock.split(' '):
674 675 p = p.split('=', 1)
675 676 p = [urllib.unquote(i) for i in p]
676 677 if len(p) < 2:
677 678 p.append(None)
678 679 self._processparam(*p)
679 680 params[p[0]] = p[1]
680 681 return params
681 682
682 683
683 684 def _processparam(self, name, value):
684 685 """process a parameter, applying its effect if needed
685 686
686 687 Parameters starting with a lower case letter are advisory and will be
687 688 ignored when unknown. Those starting with an upper case letter are
688 689 mandatory, and this function will raise a KeyError when they are unknown.
689 690 
690 691 Note: no options are currently supported. Any input will either be
691 692 ignored or fail.
692 693 """
693 694 if not name:
694 695 raise ValueError('empty parameter name')
695 696 if name[0] not in string.letters:
696 697 raise ValueError('non letter first character: %r' % name)
697 698 try:
698 699 handler = b2streamparamsmap[name.lower()]
699 700 except KeyError:
700 701 if name[0].islower():
701 702 indebug(self.ui, "ignoring unknown parameter %r" % name)
702 703 else:
703 704 raise error.BundleUnknownFeatureError(params=(name,))
704 705 else:
705 706 handler(self, name, value)
706 707
707 708 def _forwardchunks(self):
708 709 """utility to transfer a bundle2 as binary
709 710
710 711 This is made necessary by the fact that the 'getbundle' command over 'ssh'
711 712 has no way to know when the reply ends, relying on the bundle being
712 713 interpreted to find its end. This is terrible and we are sorry, but we
713 714 needed to move forward to get general delta enabled.
714 715 """
715 716 yield self._magicstring
716 717 assert 'params' not in vars(self)
717 718 paramssize = self._unpack(_fstreamparamsize)[0]
718 719 if paramssize < 0:
719 720 raise error.BundleValueError('negative bundle param size: %i'
720 721 % paramssize)
721 722 yield _pack(_fstreamparamsize, paramssize)
722 723 if paramssize:
723 724 params = self._readexact(paramssize)
724 725 self._processallparams(params)
725 726 yield params
726 727 assert self._decompressor is util.decompressors[None]
727 728 # From there, payload might need to be decompressed
728 729 self._fp = self._decompressor(self._fp)
729 730 emptycount = 0
730 731 while emptycount < 2:
731 732 # so we can brainlessly loop
732 733 assert _fpartheadersize == _fpayloadsize
733 734 size = self._unpack(_fpartheadersize)[0]
734 735 yield _pack(_fpartheadersize, size)
735 736 if size:
736 737 emptycount = 0
737 738 else:
738 739 emptycount += 1
739 740 continue
740 741 if size == flaginterrupt:
741 742 continue
742 743 elif size < 0:
743 744 raise error.BundleValueError('negative chunk size: %i' % size)
744 745 yield self._readexact(size)
745 746
746 747
747 748 def iterparts(self):
748 749 """yield all parts contained in the stream"""
749 750 # make sure params have been loaded
750 751 self.params
751 752 # From there, the payload needs to be decompressed
752 753 self._fp = self._decompressor(self._fp)
753 754 indebug(self.ui, 'start extraction of bundle2 parts')
754 755 headerblock = self._readpartheader()
755 756 while headerblock is not None:
756 757 part = unbundlepart(self.ui, headerblock, self._fp)
757 758 yield part
758 759 part.seek(0, 2)
759 760 headerblock = self._readpartheader()
760 761 indebug(self.ui, 'end of bundle2 stream')
761 762
762 763 def _readpartheader(self):
763 764 """read a part header size and return the bytes blob
764 765
765 766 returns None if empty"""
766 767 headersize = self._unpack(_fpartheadersize)[0]
767 768 if headersize < 0:
768 769 raise error.BundleValueError('negative part header size: %i'
769 770 % headersize)
770 771 indebug(self.ui, 'part header size: %i' % headersize)
771 772 if headersize:
772 773 return self._readexact(headersize)
773 774 return None
774 775
775 776 def compressed(self):
776 777 return False
777 778
778 779 formatmap = {'20': unbundle20}
779 780
780 781 b2streamparamsmap = {}
781 782
782 783 def b2streamparamhandler(name):
783 784 """register a handler for a stream level parameter"""
784 785 def decorator(func):
785 786 assert name not in b2streamparamsmap
786 787 b2streamparamsmap[name] = func
787 788 return func
788 789 return decorator
789 790
790 791 @b2streamparamhandler('compression')
791 792 def processcompression(unbundler, param, value):
792 793 """read compression parameter and install payload decompression"""
793 794 if value not in util.decompressors:
794 795 raise error.BundleUnknownFeatureError(params=(param,),
795 796 values=(value,))
796 797 unbundler._decompressor = util.decompressors[value]
797 798
798 799 class bundlepart(object):
799 800 """A bundle2 part contains application level payload
800 801
801 802 The part `type` is used to route the part to the application level
802 803 handler.
803 804
804 805 The part payload is contained in ``part.data``. It could be raw bytes or a
805 806 generator of byte chunks.
806 807
807 808 You can add parameters to the part using the ``addparam`` method.
808 809 Parameters can be either mandatory (default) or advisory. Remote side
809 810 should be able to safely ignore the advisory ones.
810 811
811 812 Neither data nor parameters can be modified after generation has begun.
812 813 """
813 814
814 815 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
815 816 data='', mandatory=True):
816 817 validateparttype(parttype)
817 818 self.id = None
818 819 self.type = parttype
819 820 self._data = data
820 821 self._mandatoryparams = list(mandatoryparams)
821 822 self._advisoryparams = list(advisoryparams)
822 823 # checking for duplicated entries
823 824 self._seenparams = set()
824 825 for pname, __ in self._mandatoryparams + self._advisoryparams:
825 826 if pname in self._seenparams:
826 827 raise RuntimeError('duplicated params: %s' % pname)
827 828 self._seenparams.add(pname)
828 829 # status of the part's generation:
829 830 # - None: not started,
830 831 # - False: currently generated,
831 832 # - True: generation done.
832 833 self._generated = None
833 834 self.mandatory = mandatory
834 835
835 836 def copy(self):
836 837 """return a copy of the part
837 838
838 839 The new part has the very same content but no partid assigned yet.
839 840 Parts with generated data cannot be copied."""
840 841 assert not util.safehasattr(self.data, 'next')
841 842 return self.__class__(self.type, self._mandatoryparams,
842 843 self._advisoryparams, self._data, self.mandatory)
843 844
844 845 # methods used to define the part content
845 846 def __setdata(self, data):
846 847 if self._generated is not None:
847 848 raise error.ReadOnlyPartError('part is being generated')
848 849 self._data = data
849 850 def __getdata(self):
850 851 return self._data
851 852 data = property(__getdata, __setdata)
852 853
853 854 @property
854 855 def mandatoryparams(self):
855 856 # make it an immutable tuple to force people through ``addparam``
856 857 return tuple(self._mandatoryparams)
857 858
858 859 @property
859 860 def advisoryparams(self):
860 861 # make it an immutable tuple to force people through ``addparam``
861 862 return tuple(self._advisoryparams)
862 863
863 864 def addparam(self, name, value='', mandatory=True):
864 865 if self._generated is not None:
865 866 raise error.ReadOnlyPartError('part is being generated')
866 867 if name in self._seenparams:
867 868 raise ValueError('duplicated params: %s' % name)
868 869 self._seenparams.add(name)
869 870 params = self._advisoryparams
870 871 if mandatory:
871 872 params = self._mandatoryparams
872 873 params.append((name, value))
873 874
874 875 # methods used to generate the bundle2 stream
875 876 def getchunks(self, ui):
876 877 if self._generated is not None:
877 878 raise RuntimeError('part can only be consumed once')
878 879 self._generated = False
879 880
880 881 if ui.debugflag:
881 882 msg = ['bundle2-output-part: "%s"' % self.type]
882 883 if not self.mandatory:
883 884 msg.append(' (advisory)')
884 885 nbmp = len(self.mandatoryparams)
885 886 nbap = len(self.advisoryparams)
886 887 if nbmp or nbap:
887 888 msg.append(' (params:')
888 889 if nbmp:
889 890 msg.append(' %i mandatory' % nbmp)
890 891 if nbap:
891 892                 msg.append(' %i advisory' % nbap)
892 893 msg.append(')')
893 894 if not self.data:
894 895 msg.append(' empty payload')
895 896 elif util.safehasattr(self.data, 'next'):
896 897 msg.append(' streamed payload')
897 898 else:
898 899 msg.append(' %i bytes payload' % len(self.data))
899 900 msg.append('\n')
900 901 ui.debug(''.join(msg))
901 902
902 903 #### header
903 904 if self.mandatory:
904 905 parttype = self.type.upper()
905 906 else:
906 907 parttype = self.type.lower()
907 908 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
908 909 ## parttype
909 910 header = [_pack(_fparttypesize, len(parttype)),
910 911 parttype, _pack(_fpartid, self.id),
911 912 ]
912 913 ## parameters
913 914 # count
914 915 manpar = self.mandatoryparams
915 916 advpar = self.advisoryparams
916 917 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
917 918 # size
918 919 parsizes = []
919 920 for key, value in manpar:
920 921 parsizes.append(len(key))
921 922 parsizes.append(len(value))
922 923 for key, value in advpar:
923 924 parsizes.append(len(key))
924 925 parsizes.append(len(value))
925 926 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
926 927 header.append(paramsizes)
927 928 # key, value
928 929 for key, value in manpar:
929 930 header.append(key)
930 931 header.append(value)
931 932 for key, value in advpar:
932 933 header.append(key)
933 934 header.append(value)
934 935 ## finalize header
935 936 headerchunk = ''.join(header)
936 937 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
937 938 yield _pack(_fpartheadersize, len(headerchunk))
938 939 yield headerchunk
939 940 ## payload
940 941 try:
941 942 for chunk in self._payloadchunks():
942 943 outdebug(ui, 'payload chunk size: %i' % len(chunk))
943 944 yield _pack(_fpayloadsize, len(chunk))
944 945 yield chunk
945 946 except GeneratorExit:
946 947 # GeneratorExit means that nobody is listening for our
947 948 # results anyway, so just bail quickly rather than trying
948 949 # to produce an error part.
949 950 ui.debug('bundle2-generatorexit\n')
950 951 raise
951 952 except BaseException as exc:
952 953 # backup exception data for later
953 954 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
954 955 % exc)
955 956 exc_info = sys.exc_info()
956 957 msg = 'unexpected error: %s' % exc
957 958 interpart = bundlepart('error:abort', [('message', msg)],
958 959 mandatory=False)
959 960 interpart.id = 0
960 961 yield _pack(_fpayloadsize, -1)
961 962 for chunk in interpart.getchunks(ui=ui):
962 963 yield chunk
963 964 outdebug(ui, 'closing payload chunk')
964 965 # abort current part payload
965 966 yield _pack(_fpayloadsize, 0)
966 967 raise exc_info[0], exc_info[1], exc_info[2]
967 968 # end of payload
968 969 outdebug(ui, 'closing payload chunk')
969 970 yield _pack(_fpayloadsize, 0)
970 971 self._generated = True
971 972
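The header framing built by `getchunks` above (length-prefixed part type, part id, then parameters) can be sketched standalone. The real format strings (`_fparttypesize`, `_fpartid`, `_fpartheadersize`) are module-level constants outside this excerpt; the `'>B'` and `'>i'` formats below are assumptions for illustration only.

```python
import struct

def pack_part_header(parttype, partid, mandatory=True):
    # Mandatory parts advertise themselves by upper-casing the type,
    # mirroring the parttype.upper()/lower() branch in getchunks().
    name = parttype.upper() if mandatory else parttype.lower()
    # type size (assumed '>B'), type name, part id (assumed '>i')
    header = struct.pack('>B', len(name)) + name.encode('ascii')
    header += struct.pack('>i', partid)
    # the whole header chunk is itself length-prefixed on the wire
    return struct.pack('>i', len(header)) + header

frame = pack_part_header('changegroup', 0)
```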
972 973 def _payloadchunks(self):
973 974         """yield chunks of the part payload
974 975
975 976 Exists to handle the different methods to provide data to a part."""
976 977 # we only support fixed size data now.
977 978 # This will be improved in the future.
978 979 if util.safehasattr(self.data, 'next'):
979 980 buff = util.chunkbuffer(self.data)
980 981 chunk = buff.read(preferedchunksize)
981 982 while chunk:
982 983 yield chunk
983 984 chunk = buff.read(preferedchunksize)
984 985 elif len(self.data):
985 986 yield self.data
986 987
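`_payloadchunks` above drains a data iterator through `util.chunkbuffer` to re-emit it in fixed-size chunks. A minimal sketch of that re-chunking, using `io.BytesIO` in place of Mercurial's lazy chunkbuffer (so this version buffers eagerly):

```python
import io

def rechunk(pieces, chunksize):
    # join-then-slice stand-in for util.chunkbuffer; the real helper
    # streams lazily instead of materializing the whole payload
    buff = io.BytesIO(b''.join(pieces))
    chunk = buff.read(chunksize)
    while chunk:
        yield chunk
        chunk = buff.read(chunksize)
```

For instance, `list(rechunk([b'abc', b'defg', b'h'], 4))` re-slices the three input pieces into two 4-byte chunks.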
987 988
988 989 flaginterrupt = -1
989 990
990 991 class interrupthandler(unpackermixin):
991 992 """read one part and process it with restricted capability
992 993
993 994     This allows transmitting exceptions raised on the producer side during part
994 995     iteration while the consumer is reading a part.
995 996
996 997     Parts processed in this manner only have access to a ui object."""
997 998
998 999 def __init__(self, ui, fp):
999 1000 super(interrupthandler, self).__init__(fp)
1000 1001 self.ui = ui
1001 1002
1002 1003 def _readpartheader(self):
1003 1004         """read a part header size and return the bytes blob
1004 1005
1005 1006 returns None if empty"""
1006 1007 headersize = self._unpack(_fpartheadersize)[0]
1007 1008 if headersize < 0:
1008 1009 raise error.BundleValueError('negative part header size: %i'
1009 1010 % headersize)
1010 1011 indebug(self.ui, 'part header size: %i\n' % headersize)
1011 1012 if headersize:
1012 1013 return self._readexact(headersize)
1013 1014 return None
1014 1015
1015 1016 def __call__(self):
1016 1017
1017 1018 self.ui.debug('bundle2-input-stream-interrupt:'
1018 1019 ' opening out of band context\n')
1019 1020 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1020 1021 headerblock = self._readpartheader()
1021 1022 if headerblock is None:
1022 1023 indebug(self.ui, 'no part found during interruption.')
1023 1024 return
1024 1025 part = unbundlepart(self.ui, headerblock, self._fp)
1025 1026 op = interruptoperation(self.ui)
1026 1027 _processpart(op, part)
1027 1028 self.ui.debug('bundle2-input-stream-interrupt:'
1028 1029 ' closing out of band context\n')
1029 1030
1030 1031 class interruptoperation(object):
1031 1032     """A limited operation to be used by part handlers during interruption
1032 1033
1033 1034     It only has access to a ui object.
1034 1035 """
1035 1036
1036 1037 def __init__(self, ui):
1037 1038 self.ui = ui
1038 1039 self.reply = None
1039 1040 self.captureoutput = False
1040 1041
1041 1042 @property
1042 1043 def repo(self):
1043 1044 raise RuntimeError('no repo access from stream interruption')
1044 1045
1045 1046 def gettransaction(self):
1046 1047 raise TransactionUnavailable('no repo access from stream interruption')
1047 1048
1048 1049 class unbundlepart(unpackermixin):
1049 1050 """a bundle part read from a bundle"""
1050 1051
1051 1052 def __init__(self, ui, header, fp):
1052 1053 super(unbundlepart, self).__init__(fp)
1053 1054 self.ui = ui
1054 1055 # unbundle state attr
1055 1056 self._headerdata = header
1056 1057 self._headeroffset = 0
1057 1058 self._initialized = False
1058 1059 self.consumed = False
1059 1060 # part data
1060 1061 self.id = None
1061 1062 self.type = None
1062 1063 self.mandatoryparams = None
1063 1064 self.advisoryparams = None
1064 1065 self.params = None
1065 1066 self.mandatorykeys = ()
1066 1067 self._payloadstream = None
1067 1068 self._readheader()
1068 1069 self._mandatory = None
1069 1070 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1070 1071 self._pos = 0
1071 1072
1072 1073 def _fromheader(self, size):
1073 1074 """return the next <size> byte from the header"""
1074 1075 offset = self._headeroffset
1075 1076 data = self._headerdata[offset:(offset + size)]
1076 1077 self._headeroffset = offset + size
1077 1078 return data
1078 1079
1079 1080 def _unpackheader(self, format):
1080 1081 """read given format from header
1081 1082
1082 1083         This automatically computes the size of the format to read."""
1083 1084 data = self._fromheader(struct.calcsize(format))
1084 1085 return _unpack(format, data)
1085 1086
1086 1087 def _initparams(self, mandatoryparams, advisoryparams):
1087 1088         """internal function to set up all logic-related parameters"""
1088 1089 # make it read only to prevent people touching it by mistake.
1089 1090 self.mandatoryparams = tuple(mandatoryparams)
1090 1091 self.advisoryparams = tuple(advisoryparams)
1091 1092 # user friendly UI
1092 1093 self.params = dict(self.mandatoryparams)
1093 1094 self.params.update(dict(self.advisoryparams))
1094 1095 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1095 1096
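`_initparams` merges both parameter lists into one user-facing dict (advisory values applied after mandatory ones) while remembering which keys were mandatory. The same logic in isolation:

```python
def initparams(mandatoryparams, advisoryparams):
    # same merge order as _initparams: advisory entries are applied
    # after (and can shadow) mandatory ones in the convenience dict
    params = dict(mandatoryparams)
    params.update(dict(advisoryparams))
    mandatorykeys = frozenset(p[0] for p in mandatoryparams)
    return params, mandatorykeys
```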
1096 1097 def _payloadchunks(self, chunknum=0):
1097 1098 '''seek to specified chunk and start yielding data'''
1098 1099 if len(self._chunkindex) == 0:
1099 1100 assert chunknum == 0, 'Must start with chunk 0'
1100 1101 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1101 1102 else:
1102 1103 assert chunknum < len(self._chunkindex), \
1103 1104 'Unknown chunk %d' % chunknum
1104 1105 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1105 1106
1106 1107 pos = self._chunkindex[chunknum][0]
1107 1108 payloadsize = self._unpack(_fpayloadsize)[0]
1108 1109 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1109 1110 while payloadsize:
1110 1111 if payloadsize == flaginterrupt:
1111 1112 # interruption detection, the handler will now read a
1112 1113 # single part and process it.
1113 1114 interrupthandler(self.ui, self._fp)()
1114 1115 elif payloadsize < 0:
1115 1116 msg = 'negative payload chunk size: %i' % payloadsize
1116 1117 raise error.BundleValueError(msg)
1117 1118 else:
1118 1119 result = self._readexact(payloadsize)
1119 1120 chunknum += 1
1120 1121 pos += payloadsize
1121 1122 if chunknum == len(self._chunkindex):
1122 1123 self._chunkindex.append((pos,
1123 1124 super(unbundlepart, self).tell()))
1124 1125 yield result
1125 1126 payloadsize = self._unpack(_fpayloadsize)[0]
1126 1127 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1127 1128
1128 1129 def _findchunk(self, pos):
1129 1130 '''for a given payload position, return a chunk number and offset'''
1130 1131 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1131 1132 if ppos == pos:
1132 1133 return chunk, 0
1133 1134 elif ppos > pos:
1134 1135 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1135 1136 raise ValueError('Unknown chunk')
1136 1137
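`_findchunk` walks the `(payload position, file position)` index to translate an absolute payload offset into a chunk number plus an offset inside that chunk. The lookup, exercised on a toy index:

```python
def findchunk(chunkindex, pos):
    # chunkindex holds (payload, file) position pairs for chunk
    # starts, exactly like unbundlepart._chunkindex
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```

With chunks starting at payload offsets 0, 10 and 25, offset 12 resolves to chunk 1 at internal offset 2; positions past the last known chunk start raise, which is why `seek` first calls `read()` to extend the index.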
1137 1138 def _readheader(self):
1138 1139 """read the header and setup the object"""
1139 1140 typesize = self._unpackheader(_fparttypesize)[0]
1140 1141 self.type = self._fromheader(typesize)
1141 1142 indebug(self.ui, 'part type: "%s"' % self.type)
1142 1143 self.id = self._unpackheader(_fpartid)[0]
1143 1144 indebug(self.ui, 'part id: "%s"' % self.id)
1144 1145 # extract mandatory bit from type
1145 1146 self.mandatory = (self.type != self.type.lower())
1146 1147 self.type = self.type.lower()
1147 1148 ## reading parameters
1148 1149 # param count
1149 1150 mancount, advcount = self._unpackheader(_fpartparamcount)
1150 1151 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1151 1152 # param size
1152 1153 fparamsizes = _makefpartparamsizes(mancount + advcount)
1153 1154 paramsizes = self._unpackheader(fparamsizes)
1154 1155         # make it a list of pairs again
1155 1156 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1156 1157 # split mandatory from advisory
1157 1158 mansizes = paramsizes[:mancount]
1158 1159 advsizes = paramsizes[mancount:]
1159 1160 # retrieve param value
1160 1161 manparams = []
1161 1162 for key, value in mansizes:
1162 1163 manparams.append((self._fromheader(key), self._fromheader(value)))
1163 1164 advparams = []
1164 1165 for key, value in advsizes:
1165 1166 advparams.append((self._fromheader(key), self._fromheader(value)))
1166 1167 self._initparams(manparams, advparams)
1167 1168 ## part payload
1168 1169 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1169 1170 # we read the data, tell it
1170 1171 self._initialized = True
1171 1172
1172 1173 def read(self, size=None):
1173 1174 """read payload data"""
1174 1175 if not self._initialized:
1175 1176 self._readheader()
1176 1177 if size is None:
1177 1178 data = self._payloadstream.read()
1178 1179 else:
1179 1180 data = self._payloadstream.read(size)
1180 1181 self._pos += len(data)
1181 1182 if size is None or len(data) < size:
1182 1183 if not self.consumed and self._pos:
1183 1184 self.ui.debug('bundle2-input-part: total payload size %i\n'
1184 1185 % self._pos)
1185 1186 self.consumed = True
1186 1187 return data
1187 1188
1188 1189 def tell(self):
1189 1190 return self._pos
1190 1191
1191 1192 def seek(self, offset, whence=0):
1192 1193 if whence == 0:
1193 1194 newpos = offset
1194 1195 elif whence == 1:
1195 1196 newpos = self._pos + offset
1196 1197 elif whence == 2:
1197 1198 if not self.consumed:
1198 1199 self.read()
1199 1200 newpos = self._chunkindex[-1][0] - offset
1200 1201 else:
1201 1202 raise ValueError('Unknown whence value: %r' % (whence,))
1202 1203
1203 1204 if newpos > self._chunkindex[-1][0] and not self.consumed:
1204 1205 self.read()
1205 1206 if not 0 <= newpos <= self._chunkindex[-1][0]:
1206 1207 raise ValueError('Offset out of range')
1207 1208
1208 1209 if self._pos != newpos:
1209 1210 chunk, internaloffset = self._findchunk(newpos)
1210 1211 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1211 1212 adjust = self.read(internaloffset)
1212 1213 if len(adjust) != internaloffset:
1213 1214 raise error.Abort(_('Seek failed\n'))
1214 1215 self._pos = newpos
1215 1216
1216 1217 # These are only the static capabilities.
1217 1218 # Check the 'getrepocaps' function for the rest.
1218 1219 capabilities = {'HG20': (),
1219 1220 'error': ('abort', 'unsupportedcontent', 'pushraced',
1220 1221 'pushkey'),
1221 1222 'listkeys': (),
1222 1223 'pushkey': (),
1223 1224 'digests': tuple(sorted(util.DIGESTS.keys())),
1224 1225 'remote-changegroup': ('http', 'https'),
1225 1226 'hgtagsfnodes': (),
1226 1227 }
1227 1228
1228 1229 def getrepocaps(repo, allowpushback=False):
1229 1230 """return the bundle2 capabilities for a given repo
1230 1231
1231 1232 Exists to allow extensions (like evolution) to mutate the capabilities.
1232 1233 """
1233 1234 caps = capabilities.copy()
1234 1235 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1235 1236 if obsolete.isenabled(repo, obsolete.exchangeopt):
1236 1237 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1237 1238 caps['obsmarkers'] = supportedformat
1238 1239 if allowpushback:
1239 1240 caps['pushback'] = ()
1240 1241 return caps
1241 1242
1242 1243 def bundle2caps(remote):
1243 1244 """return the bundle capabilities of a peer as dict"""
1244 1245 raw = remote.capable('bundle2')
1245 1246 if not raw and raw != '':
1246 1247 return {}
1247 1248 capsblob = urllib.unquote(remote.capable('bundle2'))
1248 1249 return decodecaps(capsblob)
1249 1250
1250 1251 def obsmarkersversion(caps):
1251 1252 """extract the list of supported obsmarkers versions from a bundle2caps dict
1252 1253 """
1253 1254 obscaps = caps.get('obsmarkers', ())
1254 1255 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1255 1256
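The `'V%i'`-style capability strings produced by `getrepocaps` and consumed by `obsmarkersversion` round-trip like this:

```python
def obsmarkers_versions(caps):
    # mirror of obsmarkersversion(): 'V1' -> 1, non-'V' entries ignored
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]
```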
1256 1257 @parthandler('changegroup', ('version', 'nbchanges'))
1257 1258 def handlechangegroup(op, inpart):
1258 1259 """apply a changegroup part on the repo
1259 1260
1260 1261     This is a very early implementation that will be massively reworked before
1261 1262     being inflicted on any end-user.
1262 1263 """
1263 1264 # Make sure we trigger a transaction creation
1264 1265 #
1265 1266 # The addchangegroup function will get a transaction object by itself, but
1266 1267 # we need to make sure we trigger the creation of a transaction object used
1267 1268 # for the whole processing scope.
1268 1269 op.gettransaction()
1269 1270 unpackerversion = inpart.params.get('version', '01')
1270 1271 # We should raise an appropriate exception here
1271 1272 unpacker = changegroup.packermap[unpackerversion][1]
1272 1273 cg = unpacker(inpart, None)
1273 1274 # the source and url passed here are overwritten by the one contained in
1274 1275 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1275 1276 nbchangesets = None
1276 1277 if 'nbchanges' in inpart.params:
1277 1278 nbchangesets = int(inpart.params.get('nbchanges'))
1278 1279 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1279 1280 op.records.add('changegroup', {'return': ret})
1280 1281 if op.reply is not None:
1281 1282 # This is definitely not the final form of this
1282 1283         # return. But one needs to start somewhere.
1283 1284 part = op.reply.newpart('reply:changegroup', mandatory=False)
1284 1285 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1285 1286 part.addparam('return', '%i' % ret, mandatory=False)
1286 1287 assert not inpart.read()
1287 1288
1288 1289 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1289 1290 ['digest:%s' % k for k in util.DIGESTS.keys()])
1290 1291 @parthandler('remote-changegroup', _remotechangegroupparams)
1291 1292 def handleremotechangegroup(op, inpart):
1292 1293     """apply a bundle10 on the repo, given a url and validation information
1293 1294
1294 1295     All the information about the remote bundle to import is given as
1295 1296 parameters. The parameters include:
1296 1297 - url: the url to the bundle10.
1297 1298 - size: the bundle10 file size. It is used to validate what was
1298 1299 retrieved by the client matches the server knowledge about the bundle.
1299 1300 - digests: a space separated list of the digest types provided as
1300 1301 parameters.
1301 1302 - digest:<digest-type>: the hexadecimal representation of the digest with
1302 1303 that name. Like the size, it is used to validate what was retrieved by
1303 1304 the client matches what the server knows about the bundle.
1304 1305
1305 1306 When multiple digest types are given, all of them are checked.
1306 1307 """
1307 1308 try:
1308 1309 raw_url = inpart.params['url']
1309 1310 except KeyError:
1310 1311 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1311 1312 parsed_url = util.url(raw_url)
1312 1313 if parsed_url.scheme not in capabilities['remote-changegroup']:
1313 1314 raise error.Abort(_('remote-changegroup does not support %s urls') %
1314 1315 parsed_url.scheme)
1315 1316
1316 1317 try:
1317 1318 size = int(inpart.params['size'])
1318 1319 except ValueError:
1319 1320 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1320 1321 % 'size')
1321 1322 except KeyError:
1322 1323 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1323 1324
1324 1325 digests = {}
1325 1326 for typ in inpart.params.get('digests', '').split():
1326 1327 param = 'digest:%s' % typ
1327 1328 try:
1328 1329 value = inpart.params[param]
1329 1330 except KeyError:
1330 1331 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1331 1332 param)
1332 1333 digests[typ] = value
1333 1334
1334 1335 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1335 1336
1336 1337 # Make sure we trigger a transaction creation
1337 1338 #
1338 1339 # The addchangegroup function will get a transaction object by itself, but
1339 1340 # we need to make sure we trigger the creation of a transaction object used
1340 1341 # for the whole processing scope.
1341 1342 op.gettransaction()
1342 1343 from . import exchange
1343 1344 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1344 1345 if not isinstance(cg, changegroup.cg1unpacker):
1345 1346 raise error.Abort(_('%s: not a bundle version 1.0') %
1346 1347 util.hidepassword(raw_url))
1347 1348 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1348 1349 op.records.add('changegroup', {'return': ret})
1349 1350 if op.reply is not None:
1350 1351 # This is definitely not the final form of this
1351 1352         # return. But one needs to start somewhere.
1352 1353 part = op.reply.newpart('reply:changegroup')
1353 1354 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1354 1355 part.addparam('return', '%i' % ret, mandatory=False)
1355 1356 try:
1356 1357 real_part.validate()
1357 1358 except error.Abort as e:
1358 1359 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1359 1360 (util.hidepassword(raw_url), str(e)))
1360 1361 assert not inpart.read()
1361 1362
1362 1363 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1363 1364 def handlereplychangegroup(op, inpart):
1364 1365 ret = int(inpart.params['return'])
1365 1366 replyto = int(inpart.params['in-reply-to'])
1366 1367 op.records.add('changegroup', {'return': ret}, replyto)
1367 1368
1368 1369 @parthandler('check:heads')
1369 1370 def handlecheckheads(op, inpart):
1370 1371     """check that the heads of the repo did not change
1371 1372
1372 1373 This is used to detect a push race when using unbundle.
1373 1374 This replaces the "heads" argument of unbundle."""
1374 1375 h = inpart.read(20)
1375 1376 heads = []
1376 1377 while len(h) == 20:
1377 1378 heads.append(h)
1378 1379 h = inpart.read(20)
1379 1380 assert not h
1380 1381 # Trigger a transaction so that we are guaranteed to have the lock now.
1381 1382 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1382 1383 op.gettransaction()
1383 1384 if heads != op.repo.heads():
1384 1385 raise error.PushRaced('repository changed while pushing - '
1385 1386 'please try again')
1386 1387
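`handlecheckheads` reads the part payload as back-to-back 20-byte node records with no count prefix. The framing, demonstrated against an in-memory stream:

```python
import io

def read_heads(stream):
    # consume fixed 20-byte records until exhaustion, as the
    # inpart.read(20) loop in handlecheckheads does
    heads = []
    h = stream.read(20)
    while len(h) == 20:
        heads.append(h)
        h = stream.read(20)
    assert not h  # a trailing partial record would mean corruption
    return heads
```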
1387 1388 @parthandler('output')
1388 1389 def handleoutput(op, inpart):
1389 1390 """forward output captured on the server to the client"""
1390 1391 for line in inpart.read().splitlines():
1391 1392 op.ui.status(('remote: %s\n' % line))
1392 1393
1393 1394 @parthandler('replycaps')
1394 1395 def handlereplycaps(op, inpart):
1395 1396 """Notify that a reply bundle should be created
1396 1397
1397 1398 The payload contains the capabilities information for the reply"""
1398 1399 caps = decodecaps(inpart.read())
1399 1400 if op.reply is None:
1400 1401 op.reply = bundle20(op.ui, caps)
1401 1402
1402 1403 @parthandler('error:abort', ('message', 'hint'))
1403 1404 def handleerrorabort(op, inpart):
1404 1405 """Used to transmit abort error over the wire"""
1405 1406 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1406 1407
1407 1408 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1408 1409 'in-reply-to'))
1409 1410 def handleerrorpushkey(op, inpart):
1410 1411 """Used to transmit failure of a mandatory pushkey over the wire"""
1411 1412 kwargs = {}
1412 1413 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1413 1414 value = inpart.params.get(name)
1414 1415 if value is not None:
1415 1416 kwargs[name] = value
1416 1417 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1417 1418
1418 1419 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1419 1420 def handleerrorunsupportedcontent(op, inpart):
1420 1421 """Used to transmit unknown content error over the wire"""
1421 1422 kwargs = {}
1422 1423 parttype = inpart.params.get('parttype')
1423 1424 if parttype is not None:
1424 1425 kwargs['parttype'] = parttype
1425 1426 params = inpart.params.get('params')
1426 1427 if params is not None:
1427 1428 kwargs['params'] = params.split('\0')
1428 1429
1429 1430 raise error.BundleUnknownFeatureError(**kwargs)
1430 1431
1431 1432 @parthandler('error:pushraced', ('message',))
1432 1433 def handleerrorpushraced(op, inpart):
1433 1434 """Used to transmit push race error over the wire"""
1434 1435 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1435 1436
1436 1437 @parthandler('listkeys', ('namespace',))
1437 1438 def handlelistkeys(op, inpart):
1438 1439 """retrieve pushkey namespace content stored in a bundle2"""
1439 1440 namespace = inpart.params['namespace']
1440 1441 r = pushkey.decodekeys(inpart.read())
1441 1442 op.records.add('listkeys', (namespace, r))
1442 1443
1443 1444 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1444 1445 def handlepushkey(op, inpart):
1445 1446 """process a pushkey request"""
1446 1447 dec = pushkey.decode
1447 1448 namespace = dec(inpart.params['namespace'])
1448 1449 key = dec(inpart.params['key'])
1449 1450 old = dec(inpart.params['old'])
1450 1451 new = dec(inpart.params['new'])
1451 1452 # Grab the transaction to ensure that we have the lock before performing the
1452 1453 # pushkey.
1453 1454 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1454 1455 op.gettransaction()
1455 1456 ret = op.repo.pushkey(namespace, key, old, new)
1456 1457 record = {'namespace': namespace,
1457 1458 'key': key,
1458 1459 'old': old,
1459 1460 'new': new}
1460 1461 op.records.add('pushkey', record)
1461 1462 if op.reply is not None:
1462 1463 rpart = op.reply.newpart('reply:pushkey')
1463 1464 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1464 1465 rpart.addparam('return', '%i' % ret, mandatory=False)
1465 1466 if inpart.mandatory and not ret:
1466 1467 kwargs = {}
1467 1468 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1468 1469 if key in inpart.params:
1469 1470 kwargs[key] = inpart.params[key]
1470 1471 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1471 1472
1472 1473 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1473 1474 def handlepushkeyreply(op, inpart):
1474 1475 """retrieve the result of a pushkey request"""
1475 1476 ret = int(inpart.params['return'])
1476 1477 partid = int(inpart.params['in-reply-to'])
1477 1478 op.records.add('pushkey', {'return': ret}, partid)
1478 1479
1479 1480 @parthandler('obsmarkers')
1480 1481 def handleobsmarker(op, inpart):
1481 1482 """add a stream of obsmarkers to the repo"""
1482 1483 tr = op.gettransaction()
1483 1484 markerdata = inpart.read()
1484 1485 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1485 1486 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1486 1487 % len(markerdata))
1487 1488 # The mergemarkers call will crash if marker creation is not enabled.
1488 1489 # we want to avoid this if the part is advisory.
1489 1490 if not inpart.mandatory and op.repo.obsstore.readonly:
1490 1491         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1491 1492 return
1492 1493 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1493 1494 if new:
1494 1495 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1495 1496 op.records.add('obsmarkers', {'new': new})
1496 1497 if op.reply is not None:
1497 1498 rpart = op.reply.newpart('reply:obsmarkers')
1498 1499 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1499 1500 rpart.addparam('new', '%i' % new, mandatory=False)
1500 1501
1501 1502
1502 1503 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1503 1504 def handleobsmarkerreply(op, inpart):
1504 1505     """retrieve the result of an obsmarkers request"""
1505 1506 ret = int(inpart.params['new'])
1506 1507 partid = int(inpart.params['in-reply-to'])
1507 1508 op.records.add('obsmarkers', {'new': ret}, partid)
1508 1509
1509 1510 @parthandler('hgtagsfnodes')
1510 1511 def handlehgtagsfnodes(op, inpart):
1511 1512 """Applies .hgtags fnodes cache entries to the local repo.
1512 1513
1513 1514 Payload is pairs of 20 byte changeset nodes and filenodes.
1514 1515 """
1515 1516 # Grab the transaction so we ensure that we have the lock at this point.
1516 1517 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1517 1518 op.gettransaction()
1518 1519 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1519 1520
1520 1521 count = 0
1521 1522 while True:
1522 1523 node = inpart.read(20)
1523 1524 fnode = inpart.read(20)
1524 1525 if len(node) < 20 or len(fnode) < 20:
1525 1526 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1526 1527 break
1527 1528 cache.setfnode(node, fnode)
1528 1529 count += 1
1529 1530
1530 1531 cache.write()
1531 1532 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)