bundle2: only seek to beginning of part in bundlerepo...
Gregory Szorc
r35112:2b72bc88 default
@@ -1,2001 +1,1999
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is upper case, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, and `chunkdata` consists of that many plain bytes.
123 123 The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. Currently, a
129 129 size of -1 signals an interruption part (see `flaginterrupt` below).
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are
135 135 registered for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
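To make the container layout described above concrete, here is a minimal, hypothetical sketch (not part of this module) that walks an uncompressed 'HG20' stream and yields each raw part header blob. It assumes `fh` is a binary file object positioned at the start of a bundle; negative chunk sizes (the interrupt mechanism) are deliberately left unhandled.

    import struct

    def walkbundle2(fh):
        assert fh.read(4) == 'HG20'                  # magic string
        paramssize = struct.unpack('>i', fh.read(4))[0]
        fh.read(paramssize)                          # space separated, urlquoted params
        while True:
            headersize = struct.unpack('>i', fh.read(4))[0]
            if headersize == 0:                      # end of stream marker
                break
            yield fh.read(headersize)                # raw part header blob
            while True:                              # framed payload chunks
                chunksize = struct.unpack('>i', fh.read(4))[0]
                if chunksize == 0:                   # zero size chunk ends the payload
                    break
                if chunksize < 0:                    # interrupt marker; out of scope here
                    raise NotImplementedError('special chunk size')
                fh.read(chunksize)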
148 148 from __future__ import absolute_import, division
149 149
150 150 import errno
151 151 import os
152 152 import re
153 153 import string
154 154 import struct
155 155 import sys
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 node as nodemod,
162 162 obsolete,
163 163 phases,
164 164 pushkey,
165 165 pycompat,
166 166 tags,
167 167 url,
168 168 util,
169 169 )
170 170
171 171 urlerr = util.urlerr
172 172 urlreq = util.urlreq
173 173
174 174 _pack = struct.pack
175 175 _unpack = struct.unpack
176 176
177 177 _fstreamparamsize = '>i'
178 178 _fpartheadersize = '>i'
179 179 _fparttypesize = '>B'
180 180 _fpartid = '>I'
181 181 _fpayloadsize = '>i'
182 182 _fpartparamcount = '>BB'
183 183
184 184 preferedchunksize = 4096
185 185
186 186 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
187 187
188 188 def outdebug(ui, message):
189 189 """debug regarding output stream (bundling)"""
190 190 if ui.configbool('devel', 'bundle2.debug'):
191 191 ui.debug('bundle2-output: %s\n' % message)
192 192
193 193 def indebug(ui, message):
194 194 """debug on input stream (unbundling)"""
195 195 if ui.configbool('devel', 'bundle2.debug'):
196 196 ui.debug('bundle2-input: %s\n' % message)
197 197
198 198 def validateparttype(parttype):
199 199 """raise ValueError if a parttype contains invalid character"""
200 200 if _parttypeforbidden.search(parttype):
201 201 raise ValueError(parttype)
202 202
203 203 def _makefpartparamsizes(nbparams):
204 204 """return a struct format to read part parameter sizes
205 205
206 206 The number of parameters is variable so we need to build that format
207 207 dynamically.
208 208 """
209 209 return '>'+('BB'*nbparams)
210 210
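For instance (illustrative only), two parameters produce a format that reads two (<size-of-key>, <size-of-value>) byte pairs:

    # _makefpartparamsizes(2) == '>BBBB'
    assert _makefpartparamsizes(2) == '>BBBB'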
211 211 parthandlermapping = {}
212 212
213 213 def parthandler(parttype, params=()):
214 214 """decorator that registers a function as a bundle2 part handler
215 215
216 216 eg::
217 217
218 218 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
219 219 def myparttypehandler(...):
220 220 '''process a part of type "my part".'''
221 221 ...
222 222 """
223 223 validateparttype(parttype)
224 224 def _decorator(func):
225 225 lparttype = parttype.lower() # enforce lower case matching.
226 226 assert lparttype not in parthandlermapping
227 227 parthandlermapping[lparttype] = func
228 228 func.params = frozenset(params)
229 229 return func
230 230 return _decorator
231 231
232 232 class unbundlerecords(object):
233 233 """keep record of what happens during an unbundle
234 234
235 235 New records are added using `records.add('cat', obj)`. Where 'cat' is a
236 236 category of record and obj is an arbitrary object.
237 237
238 238 `records['cat']` will return all entries of this category 'cat'.
239 239
240 240 Iterating on the object itself will yield `('category', obj)` tuples
241 241 for all entries.
242 242
243 243 All iterations happen in chronological order.
244 244 """
245 245
246 246 def __init__(self):
247 247 self._categories = {}
248 248 self._sequences = []
249 249 self._replies = {}
250 250
251 251 def add(self, category, entry, inreplyto=None):
252 252 """add a new record of a given category.
253 253
254 254 The entry can then be retrieved in the list returned by
255 255 self['category']."""
256 256 self._categories.setdefault(category, []).append(entry)
257 257 self._sequences.append((category, entry))
258 258 if inreplyto is not None:
259 259 self.getreplies(inreplyto).add(category, entry)
260 260
261 261 def getreplies(self, partid):
262 262 """get the records that are replies to a specific part"""
263 263 return self._replies.setdefault(partid, unbundlerecords())
264 264
265 265 def __getitem__(self, cat):
266 266 return tuple(self._categories.get(cat, ()))
267 267
268 268 def __iter__(self):
269 269 return iter(self._sequences)
270 270
271 271 def __len__(self):
272 272 return len(self._sequences)
273 273
274 274 def __nonzero__(self):
275 275 return bool(self._sequences)
276 276
277 277 __bool__ = __nonzero__
278 278
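A short, hypothetical usage sketch of the records object described above (category names and values are illustrative):

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('output', 'remote: hello\n', inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    for category, entry in records:   # chronological order
        pass
    replies = records.getreplies(0)   # records replying to part id 0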
279 279 class bundleoperation(object):
280 280 """an object that represents a single bundling process
281 281
282 282 Its purpose is to carry unbundle-related objects and states.
283 283
284 284 A new object should be created at the beginning of each bundle processing.
285 285 The object is to be returned by the processing function.
286 286
287 287 The object has very little content now; it will ultimately contain:
288 288 * an access to the repo the bundle is applied to,
289 289 * a ui object,
290 290 * a way to retrieve a transaction to add changes to the repo,
291 291 * a way to record the result of processing each part,
292 292 * a way to construct a bundle response when applicable.
293 293 """
294 294
295 295 def __init__(self, repo, transactiongetter, captureoutput=True):
296 296 self.repo = repo
297 297 self.ui = repo.ui
298 298 self.records = unbundlerecords()
299 299 self.reply = None
300 300 self.captureoutput = captureoutput
301 301 self.hookargs = {}
302 302 self._gettransaction = transactiongetter
303 303
304 304 def gettransaction(self):
305 305 transaction = self._gettransaction()
306 306
307 307 if self.hookargs:
308 308 # the ones added to the transaction supersede those added
309 309 # to the operation.
310 310 self.hookargs.update(transaction.hookargs)
311 311 transaction.hookargs = self.hookargs
312 312
313 313 # mark the hookargs as flushed. further attempts to add to
314 314 # hookargs will result in an abort.
315 315 self.hookargs = None
316 316
317 317 return transaction
318 318
319 319 def addhookargs(self, hookargs):
320 320 if self.hookargs is None:
321 321 raise error.ProgrammingError('attempted to add hookargs to '
322 322 'operation after transaction started')
323 323 self.hookargs.update(hookargs)
324 324
325 325 class TransactionUnavailable(RuntimeError):
326 326 pass
327 327
328 328 def _notransaction():
329 329 """default method to get a transaction while processing a bundle
330 330
331 331 Raise an exception to highlight the fact that no transaction was expected
332 332 to be created"""
333 333 raise TransactionUnavailable()
334 334
335 335 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
336 336 # transform me into unbundler.apply() as soon as the freeze is lifted
337 337 if isinstance(unbundler, unbundle20):
338 338 tr.hookargs['bundle2'] = '1'
339 339 if source is not None and 'source' not in tr.hookargs:
340 340 tr.hookargs['source'] = source
341 341 if url is not None and 'url' not in tr.hookargs:
342 342 tr.hookargs['url'] = url
343 343 return processbundle(repo, unbundler, lambda: tr)
344 344 else:
345 345 # the transactiongetter won't be used, but we might as well set it
346 346 op = bundleoperation(repo, lambda: tr)
347 347 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
348 348 return op
349 349
350 350 class partiterator(object):
351 351 def __init__(self, repo, op, unbundler):
352 352 self.repo = repo
353 353 self.op = op
354 354 self.unbundler = unbundler
355 355 self.iterator = None
356 356 self.count = 0
357 357 self.current = None
358 358
359 359 def __enter__(self):
360 360 def func():
361 361 itr = enumerate(self.unbundler.iterparts())
362 362 for count, p in itr:
363 363 self.count = count
364 364 self.current = p
365 365 yield p
366 366 p.consume()
367 367 self.current = None
368 368 self.iterator = func()
369 369 return self.iterator
370 370
371 371 def __exit__(self, type, exc, tb):
372 372 if not self.iterator:
373 373 return
374 374
375 375 # Only gracefully abort in a normal exception situation. User aborts
376 376 # like Ctrl+C throw a KeyboardInterrupt, which does not derive from
377 377 # Exception, and should not be gracefully cleaned up.
378 378 if isinstance(exc, Exception):
379 379 # Any exceptions seeking to the end of the bundle at this point are
380 380 # almost certainly related to the underlying stream being bad.
381 381 # And, chances are that the exception we're handling is related to
382 382 # getting in that bad state. So, we swallow the seeking error and
383 383 # re-raise the original error.
384 384 seekerror = False
385 385 try:
386 386 if self.current:
387 387 # consume the part content to not corrupt the stream.
388 388 self.current.consume()
389 389
390 390 for part in self.iterator:
391 391 # consume the bundle content
392 392 part.consume()
393 393 except Exception:
394 394 seekerror = True
395 395
396 396 # Small hack to let caller code distinguish exceptions from bundle2
397 397 # processing from processing the old format. This is mostly needed
398 398 # to handle different return codes to unbundle according to the type
399 399 # of bundle. We should probably clean up or drop this return code
400 400 # craziness in a future version.
401 401 exc.duringunbundle2 = True
402 402 salvaged = []
403 403 replycaps = None
404 404 if self.op.reply is not None:
405 405 salvaged = self.op.reply.salvageoutput()
406 406 replycaps = self.op.reply.capabilities
407 407 exc._replycaps = replycaps
408 408 exc._bundle2salvagedoutput = salvaged
409 409
410 410 # Re-raising from a variable loses the original stack. So only use
411 411 # that form if we need to.
412 412 if seekerror:
413 413 raise exc
414 414
415 415 self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
416 416 self.count)
417 417
418 418 def processbundle(repo, unbundler, transactiongetter=None, op=None):
419 419 """This function processes a bundle, applying effects to/from a repo
420 420
421 421 It iterates over each part then searches for and uses the proper handling
422 422 code to process the part. Parts are processed in order.
423 423
424 424 An unknown mandatory part will abort the process.
425 425
426 426 It is temporarily possible to provide a prebuilt bundleoperation to the
427 427 function. This is used to ensure output is properly propagated in case of
428 428 an error during the unbundling. This output capturing part will likely be
429 429 reworked and this ability will probably go away in the process.
430 430 """
431 431 if op is None:
432 432 if transactiongetter is None:
433 433 transactiongetter = _notransaction
434 434 op = bundleoperation(repo, transactiongetter)
435 435 # todo:
436 436 # - replace this with an init function soon.
437 437 # - exception catching
438 438 unbundler.params
439 439 if repo.ui.debugflag:
440 440 msg = ['bundle2-input-bundle:']
441 441 if unbundler.params:
442 442 msg.append(' %i params' % len(unbundler.params))
443 443 if op._gettransaction is None or op._gettransaction is _notransaction:
444 444 msg.append(' no-transaction')
445 445 else:
446 446 msg.append(' with-transaction')
447 447 msg.append('\n')
448 448 repo.ui.debug(''.join(msg))
449 449
450 450 processparts(repo, op, unbundler)
451 451
452 452 return op
453 453
454 454 def processparts(repo, op, unbundler):
455 455 with partiterator(repo, op, unbundler) as parts:
456 456 for part in parts:
457 457 _processpart(op, part)
458 458
459 459 def _processchangegroup(op, cg, tr, source, url, **kwargs):
460 460 ret = cg.apply(op.repo, tr, source, url, **kwargs)
461 461 op.records.add('changegroup', {
462 462 'return': ret,
463 463 })
464 464 return ret
465 465
466 466 def _gethandler(op, part):
467 467 status = 'unknown' # used by debug output
468 468 try:
469 469 handler = parthandlermapping.get(part.type)
470 470 if handler is None:
471 471 status = 'unsupported-type'
472 472 raise error.BundleUnknownFeatureError(parttype=part.type)
473 473 indebug(op.ui, 'found a handler for part %s' % part.type)
474 474 unknownparams = part.mandatorykeys - handler.params
475 475 if unknownparams:
476 476 unknownparams = list(unknownparams)
477 477 unknownparams.sort()
478 478 status = 'unsupported-params (%s)' % ', '.join(unknownparams)
479 479 raise error.BundleUnknownFeatureError(parttype=part.type,
480 480 params=unknownparams)
481 481 status = 'supported'
482 482 except error.BundleUnknownFeatureError as exc:
483 483 if part.mandatory: # mandatory parts
484 484 raise
485 485 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
486 486 return # skip to part processing
487 487 finally:
488 488 if op.ui.debugflag:
489 489 msg = ['bundle2-input-part: "%s"' % part.type]
490 490 if not part.mandatory:
491 491 msg.append(' (advisory)')
492 492 nbmp = len(part.mandatorykeys)
493 493 nbap = len(part.params) - nbmp
494 494 if nbmp or nbap:
495 495 msg.append(' (params:')
496 496 if nbmp:
497 497 msg.append(' %i mandatory' % nbmp)
498 498 if nbap:
499 499 msg.append(' %i advisory' % nbap)
500 500 msg.append(')')
501 501 msg.append(' %s\n' % status)
502 502 op.ui.debug(''.join(msg))
503 503
504 504 return handler
505 505
506 506 def _processpart(op, part):
507 507 """process a single part from a bundle
508 508
509 509 The part is guaranteed to have been fully consumed when the function exits
510 510 (even if an exception is raised)."""
511 511 handler = _gethandler(op, part)
512 512 if handler is None:
513 513 return
514 514
515 515 # handler is called outside the above try block so that we don't
516 516 # risk catching KeyErrors from anything other than the
517 517 # parthandlermapping lookup (any KeyError raised by handler()
518 518 # itself represents a defect of a different variety).
519 519 output = None
520 520 if op.captureoutput and op.reply is not None:
521 521 op.ui.pushbuffer(error=True, subproc=True)
522 522 output = ''
523 523 try:
524 524 handler(op, part)
525 525 finally:
526 526 if output is not None:
527 527 output = op.ui.popbuffer()
528 528 if output:
529 529 outpart = op.reply.newpart('output', data=output,
530 530 mandatory=False)
531 531 outpart.addparam(
532 532 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
533 533
534 534 def decodecaps(blob):
535 535 """decode a bundle2 caps bytes blob into a dictionary
536 536
537 537 The blob is a list of capabilities (one per line)
538 538 Capabilities may have values using a line of the form::
539 539
540 540 capability=value1,value2,value3
541 541
542 542 The values are always a list."""
543 543 caps = {}
544 544 for line in blob.splitlines():
545 545 if not line:
546 546 continue
547 547 if '=' not in line:
548 548 key, vals = line, ()
549 549 else:
550 550 key, vals = line.split('=', 1)
551 551 vals = vals.split(',')
552 552 key = urlreq.unquote(key)
553 553 vals = [urlreq.unquote(v) for v in vals]
554 554 caps[key] = vals
555 555 return caps
556 556
557 557 def encodecaps(caps):
558 558 """encode a bundle2 caps dictionary into a bytes blob"""
559 559 chunks = []
560 560 for ca in sorted(caps):
561 561 vals = caps[ca]
562 562 ca = urlreq.quote(ca)
563 563 vals = [urlreq.quote(v) for v in vals]
564 564 if vals:
565 565 ca = "%s=%s" % (ca, ','.join(vals))
566 566 chunks.append(ca)
567 567 return '\n'.join(chunks)
568 568
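A hedged round trip through the two helpers above (the capability names and values here are illustrative):

    blob = encodecaps({'HG20': [], 'error': ['abort', 'pushkey']})
    # blob == 'HG20\nerror=abort,pushkey'
    assert decodecaps(blob) == {'HG20': [], 'error': ['abort', 'pushkey']}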
569 569 bundletypes = {
570 570 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
571 571 # since the unification ssh accepts a header but there
572 572 # is no capability signaling it.
573 573 "HG20": (), # special-cased below
574 574 "HG10UN": ("HG10UN", 'UN'),
575 575 "HG10BZ": ("HG10", 'BZ'),
576 576 "HG10GZ": ("HG10GZ", 'GZ'),
577 577 }
578 578
579 579 # hgweb uses this list to communicate its preferred type
580 580 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
581 581
582 582 class bundle20(object):
583 583 """represent an outgoing bundle2 container
584 584
585 585 Use the `addparam` method to add stream level parameters and `newpart` to
586 586 populate it. Then call `getchunks` to retrieve all the binary chunks of
587 587 data that compose the bundle2 container."""
588 588
589 589 _magicstring = 'HG20'
590 590
591 591 def __init__(self, ui, capabilities=()):
592 592 self.ui = ui
593 593 self._params = []
594 594 self._parts = []
595 595 self.capabilities = dict(capabilities)
596 596 self._compengine = util.compengines.forbundletype('UN')
597 597 self._compopts = None
598 598
599 599 def setcompression(self, alg, compopts=None):
600 600 """set up core part compression to <alg>"""
601 601 if alg in (None, 'UN'):
602 602 return
603 603 assert not any(n.lower() == 'compression' for n, v in self._params)
604 604 self.addparam('Compression', alg)
605 605 self._compengine = util.compengines.forbundletype(alg)
606 606 self._compopts = compopts
607 607
608 608 @property
609 609 def nbparts(self):
610 610 """total number of parts added to the bundler"""
611 611 return len(self._parts)
612 612
613 613 # methods used to define the bundle2 content
614 614 def addparam(self, name, value=None):
615 615 """add a stream level parameter"""
616 616 if not name:
617 617 raise ValueError(r'empty parameter name')
618 618 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
619 619 raise ValueError(r'non letter first character: %s' % name)
620 620 self._params.append((name, value))
621 621
622 622 def addpart(self, part):
623 623 """add a new part to the bundle2 container
624 624
625 625 Parts contain the actual application payload.
626 626 assert part.id is None
627 627 part.id = len(self._parts) # very cheap counter
628 628 self._parts.append(part)
629 629
630 630 def newpart(self, typeid, *args, **kwargs):
631 631 """create a new part and add it to the container
632 632
633 633 The part is directly added to the container. For now, this means
634 634 that any failure to properly initialize the part after calling
635 635 ``newpart`` should result in a failure of the whole bundling process.
636 636
637 637 You can still fall back to manually creating and adding the part if you
638 638 need better control."""
639 639 part = bundlepart(typeid, *args, **kwargs)
640 640 self.addpart(part)
641 641 return part
642 642
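A hypothetical sketch of assembling an outgoing bundle with this API (`ui` is assumed to be a Mercurial ui object):

    bundler = bundle20(ui)
    part = bundler.newpart('output', data='hello', mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    raw = ''.join(bundler.getchunks())   # complete binary container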
643 643 # methods used to generate the bundle2 stream
644 644 def getchunks(self):
645 645 if self.ui.debugflag:
646 646 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
647 647 if self._params:
648 648 msg.append(' (%i params)' % len(self._params))
649 649 msg.append(' %i parts total\n' % len(self._parts))
650 650 self.ui.debug(''.join(msg))
651 651 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
652 652 yield self._magicstring
653 653 param = self._paramchunk()
654 654 outdebug(self.ui, 'bundle parameter: %s' % param)
655 655 yield _pack(_fstreamparamsize, len(param))
656 656 if param:
657 657 yield param
658 658 for chunk in self._compengine.compressstream(self._getcorechunk(),
659 659 self._compopts):
660 660 yield chunk
661 661
662 662 def _paramchunk(self):
663 663 """return an encoded version of all stream parameters"""
664 664 blocks = []
665 665 for par, value in self._params:
666 666 par = urlreq.quote(par)
667 667 if value is not None:
668 668 value = urlreq.quote(value)
669 669 par = '%s=%s' % (par, value)
670 670 blocks.append(par)
671 671 return ' '.join(blocks)
672 672
673 673 def _getcorechunk(self):
674 674 """yield chunks for the core part of the bundle
675 675
676 676 (all but headers and parameters)"""
677 677 outdebug(self.ui, 'start of parts')
678 678 for part in self._parts:
679 679 outdebug(self.ui, 'bundle part: "%s"' % part.type)
680 680 for chunk in part.getchunks(ui=self.ui):
681 681 yield chunk
682 682 outdebug(self.ui, 'end of bundle')
683 683 yield _pack(_fpartheadersize, 0)
684 684
685 685
686 686 def salvageoutput(self):
687 687 """return a list with a copy of all output parts in the bundle
688 688
689 689 This is meant to be used during error handling to make sure we preserve
690 690 server output"""
691 691 salvaged = []
692 692 for part in self._parts:
693 693 if part.type.startswith('output'):
694 694 salvaged.append(part.copy())
695 695 return salvaged
696 696
697 697
698 698 class unpackermixin(object):
699 699 """A mixin to extract bytes and struct data from a stream"""
700 700
701 701 def __init__(self, fp):
702 702 self._fp = fp
703 703
704 704 def _unpack(self, format):
705 705 """unpack this struct format from the stream
706 706
707 707 This method is meant for internal usage by the bundle2 protocol only.
708 708 It directly manipulates the low level stream, including bundle2 level
709 709 instructions.
710 710
711 711 Do not use it to implement higher-level logic or methods."""
712 712 data = self._readexact(struct.calcsize(format))
713 713 return _unpack(format, data)
714 714
715 715 def _readexact(self, size):
716 716 """read exactly <size> bytes from the stream
717 717
718 718 This method is meant for internal usage by the bundle2 protocol only.
719 719 It directly manipulates the low level stream, including bundle2 level
720 720 instructions.
721 721
722 722 Do not use it to implement higher-level logic or methods."""
723 723 return changegroup.readexactly(self._fp, size)
724 724
725 725 def getunbundler(ui, fp, magicstring=None):
726 726 """return a valid unbundler object for a given magicstring"""
727 727 if magicstring is None:
728 728 magicstring = changegroup.readexactly(fp, 4)
729 729 magic, version = magicstring[0:2], magicstring[2:4]
730 730 if magic != 'HG':
731 731 ui.debug(
732 732 "error: invalid magic: %r (version %r), should be 'HG'\n"
733 733 % (magic, version))
734 734 raise error.Abort(_('not a Mercurial bundle'))
735 735 unbundlerclass = formatmap.get(version)
736 736 if unbundlerclass is None:
737 737 raise error.Abort(_('unknown bundle version %s') % version)
738 738 unbundler = unbundlerclass(ui, fp)
739 739 indebug(ui, 'start processing of %s stream' % magicstring)
740 740 return unbundler
741 741
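The consuming side, sketched under the assumption that `fh` is a binary file object containing a complete bundle:

    unbundler = getunbundler(ui, fh)
    for part in unbundler.iterparts():
        data = part.read()   # decoded payload; iterparts() consumes the rest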
742 742 class unbundle20(unpackermixin):
743 743 """interpret a bundle2 stream
744 744
745 745 This class is fed with a binary stream and yields parts through its
746 746 `iterparts` method."""
747 747
748 748 _magicstring = 'HG20'
749 749
750 750 def __init__(self, ui, fp):
751 751 """Create an unbundler bound to the binary stream ``fp``."""
752 752 self.ui = ui
753 753 self._compengine = util.compengines.forbundletype('UN')
754 754 self._compressed = None
755 755 super(unbundle20, self).__init__(fp)
756 756
757 757 @util.propertycache
758 758 def params(self):
759 759 """dictionary of stream level parameters"""
760 760 indebug(self.ui, 'reading bundle2 stream parameters')
761 761 params = {}
762 762 paramssize = self._unpack(_fstreamparamsize)[0]
763 763 if paramssize < 0:
764 764 raise error.BundleValueError('negative bundle param size: %i'
765 765 % paramssize)
766 766 if paramssize:
767 767 params = self._readexact(paramssize)
768 768 params = self._processallparams(params)
769 769 return params
770 770
771 771 def _processallparams(self, paramsblock):
772 772 """parse a stream level parameters blob and apply each parameter"""
773 773 params = util.sortdict()
774 774 for p in paramsblock.split(' '):
775 775 p = p.split('=', 1)
776 776 p = [urlreq.unquote(i) for i in p]
777 777 if len(p) < 2:
778 778 p.append(None)
779 779 self._processparam(*p)
780 780 params[p[0]] = p[1]
781 781 return params
782 782
783 783
784 784 def _processparam(self, name, value):
785 785 """process a parameter, applying its effect if needed
786 786
787 787 Parameters starting with a lower case letter are advisory and will be
788 788 ignored when unknown. Those starting with an upper case letter are
789 789 mandatory; this function will raise an error when they are unknown.
790 790
791 791 Note: no options are currently supported. Any input will be either
792 792 ignored or rejected.
793 793 """
794 794 if not name:
795 795 raise ValueError(r'empty parameter name')
796 796 if name[0:1] not in pycompat.bytestr(string.ascii_letters):
797 797 raise ValueError(r'non letter first character: %s' % name)
798 798 try:
799 799 handler = b2streamparamsmap[name.lower()]
800 800 except KeyError:
801 801 if name[0:1].islower():
802 802 indebug(self.ui, "ignoring unknown parameter %s" % name)
803 803 else:
804 804 raise error.BundleUnknownFeatureError(params=(name,))
805 805 else:
806 806 handler(self, name, value)
807 807
808 808 def _forwardchunks(self):
809 809 """utility to transfer a bundle2 as binary
810 810
811 811 This is made necessary by the fact that the 'getbundle' command over 'ssh'
812 812 has no way to know when the reply ends; it relies on the bundle being
813 813 interpreted to find its end. This is terrible and we are sorry, but we
814 814 needed to move forward to get general delta enabled.
815 815 """
816 816 yield self._magicstring
817 817 assert 'params' not in vars(self)
818 818 paramssize = self._unpack(_fstreamparamsize)[0]
819 819 if paramssize < 0:
820 820 raise error.BundleValueError('negative bundle param size: %i'
821 821 % paramssize)
822 822 yield _pack(_fstreamparamsize, paramssize)
823 823 if paramssize:
824 824 params = self._readexact(paramssize)
825 825 self._processallparams(params)
826 826 yield params
827 827 assert self._compengine.bundletype == 'UN'
828 828 # From there, payload might need to be decompressed
829 829 self._fp = self._compengine.decompressorreader(self._fp)
830 830 emptycount = 0
831 831 while emptycount < 2:
832 832 # so we can brainlessly loop
833 833 assert _fpartheadersize == _fpayloadsize
834 834 size = self._unpack(_fpartheadersize)[0]
835 835 yield _pack(_fpartheadersize, size)
836 836 if size:
837 837 emptycount = 0
838 838 else:
839 839 emptycount += 1
840 840 continue
841 841 if size == flaginterrupt:
842 842 continue
843 843 elif size < 0:
844 844 raise error.BundleValueError('negative chunk size: %i' % size)
845 845 yield self._readexact(size)
846 846
847 847
848 848 def iterparts(self):
849 849 """yield all parts contained in the stream"""
851 851 # make sure params have been loaded
852 852 self.params
853 853 # From there, payload needs to be decompressed
853 853 self._fp = self._compengine.decompressorreader(self._fp)
854 854 indebug(self.ui, 'start extraction of bundle2 parts')
855 855 headerblock = self._readpartheader()
856 856 while headerblock is not None:
857 857 part = seekableunbundlepart(self.ui, headerblock, self._fp)
858 858 yield part
859 859 # Ensure part is fully consumed so we can start reading the next
860 860 # part.
861 861 part.consume()
862 # But then seek back to the beginning so the code consuming this
863 # generator has a part that starts at 0.
864 part.seek(0, os.SEEK_SET)
862
865 863 headerblock = self._readpartheader()
866 864 indebug(self.ui, 'end of bundle2 stream')
867 865
868 866 def _readpartheader(self):
869 867 """read a part header size and return the bytes blob
870 868
871 869 returns None if empty"""
872 870 headersize = self._unpack(_fpartheadersize)[0]
873 871 if headersize < 0:
874 872 raise error.BundleValueError('negative part header size: %i'
875 873 % headersize)
876 874 indebug(self.ui, 'part header size: %i' % headersize)
877 875 if headersize:
878 876 return self._readexact(headersize)
879 877 return None
880 878
881 879 def compressed(self):
882 880 self.params # load params
883 881 return self._compressed
884 882
885 883 def close(self):
886 884 """close underlying file"""
887 885 if util.safehasattr(self._fp, 'close'):
888 886 return self._fp.close()
889 887
890 888 formatmap = {'20': unbundle20}
891 889
892 890 b2streamparamsmap = {}
893 891
894 892 def b2streamparamhandler(name):
895 893 """register a handler for a stream level parameter"""
896 894 def decorator(func):
897 895 assert name not in formatmap
898 896 b2streamparamsmap[name] = func
899 897 return func
900 898 return decorator
901 899
902 900 @b2streamparamhandler('compression')
903 901 def processcompression(unbundler, param, value):
904 902 """read compression parameter and install payload decompression"""
905 903 if value not in util.compengines.supportedbundletypes:
906 904 raise error.BundleUnknownFeatureError(params=(param,),
907 905 values=(value,))
908 906 unbundler._compengine = util.compengines.forbundletype(value)
909 907 if value is not None:
910 908 unbundler._compressed = True
911 909
912 910 class bundlepart(object):
913 911 """A bundle2 part contains application level payload
914 912
915 913 The part `type` is used to route the part to the application level
916 914 handler.
917 915
918 916 The part payload is contained in ``part.data``. It could be raw bytes or a
919 917 generator of byte chunks.
920 918
921 919 You can add parameters to the part using the ``addparam`` method.
922 920 Parameters can be either mandatory (default) or advisory. Remote side
923 921 should be able to safely ignore the advisory ones.
924 922
925 923 Neither data nor parameters can be modified after the generation has begun.
926 924 """
927 925
928 926 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
929 927 data='', mandatory=True):
930 928 validateparttype(parttype)
931 929 self.id = None
932 930 self.type = parttype
933 931 self._data = data
934 932 self._mandatoryparams = list(mandatoryparams)
935 933 self._advisoryparams = list(advisoryparams)
936 934 # checking for duplicated entries
937 935 self._seenparams = set()
938 936 for pname, __ in self._mandatoryparams + self._advisoryparams:
939 937 if pname in self._seenparams:
940 938 raise error.ProgrammingError('duplicated params: %s' % pname)
941 939 self._seenparams.add(pname)
942 940 # status of the part's generation:
943 941 # - None: not started,
944 942 # - False: currently generated,
945 943 # - True: generation done.
946 944 self._generated = None
947 945 self.mandatory = mandatory
948 946
949 947 def __repr__(self):
950 948 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
951 949 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
952 950 % (cls, id(self), self.id, self.type, self.mandatory))
953 951
954 952 def copy(self):
955 953 """return a copy of the part
956 954
957 955 The new part has the very same content but no partid assigned yet.
958 956 Parts with generated data cannot be copied."""
959 957 assert not util.safehasattr(self.data, 'next')
960 958 return self.__class__(self.type, self._mandatoryparams,
961 959 self._advisoryparams, self._data, self.mandatory)
962 960
963 961 # methods used to define the part content
964 962 @property
965 963 def data(self):
966 964 return self._data
967 965
968 966 @data.setter
969 967 def data(self, data):
970 968 if self._generated is not None:
971 969 raise error.ReadOnlyPartError('part is being generated')
972 970 self._data = data
973 971
974 972 @property
975 973 def mandatoryparams(self):
976 974 # make it an immutable tuple to force people through ``addparam``
977 975 return tuple(self._mandatoryparams)
978 976
979 977 @property
980 978 def advisoryparams(self):
981 979 # make it an immutable tuple to force people through ``addparam``
982 980 return tuple(self._advisoryparams)
983 981
984 982 def addparam(self, name, value='', mandatory=True):
985 983 """add a parameter to the part
986 984
987 985 If 'mandatory' is set to True, the remote handler must claim support
988 986 for this parameter or the unbundling will be aborted.
989 987
990 988 The 'name' and 'value' cannot exceed 255 bytes each.
991 989 """
992 990 if self._generated is not None:
993 991 raise error.ReadOnlyPartError('part is being generated')
994 992 if name in self._seenparams:
995 993 raise ValueError('duplicated params: %s' % name)
996 994 self._seenparams.add(name)
997 995 params = self._advisoryparams
998 996 if mandatory:
999 997 params = self._mandatoryparams
1000 998 params.append((name, value))
1001 999
1002 1000 # methods used to generate the bundle2 stream
1003 1001 def getchunks(self, ui):
1004 1002 if self._generated is not None:
1005 1003 raise error.ProgrammingError('part can only be consumed once')
1006 1004 self._generated = False
1007 1005
1008 1006 if ui.debugflag:
1009 1007 msg = ['bundle2-output-part: "%s"' % self.type]
1010 1008 if not self.mandatory:
1011 1009 msg.append(' (advisory)')
1012 1010 nbmp = len(self.mandatoryparams)
1013 1011 nbap = len(self.advisoryparams)
1014 1012 if nbmp or nbap:
1015 1013 msg.append(' (params:')
1016 1014 if nbmp:
1017 1015 msg.append(' %i mandatory' % nbmp)
1018 1016 if nbap:
1019 1017 msg.append(' %i advisory' % nbap)
1020 1018 msg.append(')')
1021 1019 if not self.data:
1022 1020 msg.append(' empty payload')
1023 1021 elif (util.safehasattr(self.data, 'next')
1024 1022 or util.safehasattr(self.data, '__next__')):
1025 1023 msg.append(' streamed payload')
1026 1024 else:
1027 1025 msg.append(' %i bytes payload' % len(self.data))
1028 1026 msg.append('\n')
1029 1027 ui.debug(''.join(msg))
1030 1028
1031 1029 #### header
1032 1030 if self.mandatory:
1033 1031 parttype = self.type.upper()
1034 1032 else:
1035 1033 parttype = self.type.lower()
1036 1034 outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
1037 1035 ## parttype
1038 1036 header = [_pack(_fparttypesize, len(parttype)),
1039 1037 parttype, _pack(_fpartid, self.id),
1040 1038 ]
1041 1039 ## parameters
1042 1040 # count
1043 1041 manpar = self.mandatoryparams
1044 1042 advpar = self.advisoryparams
1045 1043 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
1046 1044 # size
1047 1045 parsizes = []
1048 1046 for key, value in manpar:
1049 1047 parsizes.append(len(key))
1050 1048 parsizes.append(len(value))
1051 1049 for key, value in advpar:
1052 1050 parsizes.append(len(key))
1053 1051 parsizes.append(len(value))
1054 1052 paramsizes = _pack(_makefpartparamsizes(len(parsizes) // 2), *parsizes)
1055 1053 header.append(paramsizes)
1056 1054 # key, value
1057 1055 for key, value in manpar:
1058 1056 header.append(key)
1059 1057 header.append(value)
1060 1058 for key, value in advpar:
1061 1059 header.append(key)
1062 1060 header.append(value)
1063 1061 ## finalize header
1064 1062 try:
1065 1063 headerchunk = ''.join(header)
1066 1064 except TypeError:
1067 1065 raise TypeError(r'Found a non-bytes trying to '
1068 1066 r'build bundle part header: %r' % header)
1069 1067 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1070 1068 yield _pack(_fpartheadersize, len(headerchunk))
1071 1069 yield headerchunk
1072 1070 ## payload
1073 1071 try:
1074 1072 for chunk in self._payloadchunks():
1075 1073 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1076 1074 yield _pack(_fpayloadsize, len(chunk))
1077 1075 yield chunk
1078 1076 except GeneratorExit:
1079 1077 # GeneratorExit means that nobody is listening for our
1080 1078 # results anyway, so just bail quickly rather than trying
1081 1079 # to produce an error part.
1082 1080 ui.debug('bundle2-generatorexit\n')
1083 1081 raise
1084 1082 except BaseException as exc:
1085 1083 bexc = util.forcebytestr(exc)
1086 1084 # backup exception data for later
1087 1085 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1088 1086 % bexc)
1089 1087 tb = sys.exc_info()[2]
1090 1088 msg = 'unexpected error: %s' % bexc
1091 1089 interpart = bundlepart('error:abort', [('message', msg)],
1092 1090 mandatory=False)
1093 1091 interpart.id = 0
1094 1092 yield _pack(_fpayloadsize, -1)
1095 1093 for chunk in interpart.getchunks(ui=ui):
1096 1094 yield chunk
1097 1095 outdebug(ui, 'closing payload chunk')
1098 1096 # abort current part payload
1099 1097 yield _pack(_fpayloadsize, 0)
1100 1098 pycompat.raisewithtb(exc, tb)
1101 1099 # end of payload
1102 1100 outdebug(ui, 'closing payload chunk')
1103 1101 yield _pack(_fpayloadsize, 0)
1104 1102 self._generated = True
1105 1103
1106 1104 def _payloadchunks(self):
1107 1105 """yield chunks of the part payload
1108 1106
1109 1107 Exists to handle the different methods to provide data to a part."""
1110 1108 # we only support fixed size data now.
1111 1109 # This will be improved in the future.
1112 1110 if (util.safehasattr(self.data, 'next')
1113 1111 or util.safehasattr(self.data, '__next__')):
1114 1112 buff = util.chunkbuffer(self.data)
1115 1113 chunk = buff.read(preferedchunksize)
1116 1114 while chunk:
1117 1115 yield chunk
1118 1116 chunk = buff.read(preferedchunksize)
1119 1117 elif len(self.data):
1120 1118 yield self.data
1121 1119
1122 1120
1123 1121 flaginterrupt = -1
1124 1122
1125 1123 class interrupthandler(unpackermixin):
1126 1124 """read one part and process it with restricted capability
1127 1125
1128 1126 This allows transmitting exceptions raised on the producer side during part
1129 1127 iteration while the consumer is reading a part.
1130 1128
1131 1129 Parts processed in this manner only have access to a ui object."""
1132 1130
1133 1131 def __init__(self, ui, fp):
1134 1132 super(interrupthandler, self).__init__(fp)
1135 1133 self.ui = ui
1136 1134
1137 1135 def _readpartheader(self):
1138 1136 """read a part header size and return the bytes blob
1139 1137
1140 1138 returns None if empty"""
1141 1139 headersize = self._unpack(_fpartheadersize)[0]
1142 1140 if headersize < 0:
1143 1141 raise error.BundleValueError('negative part header size: %i'
1144 1142 % headersize)
1145 1143 indebug(self.ui, 'part header size: %i' % headersize)
1146 1144 if headersize:
1147 1145 return self._readexact(headersize)
1148 1146 return None
1149 1147
1150 1148 def __call__(self):
1151 1149
1152 1150 self.ui.debug('bundle2-input-stream-interrupt:'
1153 1151 ' opening out of band context\n')
1154 1152 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1155 1153 headerblock = self._readpartheader()
1156 1154 if headerblock is None:
1157 1155 indebug(self.ui, 'no part found during interruption.')
1158 1156 return
1159 1157 part = seekableunbundlepart(self.ui, headerblock, self._fp)
1160 1158 op = interruptoperation(self.ui)
1161 1159 hardabort = False
1162 1160 try:
1163 1161 _processpart(op, part)
1164 1162 except (SystemExit, KeyboardInterrupt):
1165 1163 hardabort = True
1166 1164 raise
1167 1165 finally:
1168 1166 if not hardabort:
1169 1167 part.consume()
1170 1168 self.ui.debug('bundle2-input-stream-interrupt:'
1171 1169 ' closing out of band context\n')
1172 1170
1173 1171 class interruptoperation(object):
1174 1172 """A limited operation to be used by part handlers during interruption
1175 1173
1176 1174 It only has access to a ui object.
1177 1175 """
1178 1176
1179 1177 def __init__(self, ui):
1180 1178 self.ui = ui
1181 1179 self.reply = None
1182 1180 self.captureoutput = False
1183 1181
1184 1182 @property
1185 1183 def repo(self):
1186 1184 raise error.ProgrammingError('no repo access from stream interruption')
1187 1185
1188 1186 def gettransaction(self):
1189 1187 raise TransactionUnavailable('no repo access from stream interruption')
1190 1188
1191 1189 def decodepayloadchunks(ui, fh):
1192 1190 """Reads bundle2 part payload data into chunks.
1193 1191
1194 1192 Part payload data consists of framed chunks. This function takes
1195 1193 a file handle and emits those chunks.
1196 1194 """
1197 1195 headersize = struct.calcsize(_fpayloadsize)
1198 1196 readexactly = changegroup.readexactly
1199 1197
1200 1198 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1201 1199 indebug(ui, 'payload chunk size: %i' % chunksize)
1202 1200
1203 1201 while chunksize:
1204 1202 if chunksize >= 0:
1205 1203 yield readexactly(fh, chunksize)
1206 1204 elif chunksize == flaginterrupt:
1207 1205 # Interrupt "signal" detected. The regular stream is interrupted
1208 1206 # and a bundle2 part follows. Consume it.
1209 1207 interrupthandler(ui, fh)()
1210 1208 else:
1211 1209 raise error.BundleValueError(
1212 1210 'negative payload chunk size: %s' % chunksize)
1213 1211
1214 1212 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1215 1213 indebug(ui, 'payload chunk size: %i' % chunksize)
1216 1214
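For reference, a hypothetical inverse of the decoder above, producing the <int32 size><data> framing terminated by a zero size chunk (not part of this module):

    import struct

    def encodepayloadchunks(chunks):
        """Frame an iterable of byte chunks as a bundle2 part payload."""
        for chunk in chunks:
            yield struct.pack('>i', len(chunk))
            yield chunk
        yield struct.pack('>i', 0)   # zero size chunk ends the payload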
1217 1215 class unbundlepart(unpackermixin):
1218 1216 """a bundle part read from a bundle"""
1219 1217
1220 1218 def __init__(self, ui, header, fp):
1221 1219 super(unbundlepart, self).__init__(fp)
1222 1220 self._seekable = (util.safehasattr(fp, 'seek') and
1223 1221 util.safehasattr(fp, 'tell'))
1224 1222 self.ui = ui
1225 1223 # unbundle state attr
1226 1224 self._headerdata = header
1227 1225 self._headeroffset = 0
1228 1226 self._initialized = False
1229 1227 self.consumed = False
1230 1228 # part data
1231 1229 self.id = None
1232 1230 self.type = None
1233 1231 self.mandatoryparams = None
1234 1232 self.advisoryparams = None
1235 1233 self.params = None
1236 1234 self.mandatorykeys = ()
1237 1235 self._readheader()
1238 1236 self._mandatory = None
1239 1237 self._pos = 0
1240 1238
1241 1239 def _fromheader(self, size):
1242 1240 """return the next <size> bytes from the header"""
1243 1241 offset = self._headeroffset
1244 1242 data = self._headerdata[offset:(offset + size)]
1245 1243 self._headeroffset = offset + size
1246 1244 return data
1247 1245
1248 1246 def _unpackheader(self, format):
1249 1247 """read given format from header
1250 1248
1251 1249 This automatically computes the size of the format to read.
1252 1250 data = self._fromheader(struct.calcsize(format))
1253 1251 return _unpack(format, data)
1254 1252
1255 1253 def _initparams(self, mandatoryparams, advisoryparams):
1256 1254 """internal function to set up all logic related parameters"""
1257 1255 # make it read only to prevent people touching it by mistake.
1258 1256 self.mandatoryparams = tuple(mandatoryparams)
1259 1257 self.advisoryparams = tuple(advisoryparams)
1260 1258 # user friendly UI
1261 1259 self.params = util.sortdict(self.mandatoryparams)
1262 1260 self.params.update(self.advisoryparams)
1263 1261 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1264 1262
1265 1263 def _readheader(self):
1266 1264 """read the header and setup the object"""
1267 1265 typesize = self._unpackheader(_fparttypesize)[0]
1268 1266 self.type = self._fromheader(typesize)
1269 1267 indebug(self.ui, 'part type: "%s"' % self.type)
1270 1268 self.id = self._unpackheader(_fpartid)[0]
1271 1269 indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
1272 1270 # extract mandatory bit from type
1273 1271 self.mandatory = (self.type != self.type.lower())
1274 1272 self.type = self.type.lower()
1275 1273 ## reading parameters
1276 1274 # param count
1277 1275 mancount, advcount = self._unpackheader(_fpartparamcount)
1278 1276 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1279 1277 # param size
1280 1278 fparamsizes = _makefpartparamsizes(mancount + advcount)
1281 1279 paramsizes = self._unpackheader(fparamsizes)
1282 1280 # make it a list of pairs again
1283 1281 paramsizes = list(zip(paramsizes[::2], paramsizes[1::2]))
1284 1282 # split mandatory from advisory
1285 1283 mansizes = paramsizes[:mancount]
1286 1284 advsizes = paramsizes[mancount:]
1287 1285 # retrieve param value
1288 1286 manparams = []
1289 1287 for key, value in mansizes:
1290 1288 manparams.append((self._fromheader(key), self._fromheader(value)))
1291 1289 advparams = []
1292 1290 for key, value in advsizes:
1293 1291 advparams.append((self._fromheader(key), self._fromheader(value)))
1294 1292 self._initparams(manparams, advparams)
1295 1293 ## part payload
1296 1294 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1297 1295 # the header has been fully read; mark the part as initialized
1298 1296 self._initialized = True
1299 1297
1300 1298 def _payloadchunks(self):
1301 1299 """Generator of decoded chunks in the payload."""
1302 1300 return decodepayloadchunks(self.ui, self._fp)
1303 1301
1304 1302 def consume(self):
1305 1303 """Read the part payload until completion.
1306 1304
1307 1305 By consuming the part data, the underlying stream read offset will
1308 1306 be advanced to the next part (or end of stream).
1309 1307 """
1310 1308 if self.consumed:
1311 1309 return
1312 1310
1313 1311 chunk = self.read(32768)
1314 1312 while chunk:
1315 1313 self._pos += len(chunk)
1316 1314 chunk = self.read(32768)
1317 1315
1318 1316 def read(self, size=None):
1319 1317 """read payload data"""
1320 1318 if not self._initialized:
1321 1319 self._readheader()
1322 1320 if size is None:
1323 1321 data = self._payloadstream.read()
1324 1322 else:
1325 1323 data = self._payloadstream.read(size)
1326 1324 self._pos += len(data)
1327 1325 if size is None or len(data) < size:
1328 1326 if not self.consumed and self._pos:
1329 1327 self.ui.debug('bundle2-input-part: total payload size %i\n'
1330 1328 % self._pos)
1331 1329 self.consumed = True
1332 1330 return data
1333 1331
1334 1332 class seekableunbundlepart(unbundlepart):
1335 1333 """A bundle2 part in a bundle that is seekable.
1336 1334
1337 1335 Regular ``unbundlepart`` instances can only be read once. This class
1338 1336 extends ``unbundlepart`` to enable bi-directional seeking within the
1339 1337 part.
1340 1338
1341 1339 Bundle2 part data consists of framed chunks. Offsets when seeking
1342 1340 refer to the decoded data, not the offsets in the underlying bundle2
1343 1341 stream.
1344 1342
1345 1343 To facilitate quickly seeking within the decoded data, instances of this
1346 1344 class maintain a mapping between offsets in the underlying stream and
1347 1345 the decoded payload. This mapping will consume memory in proportion
1348 1346 to the number of chunks within the payload (which almost certainly
1349 1347 increases in proportion with the size of the part).
1350 1348 """
1351 1349 def __init__(self, ui, header, fp):
1352 1350 # (payload, file) offsets for chunk starts.
1353 1351 self._chunkindex = []
1354 1352
1355 1353 super(seekableunbundlepart, self).__init__(ui, header, fp)
1356 1354
1357 1355 def _payloadchunks(self, chunknum=0):
1358 1356 '''seek to specified chunk and start yielding data'''
1359 1357 if len(self._chunkindex) == 0:
1360 1358 assert chunknum == 0, 'Must start with chunk 0'
1361 1359 self._chunkindex.append((0, self._tellfp()))
1362 1360 else:
1363 1361 assert chunknum < len(self._chunkindex), \
1364 1362 'Unknown chunk %d' % chunknum
1365 1363 self._seekfp(self._chunkindex[chunknum][1])
1366 1364
1367 1365 pos = self._chunkindex[chunknum][0]
1368 1366
1369 1367 for chunk in decodepayloadchunks(self.ui, self._fp):
1370 1368 chunknum += 1
1371 1369 pos += len(chunk)
1372 1370 if chunknum == len(self._chunkindex):
1373 1371 self._chunkindex.append((pos, self._tellfp()))
1374 1372
1375 1373 yield chunk
1376 1374
1377 1375 def _findchunk(self, pos):
1378 1376 '''for a given payload position, return a chunk number and offset'''
1379 1377 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1380 1378 if ppos == pos:
1381 1379 return chunk, 0
1382 1380 elif ppos > pos:
1383 1381 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1384 1382 raise ValueError('Unknown chunk')
1385 1383
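To illustrate the index (values hypothetical): after reading two 4096 byte chunks whose frames start at file offsets 100 and 4200, `_chunkindex` would be `[(0, 100), (4096, 4200), (8192, 8300)]`; `_findchunk(5000)` then returns chunk 1 with an internal offset of 5000 - 4096 = 904 decoded bytes.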
1386 1384 def tell(self):
1387 1385 return self._pos
1388 1386
1389 1387 def seek(self, offset, whence=os.SEEK_SET):
1390 1388 if whence == os.SEEK_SET:
1391 1389 newpos = offset
1392 1390 elif whence == os.SEEK_CUR:
1393 1391 newpos = self._pos + offset
1394 1392 elif whence == os.SEEK_END:
1395 1393 if not self.consumed:
1396 1394 self.read()
1397 1395 newpos = self._chunkindex[-1][0] - offset
1398 1396 else:
1399 1397 raise ValueError('Unknown whence value: %r' % (whence,))
1400 1398
1401 1399 if newpos > self._chunkindex[-1][0] and not self.consumed:
1402 1400 self.read()
1403 1401 if not 0 <= newpos <= self._chunkindex[-1][0]:
1404 1402 raise ValueError('Offset out of range')
1405 1403
1406 1404 if self._pos != newpos:
1407 1405 chunk, internaloffset = self._findchunk(newpos)
1408 1406 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1409 1407 adjust = self.read(internaloffset)
1410 1408 if len(adjust) != internaloffset:
1411 1409 raise error.Abort(_('Seek failed\n'))
1412 1410 self._pos = newpos
1413 1411
1414 1412 def _seekfp(self, offset, whence=0):
1415 1413 """move the underlying file pointer
1416 1414
1417 1415 This method is meant for internal usage by the bundle2 protocol only.
1418 1416 They directly manipulate the low level stream including bundle2 level
1419 1417 instruction.
1420 1418
1421 1419 Do not use it to implement higher-level logic or methods."""
1422 1420 if self._seekable:
1423 1421 return self._fp.seek(offset, whence)
1424 1422 else:
1425 1423 raise NotImplementedError(_('File pointer is not seekable'))
1426 1424
1427 1425 def _tellfp(self):
1428 1426 """return the file offset, or None if file is not seekable
1429 1427
1430 1428 This method is meant for internal usage by the bundle2 protocol only.
1431 1429 They directly manipulate the low level stream including bundle2 level
1432 1430 instruction.
1433 1431
1434 1432 Do not use it to implement higher-level logic or methods."""
1435 1433 if self._seekable:
1436 1434 try:
1437 1435 return self._fp.tell()
1438 1436 except IOError as e:
1439 1437 if e.errno == errno.ESPIPE:
1440 1438 self._seekable = False
1441 1439 else:
1442 1440 raise
1443 1441 return None
1444 1442
1445 1443 # These are only the static capabilities.
1446 1444 # Check the 'getrepocaps' function for the rest.
1447 1445 capabilities = {'HG20': (),
1448 1446 'error': ('abort', 'unsupportedcontent', 'pushraced',
1449 1447 'pushkey'),
1450 1448 'listkeys': (),
1451 1449 'pushkey': (),
1452 1450 'digests': tuple(sorted(util.DIGESTS.keys())),
1453 1451 'remote-changegroup': ('http', 'https'),
1454 1452 'hgtagsfnodes': (),
1455 1453 'phases': ('heads',),
1456 1454 }
1457 1455
1458 1456 def getrepocaps(repo, allowpushback=False):
1459 1457 """return the bundle2 capabilities for a given repo
1460 1458
1461 1459 Exists to allow extensions (like evolution) to mutate the capabilities.
1462 1460 """
1463 1461 caps = capabilities.copy()
1464 1462 caps['changegroup'] = tuple(sorted(
1465 1463 changegroup.supportedincomingversions(repo)))
1466 1464 if obsolete.isenabled(repo, obsolete.exchangeopt):
1467 1465 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1468 1466 caps['obsmarkers'] = supportedformat
1469 1467 if allowpushback:
1470 1468 caps['pushback'] = ()
1471 1469 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1472 1470 if cpmode == 'check-related':
1473 1471 caps['checkheads'] = ('related',)
1474 1472 if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
1475 1473 caps.pop('phases')
1476 1474 return caps
1477 1475
1478 1476 def bundle2caps(remote):
1479 1477 """return the bundle capabilities of a peer as dict"""
1480 1478 raw = remote.capable('bundle2')
1481 1479 if not raw and raw != '':
1482 1480 return {}
1483 1481 capsblob = urlreq.unquote(remote.capable('bundle2'))
1484 1482 return decodecaps(capsblob)
1485 1483
1486 1484 def obsmarkersversion(caps):
1487 1485 """extract the list of supported obsmarkers versions from a bundle2caps dict
1488 1486 """
1489 1487 obscaps = caps.get('obsmarkers', ())
1490 1488 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1491 1489
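For example (hypothetical capabilities dict):

    caps = {'obsmarkers': ('V0', 'V1')}
    assert obsmarkersversion(caps) == [0, 1]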
1492 1490 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1493 1491 vfs=None, compression=None, compopts=None):
1494 1492 if bundletype.startswith('HG10'):
1495 1493 cg = changegroup.makechangegroup(repo, outgoing, '01', source)
1496 1494 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1497 1495 compression=compression, compopts=compopts)
1498 1496 elif not bundletype.startswith('HG20'):
1499 1497 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1500 1498
1501 1499 caps = {}
1502 1500 if 'obsolescence' in opts:
1503 1501 caps['obsmarkers'] = ('V1',)
1504 1502 bundle = bundle20(ui, caps)
1505 1503 bundle.setcompression(compression, compopts)
1506 1504 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1507 1505 chunkiter = bundle.getchunks()
1508 1506
1509 1507 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1510 1508
1511 1509 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1512 1510 # We should eventually reconcile this logic with the one behind
1513 1511 # 'exchange.getbundle2partsgenerator'.
1514 1512 #
1515 1513     # The types of input from 'getbundle' and 'writenewbundle' are a bit
1516 1514     # different right now, so we keep them separate for the sake of
1517 1515     # simplicity.
1518 1516
1519 1517     # we always want a changegroup in such a bundle
1520 1518 cgversion = opts.get('cg.version')
1521 1519 if cgversion is None:
1522 1520 cgversion = changegroup.safeversion(repo)
1523 1521 cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
1524 1522 part = bundler.newpart('changegroup', data=cg.getchunks())
1525 1523 part.addparam('version', cg.version)
1526 1524 if 'clcount' in cg.extras:
1527 1525 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1528 1526 mandatory=False)
1529 1527 if opts.get('phases') and repo.revs('%ln and secret()',
1530 1528 outgoing.missingheads):
1531 1529 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1532 1530
1533 1531 addparttagsfnodescache(repo, bundler, outgoing)
1534 1532
1535 1533 if opts.get('obsolescence', False):
1536 1534 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1537 1535 buildobsmarkerspart(bundler, obsmarkers)
1538 1536
1539 1537 if opts.get('phases', False):
1540 1538 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1541 1539 phasedata = phases.binaryencode(headsbyphase)
1542 1540 bundler.newpart('phase-heads', data=phasedata)
1543 1541
1544 1542 def addparttagsfnodescache(repo, bundler, outgoing):
1545 1543 # we include the tags fnode cache for the bundle changeset
1546 1544     # (as an optional part)
1547 1545 cache = tags.hgtagsfnodescache(repo.unfiltered())
1548 1546 chunks = []
1549 1547
1550 1548 # .hgtags fnodes are only relevant for head changesets. While we could
1551 1549 # transfer values for all known nodes, there will likely be little to
1552 1550 # no benefit.
1553 1551 #
1554 1552 # We don't bother using a generator to produce output data because
1555 1553 # a) we only have 40 bytes per head and even esoteric numbers of heads
1556 1554 # consume little memory (1M heads is 40MB) b) we don't want to send the
1557 1555 # part if we don't have entries and knowing if we have entries requires
1558 1556 # cache lookups.
1559 1557 for node in outgoing.missingheads:
1560 1558 # Don't compute missing, as this may slow down serving.
1561 1559 fnode = cache.getfnode(node, computemissing=False)
1562 1560 if fnode is not None:
1563 1561 chunks.extend([node, fnode])
1564 1562
1565 1563 if chunks:
1566 1564 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1567 1565
1568 1566 def buildobsmarkerspart(bundler, markers):
1569 1567 """add an obsmarker part to the bundler with <markers>
1570 1568
1571 1569 No part is created if markers is empty.
1572 1570 Raises ValueError if the bundler doesn't support any known obsmarker format.
1573 1571 """
1574 1572 if not markers:
1575 1573 return None
1576 1574
1577 1575 remoteversions = obsmarkersversion(bundler.capabilities)
1578 1576 version = obsolete.commonversion(remoteversions)
1579 1577 if version is None:
1580 1578 raise ValueError('bundler does not support common obsmarker format')
1581 1579 stream = obsolete.encodemarkers(markers, True, version=version)
1582 1580 return bundler.newpart('obsmarkers', data=stream)
1583 1581
1584 1582 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1585 1583 compopts=None):
1586 1584 """Write a bundle file and return its filename.
1587 1585
1588 1586 Existing files will not be overwritten.
1589 1587 If no filename is specified, a temporary file is created.
1590 1588 bz2 compression can be turned off.
1591 1589 The bundle file will be deleted in case of errors.
1592 1590 """
1593 1591
1594 1592 if bundletype == "HG20":
1595 1593 bundle = bundle20(ui)
1596 1594 bundle.setcompression(compression, compopts)
1597 1595 part = bundle.newpart('changegroup', data=cg.getchunks())
1598 1596 part.addparam('version', cg.version)
1599 1597 if 'clcount' in cg.extras:
1600 1598 part.addparam('nbchanges', '%d' % cg.extras['clcount'],
1601 1599 mandatory=False)
1602 1600 chunkiter = bundle.getchunks()
1603 1601 else:
1604 1602 # compression argument is only for the bundle2 case
1605 1603 assert compression is None
1606 1604 if cg.version != '01':
1607 1605             raise error.Abort(_('old bundle types only support v1 '
1608 1606 'changegroups'))
1609 1607 header, comp = bundletypes[bundletype]
1610 1608 if comp not in util.compengines.supportedbundletypes:
1611 1609 raise error.Abort(_('unknown stream compression type: %s')
1612 1610 % comp)
1613 1611 compengine = util.compengines.forbundletype(comp)
1614 1612 def chunkiter():
1615 1613 yield header
1616 1614 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1617 1615 yield chunk
1618 1616 chunkiter = chunkiter()
1619 1617
1620 1618 # parse the changegroup data, otherwise we will block
1621 1619 # in case of sshrepo because we don't know the end of the stream
1622 1620 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1623 1621
1624 1622 def combinechangegroupresults(op):
1625 1623 """logic to combine 0 or more addchangegroup results into one"""
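    # Results follow the addchangegroup convention: 0 signals an error, 1
    # means no change in heads, ret > 1 means ret - 1 heads were added, and
    # ret < -1 means -(ret + 1) heads were removed.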
1626 1624 results = [r.get('return', 0)
1627 1625 for r in op.records['changegroup']]
1628 1626 changedheads = 0
1629 1627 result = 1
1630 1628 for ret in results:
1631 1629 # If any changegroup result is 0, return 0
1632 1630 if ret == 0:
1633 1631 result = 0
1634 1632 break
1635 1633 if ret < -1:
1636 1634 changedheads += ret + 1
1637 1635 elif ret > 1:
1638 1636 changedheads += ret - 1
1639 1637 if changedheads > 0:
1640 1638 result = 1 + changedheads
1641 1639 elif changedheads < 0:
1642 1640 result = -1 + changedheads
1643 1641 return result
1644 1642
1645 1643 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1646 1644 'targetphase'))
1647 1645 def handlechangegroup(op, inpart):
1648 1646 """apply a changegroup part on the repo
1649 1647
1650 1648     This is a very early implementation that will see massive rework before
1651 1649     being inflicted on any end-user.
1652 1650 """
1653 1651 tr = op.gettransaction()
1654 1652 unpackerversion = inpart.params.get('version', '01')
1655 1653 # We should raise an appropriate exception here
1656 1654 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1657 1655     # the source and url passed here are overwritten by the ones contained
1658 1656     # in the transaction.hookargs argument. So 'bundle2' is a placeholder
1659 1657 nbchangesets = None
1660 1658 if 'nbchanges' in inpart.params:
1661 1659 nbchangesets = int(inpart.params.get('nbchanges'))
1662 1660 if ('treemanifest' in inpart.params and
1663 1661 'treemanifest' not in op.repo.requirements):
1664 1662 if len(op.repo.changelog) != 0:
1665 1663 raise error.Abort(_(
1666 1664 "bundle contains tree manifests, but local repo is "
1667 1665 "non-empty and does not use tree manifests"))
1668 1666 op.repo.requirements.add('treemanifest')
1669 1667 op.repo._applyopenerreqs()
1670 1668 op.repo._writerequirements()
1671 1669 extrakwargs = {}
1672 1670 targetphase = inpart.params.get('targetphase')
1673 1671 if targetphase is not None:
1674 1672 extrakwargs['targetphase'] = int(targetphase)
1675 1673 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1676 1674 expectedtotal=nbchangesets, **extrakwargs)
1677 1675 if op.reply is not None:
1678 1676 # This is definitely not the final form of this
1679 1677         # return. But one needs to start somewhere.
1680 1678 part = op.reply.newpart('reply:changegroup', mandatory=False)
1681 1679 part.addparam(
1682 1680 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1683 1681 part.addparam('return', '%i' % ret, mandatory=False)
1684 1682 assert not inpart.read()
1685 1683
1686 1684 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1687 1685 ['digest:%s' % k for k in util.DIGESTS.keys()])
1688 1686 @parthandler('remote-changegroup', _remotechangegroupparams)
1689 1687 def handleremotechangegroup(op, inpart):
1690 1688     """apply a bundle10 on the repo, given a url and validation information
1691 1689
1692 1690     All the information about the remote bundle to import is given as
1693 1691 parameters. The parameters include:
1694 1692 - url: the url to the bundle10.
1695 1693     - size: the bundle10 file size. It is used to validate that what was
1696 1694       retrieved by the client matches the server's knowledge about the bundle.
1697 1695 - digests: a space separated list of the digest types provided as
1698 1696 parameters.
1699 1697 - digest:<digest-type>: the hexadecimal representation of the digest with
1700 1698       that name. Like the size, it is used to validate that what was retrieved
1701 1699       by the client matches what the server knows about the bundle.
1702 1700
1703 1701 When multiple digest types are given, all of them are checked.
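
    An illustrative (hypothetical) parameter set for such a part:

      url: https://hg.example.com/bundle.hg
      size: 8192
      digests: sha1
      digest:sha1: da39a3ee5e6b4b0d3255bfef95601890afd80709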
1704 1702 """
1705 1703 try:
1706 1704 raw_url = inpart.params['url']
1707 1705 except KeyError:
1708 1706 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1709 1707 parsed_url = util.url(raw_url)
1710 1708 if parsed_url.scheme not in capabilities['remote-changegroup']:
1711 1709 raise error.Abort(_('remote-changegroup does not support %s urls') %
1712 1710 parsed_url.scheme)
1713 1711
1714 1712 try:
1715 1713 size = int(inpart.params['size'])
1716 1714 except ValueError:
1717 1715 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1718 1716 % 'size')
1719 1717 except KeyError:
1720 1718 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1721 1719
1722 1720 digests = {}
1723 1721 for typ in inpart.params.get('digests', '').split():
1724 1722 param = 'digest:%s' % typ
1725 1723 try:
1726 1724 value = inpart.params[param]
1727 1725 except KeyError:
1728 1726 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1729 1727 param)
1730 1728 digests[typ] = value
1731 1729
1732 1730 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
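    # digestchecker wraps the remote stream so the advertised size and digests
    # can be verified once the payload has been consumed; the validate() call
    # below raises if they do not match.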
1733 1731
1734 1732 tr = op.gettransaction()
1735 1733 from . import exchange
1736 1734 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1737 1735 if not isinstance(cg, changegroup.cg1unpacker):
1738 1736 raise error.Abort(_('%s: not a bundle version 1.0') %
1739 1737 util.hidepassword(raw_url))
1740 1738 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1741 1739 if op.reply is not None:
1742 1740 # This is definitely not the final form of this
1743 1741         # return. But one needs to start somewhere.
1744 1742 part = op.reply.newpart('reply:changegroup')
1745 1743 part.addparam(
1746 1744 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1747 1745 part.addparam('return', '%i' % ret, mandatory=False)
1748 1746 try:
1749 1747 real_part.validate()
1750 1748 except error.Abort as e:
1751 1749 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1752 1750 (util.hidepassword(raw_url), str(e)))
1753 1751 assert not inpart.read()
1754 1752
1755 1753 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1756 1754 def handlereplychangegroup(op, inpart):
1757 1755 ret = int(inpart.params['return'])
1758 1756 replyto = int(inpart.params['in-reply-to'])
1759 1757 op.records.add('changegroup', {'return': ret}, replyto)
1760 1758
1761 1759 @parthandler('check:heads')
1762 1760 def handlecheckheads(op, inpart):
1763 1761     """check that the heads of the repo did not change
1764 1762
1765 1763 This is used to detect a push race when using unbundle.
1766 1764 This replaces the "heads" argument of unbundle."""
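    # The payload is a flat sequence of 20-byte binary node ids: the heads the
    # client expects the repository to have.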
1767 1765 h = inpart.read(20)
1768 1766 heads = []
1769 1767 while len(h) == 20:
1770 1768 heads.append(h)
1771 1769 h = inpart.read(20)
1772 1770 assert not h
1773 1771 # Trigger a transaction so that we are guaranteed to have the lock now.
1774 1772 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1775 1773 op.gettransaction()
1776 1774 if sorted(heads) != sorted(op.repo.heads()):
1777 1775 raise error.PushRaced('repository changed while pushing - '
1778 1776 'please try again')
1779 1777
1780 1778 @parthandler('check:updated-heads')
1781 1779 def handlecheckupdatedheads(op, inpart):
1782 1780 """check for race on the heads touched by a push
1783 1781
1784 1782     This is similar to 'check:heads' but focuses on the heads actually updated
1785 1783     during the push. If other activity happens on unrelated heads, it is
1786 1784     ignored.
1787 1785
1788 1786     This allows servers with high traffic to avoid push contention as long as
1789 1787     only unrelated parts of the graph are involved.
1790 1788 h = inpart.read(20)
1791 1789 heads = []
1792 1790 while len(h) == 20:
1793 1791 heads.append(h)
1794 1792 h = inpart.read(20)
1795 1793 assert not h
1796 1794 # trigger a transaction so that we are guaranteed to have the lock now.
1797 1795 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1798 1796 op.gettransaction()
1799 1797
1800 1798 currentheads = set()
1801 1799 for ls in op.repo.branchmap().itervalues():
1802 1800 currentheads.update(ls)
1803 1801
1804 1802 for h in heads:
1805 1803 if h not in currentheads:
1806 1804 raise error.PushRaced('repository changed while pushing - '
1807 1805 'please try again')
1808 1806
1809 1807 @parthandler('check:phases')
1810 1808 def handlecheckphases(op, inpart):
1811 1809 """check that phase boundaries of the repository did not change
1812 1810
1813 1811 This is used to detect a push race.
1814 1812 """
1815 1813 phasetonodes = phases.binarydecode(inpart)
1816 1814 unfi = op.repo.unfiltered()
1817 1815 cl = unfi.changelog
1818 1816 phasecache = unfi._phasecache
1819 1817 msg = ('repository changed while pushing - please try again '
1820 1818 '(%s is %s expected %s)')
1821 1819 for expectedphase, nodes in enumerate(phasetonodes):
1822 1820 for n in nodes:
1823 1821 actualphase = phasecache.phase(unfi, cl.rev(n))
1824 1822 if actualphase != expectedphase:
1825 1823 finalmsg = msg % (nodemod.short(n),
1826 1824 phases.phasenames[actualphase],
1827 1825 phases.phasenames[expectedphase])
1828 1826 raise error.PushRaced(finalmsg)
1829 1827
1830 1828 @parthandler('output')
1831 1829 def handleoutput(op, inpart):
1832 1830 """forward output captured on the server to the client"""
1833 1831 for line in inpart.read().splitlines():
1834 1832 op.ui.status(_('remote: %s\n') % line)
1835 1833
1836 1834 @parthandler('replycaps')
1837 1835 def handlereplycaps(op, inpart):
1838 1836 """Notify that a reply bundle should be created
1839 1837
1840 1838 The payload contains the capabilities information for the reply"""
1841 1839 caps = decodecaps(inpart.read())
1842 1840 if op.reply is None:
1843 1841 op.reply = bundle20(op.ui, caps)
1844 1842
1845 1843 class AbortFromPart(error.Abort):
1846 1844 """Sub-class of Abort that denotes an error from a bundle2 part."""
1847 1845
1848 1846 @parthandler('error:abort', ('message', 'hint'))
1849 1847 def handleerrorabort(op, inpart):
1850 1848 """Used to transmit abort error over the wire"""
1851 1849 raise AbortFromPart(inpart.params['message'],
1852 1850 hint=inpart.params.get('hint'))
1853 1851
1854 1852 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1855 1853 'in-reply-to'))
1856 1854 def handleerrorpushkey(op, inpart):
1857 1855 """Used to transmit failure of a mandatory pushkey over the wire"""
1858 1856 kwargs = {}
1859 1857 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1860 1858 value = inpart.params.get(name)
1861 1859 if value is not None:
1862 1860 kwargs[name] = value
1863 1861 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1864 1862
1865 1863 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1866 1864 def handleerrorunsupportedcontent(op, inpart):
1867 1865 """Used to transmit unknown content error over the wire"""
1868 1866 kwargs = {}
1869 1867 parttype = inpart.params.get('parttype')
1870 1868 if parttype is not None:
1871 1869 kwargs['parttype'] = parttype
1872 1870 params = inpart.params.get('params')
1873 1871 if params is not None:
1874 1872 kwargs['params'] = params.split('\0')
1875 1873
1876 1874 raise error.BundleUnknownFeatureError(**kwargs)
1877 1875
1878 1876 @parthandler('error:pushraced', ('message',))
1879 1877 def handleerrorpushraced(op, inpart):
1880 1878 """Used to transmit push race error over the wire"""
1881 1879 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1882 1880
1883 1881 @parthandler('listkeys', ('namespace',))
1884 1882 def handlelistkeys(op, inpart):
1885 1883 """retrieve pushkey namespace content stored in a bundle2"""
1886 1884 namespace = inpart.params['namespace']
1887 1885 r = pushkey.decodekeys(inpart.read())
1888 1886 op.records.add('listkeys', (namespace, r))
1889 1887
1890 1888 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1891 1889 def handlepushkey(op, inpart):
1892 1890 """process a pushkey request"""
1893 1891 dec = pushkey.decode
1894 1892 namespace = dec(inpart.params['namespace'])
1895 1893 key = dec(inpart.params['key'])
1896 1894 old = dec(inpart.params['old'])
1897 1895 new = dec(inpart.params['new'])
1898 1896 # Grab the transaction to ensure that we have the lock before performing the
1899 1897 # pushkey.
1900 1898 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1901 1899 op.gettransaction()
1902 1900 ret = op.repo.pushkey(namespace, key, old, new)
1903 1901 record = {'namespace': namespace,
1904 1902 'key': key,
1905 1903 'old': old,
1906 1904 'new': new}
1907 1905 op.records.add('pushkey', record)
1908 1906 if op.reply is not None:
1909 1907 rpart = op.reply.newpart('reply:pushkey')
1910 1908 rpart.addparam(
1911 1909 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1912 1910 rpart.addparam('return', '%i' % ret, mandatory=False)
1913 1911 if inpart.mandatory and not ret:
1914 1912 kwargs = {}
1915 1913 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1916 1914 if key in inpart.params:
1917 1915 kwargs[key] = inpart.params[key]
1918 1916 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1919 1917
1920 1918 @parthandler('phase-heads')
1921 1919 def handlephases(op, inpart):
1922 1920 """apply phases from bundle part to repo"""
1923 1921 headsbyphase = phases.binarydecode(inpart)
1924 1922 phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
1925 1923
1926 1924 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1927 1925 def handlepushkeyreply(op, inpart):
1928 1926 """retrieve the result of a pushkey request"""
1929 1927 ret = int(inpart.params['return'])
1930 1928 partid = int(inpart.params['in-reply-to'])
1931 1929 op.records.add('pushkey', {'return': ret}, partid)
1932 1930
1933 1931 @parthandler('obsmarkers')
1934 1932 def handleobsmarker(op, inpart):
1935 1933 """add a stream of obsmarkers to the repo"""
1936 1934 tr = op.gettransaction()
1937 1935 markerdata = inpart.read()
1938 1936 if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
1939 1937 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1940 1938 % len(markerdata))
1941 1939 # The mergemarkers call will crash if marker creation is not enabled.
1942 1940 # we want to avoid this if the part is advisory.
1943 1941 if not inpart.mandatory and op.repo.obsstore.readonly:
1944 1942 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1945 1943 return
1946 1944 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1947 1945 op.repo.invalidatevolatilesets()
1948 1946 if new:
1949 1947 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1950 1948 op.records.add('obsmarkers', {'new': new})
1951 1949 if op.reply is not None:
1952 1950 rpart = op.reply.newpart('reply:obsmarkers')
1953 1951 rpart.addparam(
1954 1952 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
1955 1953 rpart.addparam('new', '%i' % new, mandatory=False)
1956 1954
1957 1955
1958 1956 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1959 1957 def handleobsmarkerreply(op, inpart):
1960 1958     """retrieve the result of an obsmarkers request"""
1961 1959 ret = int(inpart.params['new'])
1962 1960 partid = int(inpart.params['in-reply-to'])
1963 1961 op.records.add('obsmarkers', {'new': ret}, partid)
1964 1962
1965 1963 @parthandler('hgtagsfnodes')
1966 1964 def handlehgtagsfnodes(op, inpart):
1967 1965 """Applies .hgtags fnodes cache entries to the local repo.
1968 1966
1969 1967 Payload is pairs of 20 byte changeset nodes and filenodes.
1970 1968 """
1971 1969 # Grab the transaction so we ensure that we have the lock at this point.
1972 1970 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1973 1971 op.gettransaction()
1974 1972 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1975 1973
1976 1974 count = 0
1977 1975 while True:
1978 1976 node = inpart.read(20)
1979 1977 fnode = inpart.read(20)
1980 1978 if len(node) < 20 or len(fnode) < 20:
1981 1979 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1982 1980 break
1983 1981 cache.setfnode(node, fnode)
1984 1982 count += 1
1985 1983
1986 1984 cache.write()
1987 1985 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
1988 1986
1989 1987 @parthandler('pushvars')
1990 1988 def bundle2getvars(op, part):
1991 1989 '''unbundle a bundle2 containing shellvars on the server'''
1992 1990 # An option to disable unbundling on server-side for security reasons
1993 1991 if op.ui.configbool('push', 'pushvars.server'):
1994 1992 hookargs = {}
1995 1993 for key, value in part.advisoryparams:
1996 1994 key = key.upper()
1997 1995 # We want pushed variables to have USERVAR_ prepended so we know
1998 1996 # they came from the --pushvar flag.
1999 1997 key = "USERVAR_" + key
2000 1998 hookargs[key] = value
2001 1999 op.addhookargs(hookargs)
@@ -1,590 +1,598
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 node as nodemod,
36 36 pathutil,
37 37 phases,
38 38 pycompat,
39 39 revlog,
40 40 util,
41 41 vfs as vfsmod,
42 42 )
43 43
44 44 class bundlerevlog(revlog.revlog):
45 45 def __init__(self, opener, indexfile, cgunpacker, linkmapper):
46 46 # How it works:
47 47 # To retrieve a revision, we need to know the offset of the revision in
48 48 # the bundle (an unbundle object). We store this offset in the index
49 49 # (start). The base of the delta is stored in the base field.
50 50 #
51 51 # To differentiate a rev in the bundle from a rev in the revlog, we
52 52 # check revision against repotiprev.
53 53 opener = vfsmod.readonlyvfs(opener)
54 54 revlog.revlog.__init__(self, opener, indexfile)
55 55 self.bundle = cgunpacker
56 56 n = len(self)
57 57 self.repotiprev = n - 1
58 58 self.bundlerevs = set() # used by 'bundle()' revset expression
59 59 for deltadata in cgunpacker.deltaiter():
60 60 node, p1, p2, cs, deltabase, delta, flags = deltadata
61 61
62 62 size = len(delta)
63 63 start = cgunpacker.tell() - size
64 64
65 65 link = linkmapper(cs)
66 66 if node in self.nodemap:
67 67 # this can happen if two branches make the same change
68 68 self.bundlerevs.add(self.nodemap[node])
69 69 continue
70 70
71 71 for p in (p1, p2):
72 72 if p not in self.nodemap:
73 73 raise error.LookupError(p, self.indexfile,
74 74 _("unknown parent"))
75 75
76 76 if deltabase not in self.nodemap:
77 77 raise LookupError(deltabase, self.indexfile,
78 78 _('unknown delta base'))
79 79
80 80 baserev = self.rev(deltabase)
81 81 # start, size, full unc. size, base (unused), link, p1, p2, node
82 82 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
83 83 self.rev(p1), self.rev(p2), node)
84 84 self.index.insert(-1, e)
85 85 self.nodemap[node] = n
86 86 self.bundlerevs.add(n)
87 87 n += 1
88 88
89 89 def _chunk(self, rev, df=None):
90 90         # Warning: in the case of a bundle, the diff is against what we
91 91         # stored as the delta base, not against rev - 1
92 92 # XXX: could use some caching
93 93 if rev <= self.repotiprev:
94 94 return revlog.revlog._chunk(self, rev)
95 95 self.bundle.seek(self.start(rev))
96 96 return self.bundle.read(self.length(rev))
97 97
98 98 def revdiff(self, rev1, rev2):
99 99 """return or calculate a delta between two revisions"""
100 100 if rev1 > self.repotiprev and rev2 > self.repotiprev:
101 101 # hot path for bundle
102 102 revb = self.index[rev2][3]
103 103 if revb == rev1:
104 104 return self._chunk(rev2)
105 105 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
106 106 return revlog.revlog.revdiff(self, rev1, rev2)
107 107
108 108 return mdiff.textdiff(self.revision(rev1, raw=True),
109 109 self.revision(rev2, raw=True))
110 110
111 111 def revision(self, nodeorrev, _df=None, raw=False):
112 112 """return an uncompressed revision of a given node or revision
113 113 number.
114 114 """
115 115 if isinstance(nodeorrev, int):
116 116 rev = nodeorrev
117 117 node = self.node(rev)
118 118 else:
119 119 node = nodeorrev
120 120 rev = self.rev(node)
121 121
122 122 if node == nullid:
123 123 return ""
124 124
125 125 rawtext = None
126 126 chain = []
127 127 iterrev = rev
128 128 # reconstruct the revision if it is from a changegroup
129 129 while iterrev > self.repotiprev:
130 130 if self._cache and self._cache[1] == iterrev:
131 131 rawtext = self._cache[2]
132 132 break
133 133 chain.append(iterrev)
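            # field 3 of a bundle index entry holds the delta base revision
            # (see the tuple built in bundlerevlog.__init__)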
134 134 iterrev = self.index[iterrev][3]
135 135 if rawtext is None:
136 136 rawtext = self.baserevision(iterrev)
137 137
138 138 while chain:
139 139 delta = self._chunk(chain.pop())
140 140 rawtext = mdiff.patches(rawtext, [delta])
141 141
142 142 text, validatehash = self._processflags(rawtext, self.flags(rev),
143 143 'read', raw=raw)
144 144 if validatehash:
145 145 self.checkhash(text, node, rev=rev)
146 146 self._cache = (node, rev, rawtext)
147 147 return text
148 148
149 149 def baserevision(self, nodeorrev):
150 150         # Revlog subclasses may override the 'revision' method to modify the
151 151         # format of retrieved content. To use bundlerevlog with such a class,
152 152         # one needs to override 'baserevision' and make a more specific call here.
153 153 return revlog.revlog.revision(self, nodeorrev, raw=True)
154 154
155 155 def addrevision(self, *args, **kwargs):
156 156 raise NotImplementedError
157 157
158 158 def addgroup(self, *args, **kwargs):
159 159 raise NotImplementedError
160 160
161 161 def strip(self, *args, **kwargs):
162 162 raise NotImplementedError
163 163
164 164 def checksize(self):
165 165 raise NotImplementedError
166 166
167 167 class bundlechangelog(bundlerevlog, changelog.changelog):
168 168 def __init__(self, opener, cgunpacker):
169 169 changelog.changelog.__init__(self, opener)
170 170 linkmapper = lambda x: x
171 171 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
172 172 linkmapper)
173 173
174 174 def baserevision(self, nodeorrev):
175 175         # Although changelog doesn't override the 'revision' method, some
176 176         # extensions may replace this class with another that does. Same story
177 177         # with the manifest and filelog classes.
178 178
179 179 # This bypasses filtering on changelog.node() and rev() because we need
180 180 # revision text of the bundle base even if it is hidden.
181 181 oldfilter = self.filteredrevs
182 182 try:
183 183 self.filteredrevs = ()
184 184 return changelog.changelog.revision(self, nodeorrev, raw=True)
185 185 finally:
186 186 self.filteredrevs = oldfilter
187 187
188 188 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
189 189 def __init__(self, opener, cgunpacker, linkmapper, dirlogstarts=None,
190 190 dir=''):
191 191 manifest.manifestrevlog.__init__(self, opener, dir=dir)
192 192 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
193 193 linkmapper)
194 194 if dirlogstarts is None:
195 195 dirlogstarts = {}
196 196 if self.bundle.version == "03":
197 197 dirlogstarts = _getfilestarts(self.bundle)
198 198 self._dirlogstarts = dirlogstarts
199 199 self._linkmapper = linkmapper
200 200
201 201 def baserevision(self, nodeorrev):
202 202 node = nodeorrev
203 203 if isinstance(node, int):
204 204 node = self.node(node)
205 205
206 206 if node in self.fulltextcache:
207 207 result = '%s' % self.fulltextcache[node]
208 208 else:
209 209 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
210 210 return result
211 211
212 212 def dirlog(self, d):
213 213 if d in self._dirlogstarts:
214 214 self.bundle.seek(self._dirlogstarts[d])
215 215 return bundlemanifest(
216 216 self.opener, self.bundle, self._linkmapper,
217 217 self._dirlogstarts, dir=d)
218 218 return super(bundlemanifest, self).dirlog(d)
219 219
220 220 class bundlefilelog(bundlerevlog, filelog.filelog):
221 221 def __init__(self, opener, path, cgunpacker, linkmapper):
222 222 filelog.filelog.__init__(self, opener, path)
223 223 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
224 224 linkmapper)
225 225
226 226 def baserevision(self, nodeorrev):
227 227 return filelog.filelog.revision(self, nodeorrev, raw=True)
228 228
229 229 class bundlepeer(localrepo.localpeer):
230 230 def canpush(self):
231 231 return False
232 232
233 233 class bundlephasecache(phases.phasecache):
234 234 def __init__(self, *args, **kwargs):
235 235 super(bundlephasecache, self).__init__(*args, **kwargs)
236 236 if util.safehasattr(self, 'opener'):
237 237 self.opener = vfsmod.readonlyvfs(self.opener)
238 238
239 239 def write(self):
240 240 raise NotImplementedError
241 241
242 242 def _write(self, fp):
243 243 raise NotImplementedError
244 244
245 245 def _updateroots(self, phase, newroots, tr):
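        # Phase moves are tracked in memory only; write() and _write() above
        # are disabled, so nothing is ever written to disk.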
246 246 self.phaseroots[phase] = newroots
247 247 self.invalidate()
248 248 self.dirty = True
249 249
250 250 def _getfilestarts(cgunpacker):
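    # Record the stream offset at which each file's delta group starts,
    # draining the delta chunks so the unpacker advances to the next file.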
251 251 filespos = {}
252 252 for chunkdata in iter(cgunpacker.filelogheader, {}):
253 253 fname = chunkdata['filename']
254 254 filespos[fname] = cgunpacker.tell()
255 255 for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
256 256 pass
257 257 return filespos
258 258
259 259 class bundlerepository(localrepo.localrepository):
260 260 """A repository instance that is a union of a local repo and a bundle.
261 261
262 262 Instances represent a read-only repository composed of a local repository
263 263 with the contents of a bundle file applied. The repository instance is
264 264 conceptually similar to the state of a repository after an
265 265 ``hg unbundle`` operation. However, the contents of the bundle are never
266 266 applied to the actual base repository.
267 267 """
268 268 def __init__(self, ui, repopath, bundlepath):
269 269 self._tempparent = None
270 270 try:
271 271 localrepo.localrepository.__init__(self, ui, repopath)
272 272 except error.RepoError:
273 273 self._tempparent = tempfile.mkdtemp()
274 274 localrepo.instance(ui, self._tempparent, 1)
275 275 localrepo.localrepository.__init__(self, ui, self._tempparent)
276 276 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
277 277
278 278 if repopath:
279 279 self._url = 'bundle:' + util.expandpath(repopath) + '+' + bundlepath
280 280 else:
281 281 self._url = 'bundle:' + bundlepath
282 282
283 283 self.tempfile = None
284 284 f = util.posixfile(bundlepath, "rb")
285 285 bundle = exchange.readbundle(ui, f, bundlepath)
286 286
287 287 if isinstance(bundle, bundle2.unbundle20):
288 288 self._bundlefile = bundle
289 289 self._cgunpacker = None
290 290
291 hadchangegroup = False
291 cgpart = None
292 292 for part in bundle.iterparts():
293 293 if part.type == 'changegroup':
294 if hadchangegroup:
294 if cgpart:
295 295 raise NotImplementedError("can't process "
296 296 "multiple changegroups")
297 hadchangegroup = True
297 cgpart = part
298 298
299 299 self._handlebundle2part(bundle, part)
300 300
301 if not hadchangegroup:
301 if not cgpart:
302 302 raise error.Abort(_("No changegroups found"))
303
304 # This is required to placate a later consumer, which expects
305 # the payload offset to be at the beginning of the changegroup.
306 # We need to do this after the iterparts() loop completes because
307 # iterparts() seeks to the end of each part's payload once control
308 # returns to the generator.
309 cgpart.seek(0, os.SEEK_SET)
310
303 311 elif isinstance(bundle, changegroup.cg1unpacker):
304 312 if bundle.compressed():
305 313 f = self._writetempbundle(bundle.read, '.hg10un',
306 314 header='HG10UN')
307 315 bundle = exchange.readbundle(ui, f, bundlepath, self.vfs)
308 316
309 317 self._bundlefile = bundle
310 318 self._cgunpacker = bundle
311 319 else:
312 320 raise error.Abort(_('bundle type %s cannot be read') %
313 321 type(bundle))
314 322
315 323 # dict with the mapping 'filename' -> position in the changegroup.
316 324 self._cgfilespos = {}
317 325
318 326 self.firstnewrev = self.changelog.repotiprev + 1
319 327 phases.retractboundary(self, None, phases.draft,
320 328 [ctx.node() for ctx in self[self.firstnewrev:]])
321 329
322 330 def _handlebundle2part(self, bundle, part):
323 331 if part.type != 'changegroup':
324 332 return
325 333
326 334 cgstream = part
327 335 version = part.params.get('version', '01')
328 336 legalcgvers = changegroup.supportedincomingversions(self)
329 337 if version not in legalcgvers:
330 338 msg = _('Unsupported changegroup version: %s')
331 339 raise error.Abort(msg % version)
332 340 if bundle.compressed():
333 341 cgstream = self._writetempbundle(part.read, '.cg%sun' % version)
334 342
335 343 self._cgunpacker = changegroup.getunbundler(version, cgstream, 'UN')
336 344
337 345 def _writetempbundle(self, readfn, suffix, header=''):
338 346 """Write a temporary file to disk
339 347 """
340 348 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
341 349 suffix=suffix)
342 350 self.tempfile = temp
343 351
344 352 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
345 353 fptemp.write(header)
346 354 while True:
347 355 chunk = readfn(2**18)
348 356 if not chunk:
349 357 break
350 358 fptemp.write(chunk)
351 359
352 360 return self.vfs.open(self.tempfile, mode="rb")
353 361
354 362 @localrepo.unfilteredpropertycache
355 363 def _phasecache(self):
356 364 return bundlephasecache(self, self._phasedefaults)
357 365
358 366 @localrepo.unfilteredpropertycache
359 367 def changelog(self):
360 368 # consume the header if it exists
361 369 self._cgunpacker.changelogheader()
362 370 c = bundlechangelog(self.svfs, self._cgunpacker)
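        # Constructing bundlechangelog consumed the changelog delta group, so
        # the unpacker is now positioned at the start of the manifest section.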
363 371 self.manstart = self._cgunpacker.tell()
364 372 return c
365 373
366 374 def _constructmanifest(self):
367 375 self._cgunpacker.seek(self.manstart)
368 376 # consume the header if it exists
369 377 self._cgunpacker.manifestheader()
370 378 linkmapper = self.unfiltered().changelog.rev
371 379 m = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
372 380 self.filestart = self._cgunpacker.tell()
373 381 return m
374 382
375 383 def _consumemanifest(self):
376 384 """Consumes the manifest portion of the bundle, setting filestart so the
377 385 file portion can be read."""
378 386 self._cgunpacker.seek(self.manstart)
379 387 self._cgunpacker.manifestheader()
380 388 for delta in self._cgunpacker.deltaiter():
381 389 pass
382 390 self.filestart = self._cgunpacker.tell()
383 391
384 392 @localrepo.unfilteredpropertycache
385 393 def manstart(self):
386 394 self.changelog
387 395 return self.manstart
388 396
389 397 @localrepo.unfilteredpropertycache
390 398 def filestart(self):
391 399 self.manifestlog
392 400
393 401 # If filestart was not set by self.manifestlog, that means the
394 402 # manifestlog implementation did not consume the manifests from the
395 403 # changegroup (ex: it might be consuming trees from a separate bundle2
396 404 # part instead). So we need to manually consume it.
397 405 if 'filestart' not in self.__dict__:
398 406 self._consumemanifest()
399 407
400 408 return self.filestart
401 409
402 410 def url(self):
403 411 return self._url
404 412
405 413 def file(self, f):
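        # Lazily index the file sections of the changegroup on first access.
        # Files present in the bundle are served from the changegroup;
        # everything else falls back to the on-disk filelog.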
406 414 if not self._cgfilespos:
407 415 self._cgunpacker.seek(self.filestart)
408 416 self._cgfilespos = _getfilestarts(self._cgunpacker)
409 417
410 418 if f in self._cgfilespos:
411 419 self._cgunpacker.seek(self._cgfilespos[f])
412 420 linkmapper = self.unfiltered().changelog.rev
413 421 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
414 422 else:
415 423 return filelog.filelog(self.svfs, f)
416 424
417 425 def close(self):
418 426 """Close assigned bundle file immediately."""
419 427 self._bundlefile.close()
420 428 if self.tempfile is not None:
421 429 self.vfs.unlink(self.tempfile)
422 430 if self._tempparent:
423 431 shutil.rmtree(self._tempparent, True)
424 432
425 433 def cancopy(self):
426 434 return False
427 435
428 436 def peer(self):
429 437 return bundlepeer(self)
430 438
431 439 def getcwd(self):
432 440 return pycompat.getcwd() # always outside the repo
433 441
434 442 # Check if parents exist in localrepo before setting
435 443 def setparents(self, p1, p2=nullid):
436 444 p1rev = self.changelog.rev(p1)
437 445 p2rev = self.changelog.rev(p2)
438 446 msg = _("setting parent to node %s that only exists in the bundle\n")
439 447 if self.changelog.repotiprev < p1rev:
440 448 self.ui.warn(msg % nodemod.hex(p1))
441 449 if self.changelog.repotiprev < p2rev:
442 450 self.ui.warn(msg % nodemod.hex(p2))
443 451 return super(bundlerepository, self).setparents(p1, p2)
444 452
445 453 def instance(ui, path, create):
446 454 if create:
447 455 raise error.Abort(_('cannot create new bundle repository'))
448 456 # internal config: bundle.mainreporoot
449 457 parentpath = ui.config("bundle", "mainreporoot")
450 458 if not parentpath:
451 459 # try to find the correct path to the working directory repo
452 460 parentpath = cmdutil.findrepo(pycompat.getcwd())
453 461 if parentpath is None:
454 462 parentpath = ''
455 463 if parentpath:
456 464 # Try to make the full path relative so we get a nice, short URL.
457 465 # In particular, we don't want temp dir names in test outputs.
458 466 cwd = pycompat.getcwd()
459 467 if parentpath == cwd:
460 468 parentpath = ''
461 469 else:
462 470 cwd = pathutil.normasprefix(cwd)
463 471 if parentpath.startswith(cwd):
464 472 parentpath = parentpath[len(cwd):]
465 473 u = util.url(path)
466 474 path = u.localpath()
467 475 if u.scheme == 'bundle':
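        # a 'bundle:' URL may name both repo and bundle: '<repopath>+<bundlename>'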
468 476 s = path.split("+", 1)
469 477 if len(s) == 1:
470 478 repopath, bundlename = parentpath, s[0]
471 479 else:
472 480 repopath, bundlename = s
473 481 else:
474 482 repopath, bundlename = parentpath, path
475 483 return bundlerepository(ui, repopath, bundlename)
476 484
477 485 class bundletransactionmanager(object):
478 486 def transaction(self):
479 487 return None
480 488
481 489 def close(self):
482 490 raise NotImplementedError
483 491
484 492 def release(self):
485 493 raise NotImplementedError
486 494
487 495 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
488 496 force=False):
489 497 '''obtains a bundle of changes incoming from other
490 498
491 499 "onlyheads" restricts the returned changes to those reachable from the
492 500 specified heads.
493 501 "bundlename", if given, stores the bundle to this file path permanently;
494 502 otherwise it's stored to a temp file and gets deleted again when you call
495 503 the returned "cleanupfn".
496 504 "force" indicates whether to proceed on unrelated repos.
497 505
498 506 Returns a tuple (local, csets, cleanupfn):
499 507
500 508 "local" is a local repo from which to obtain the actual incoming
501 509 changesets; it is a bundlerepo for the obtained bundle when the
502 510 original "other" is remote.
503 511 "csets" lists the incoming changeset node ids.
504 512 "cleanupfn" must be called without arguments when you're done processing
505 513 the changes; it closes both the original "other" and the one returned
506 514 here.
507 515 '''
508 516 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
509 517 force=force)
510 518 common, incoming, rheads = tmp
511 519 if not incoming:
512 520 try:
513 521 if bundlename:
514 522 os.unlink(bundlename)
515 523 except OSError:
516 524 pass
517 525 return repo, [], other.close
518 526
519 527 commonset = set(common)
520 528 rheads = [x for x in rheads if x not in commonset]
521 529
522 530 bundle = None
523 531 bundlerepo = None
524 532 localrepo = other.local()
525 533 if bundlename or not localrepo:
526 534 # create a bundle (uncompressed if other repo is not local)
527 535
528 536 # developer config: devel.legacy.exchange
529 537 legexc = ui.configlist('devel', 'legacy.exchange')
530 538 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
531 539 canbundle2 = (not forcebundle1
532 540 and other.capable('getbundle')
533 541 and other.capable('bundle2'))
534 542 if canbundle2:
535 543 kwargs = {}
536 544 kwargs['common'] = common
537 545 kwargs['heads'] = rheads
538 546 kwargs['bundlecaps'] = exchange.caps20to10(repo)
539 547 kwargs['cg'] = True
540 548 b2 = other.getbundle('incoming', **kwargs)
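            # _forwardchunks() replays the raw bundle2 stream so the bundle
            # can be written to disk unmodified.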
541 549 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
542 550 bundlename)
543 551 else:
544 552 if other.capable('getbundle'):
545 553 cg = other.getbundle('incoming', common=common, heads=rheads)
546 554 elif onlyheads is None and not other.capable('changegroupsubset'):
547 555 # compat with older servers when pulling all remote heads
548 556 cg = other.changegroup(incoming, "incoming")
549 557 rheads = None
550 558 else:
551 559 cg = other.changegroupsubset(incoming, rheads, 'incoming')
552 560 if localrepo:
553 561 bundletype = "HG10BZ"
554 562 else:
555 563 bundletype = "HG10UN"
556 564 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
557 565 bundletype)
558 566 # keep written bundle?
559 567 if bundlename:
560 568 bundle = None
561 569 if not localrepo:
562 570 # use the created uncompressed bundlerepo
563 571 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
564 572 fname)
565 573 # this repo contains local and other now, so filter out local again
566 574 common = repo.heads()
567 575 if localrepo:
568 576 # Part of common may be remotely filtered
569 577 # So use an unfiltered version
570 578         # The discovery process probably needs cleanup to avoid that
571 579 localrepo = localrepo.unfiltered()
572 580
573 581 csets = localrepo.changelog.findmissing(common, rheads)
574 582
575 583 if bundlerepo:
576 584 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
577 585 remotephases = other.listkeys('phases')
578 586
579 587 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
580 588 pullop.trmanager = bundletransactionmanager()
581 589 exchange._pullapplyphases(pullop, remotephases)
582 590
583 591 def cleanup():
584 592 if bundlerepo:
585 593 bundlerepo.close()
586 594 if bundle:
587 595 os.unlink(bundle)
588 596 other.close()
589 597
590 598 return (localrepo, csets, cleanup)