changegroup: stop returning and recording added nodes in 'cg.apply'...
Boris Feld
r33461:bb72031f default
@@ -1,1854 +1,1853 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is organized as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
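
For illustration, a stream parameter blob could be decoded along these lines
(a minimal sketch; the real implementation lives in
`unbundle20._processallparams` below)::

    import urllib

    def decodestreamparams(blob):
        params = {}
        for entry in blob.split(' '):
            if '=' in entry:
                name, value = entry.split('=', 1)
                value = urllib.unquote(value)
            else:
                name, value = entry, None
            params[urllib.unquote(name)] = value
        return params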
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
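
For illustration, this parameter block could be parsed along these lines (a
minimal sketch; the real parsing lives in `unbundlepart._readheader`)::

    import struct

    def readpartparams(header, offset):
        # one byte each for the mandatory and advisory parameter counts
        mancount, advcount = struct.unpack_from('>BB', header, offset)
        offset += 2
        total = mancount + advcount
        sizes = struct.unpack_from('>' + 'BB' * total, header, offset)
        offset += 2 * total
        params = []
        for keysize, valsize in zip(sizes[::2], sizes[1::2]):
            key = header[offset:offset + keysize]
            offset += keysize
            value = header[offset:offset + valsize]
            offset += valsize
            params.append((key, value))
        # mandatory parameters come first
        return params[:mancount], params[mancount:]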
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. The current
129 129 implementation uses -1 (`flaginterrupt`) to signal an interruption part.
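
For illustration, a reader for this framing might look like (a minimal
sketch; `fp` is any file-like object positioned at the first chunk)::

    import struct

    def readpayload(fp):
        chunks = []
        chunksize = struct.unpack('>i', fp.read(4))[0]
        while chunksize != 0:
            if chunksize < 0:
                # reserved for special processing (e.g. interruption)
                raise NotImplementedError('special chunk processing')
            chunks.append(fp.read(chunksize))
            chunksize = struct.unpack('>i', fp.read(4))[0]
        return ''.join(chunks)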
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not
145 145 care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 phases,
162 162 pushkey,
163 163 pycompat,
164 164 scmutil,
165 165 tags,
166 166 url,
167 167 util,
168 168 )
169 169
170 170 urlerr = util.urlerr
171 171 urlreq = util.urlreq
172 172
173 173 _pack = struct.pack
174 174 _unpack = struct.unpack
175 175
176 176 _fstreamparamsize = '>i'
177 177 _fpartheadersize = '>i'
178 178 _fparttypesize = '>B'
179 179 _fpartid = '>I'
180 180 _fpayloadsize = '>i'
181 181 _fpartparamcount = '>BB'
182 182
183 183 _fphasesentry = '>i20s'
184 184
185 185 preferedchunksize = 4096
186 186
187 187 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
188 188
189 189 def outdebug(ui, message):
190 190 """debug regarding output stream (bundling)"""
191 191 if ui.configbool('devel', 'bundle2.debug'):
192 192 ui.debug('bundle2-output: %s\n' % message)
193 193
194 194 def indebug(ui, message):
195 195 """debug on input stream (unbundling)"""
196 196 if ui.configbool('devel', 'bundle2.debug'):
197 197 ui.debug('bundle2-input: %s\n' % message)
198 198
199 199 def validateparttype(parttype):
200 200 """raise ValueError if a parttype contains invalid character"""
201 201 if _parttypeforbidden.search(parttype):
202 202 raise ValueError(parttype)
203 203
204 204 def _makefpartparamsizes(nbparams):
205 205 """return a struct format to read part parameter sizes
206 206
207 207 The number of parameters is variable, so we need to build that format
208 208 dynamically.
209 209 """
210 210 return '>'+('BB'*nbparams)
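# A usage sketch: _makefpartparamsizes(2) == '>BBBB', i.e. four unsigned
# bytes: (size-of-key1, size-of-value1, size-of-key2, size-of-value2).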
211 211
212 212 parthandlermapping = {}
213 213
214 214 def parthandler(parttype, params=()):
215 215 """decorator that register a function as a bundle2 part handler
216 216
217 217 eg::
218 218
219 219 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
220 220 def myparttypehandler(...):
221 221 '''process a part of type "my part".'''
222 222 ...
223 223 """
224 224 validateparttype(parttype)
225 225 def _decorator(func):
226 226 lparttype = parttype.lower() # enforce lower case matching.
227 227 assert lparttype not in parthandlermapping
228 228 parthandlermapping[lparttype] = func
229 229 func.params = frozenset(params)
230 230 return func
231 231 return _decorator
232 232
233 233 class unbundlerecords(object):
234 234 """keep record of what happens during and unbundle
235 235
236 236 New records are added using `records.add('cat', obj)`, where 'cat' is a
237 237 category of record and obj is an arbitrary object.
238 238
239 239 `records['cat']` will return all entries of this category 'cat'.
240 240
241 241 Iterating on the object itself will yield `('category', obj)` tuples
242 242 for all entries.
243 243
244 244 All iterations happen in chronological order.
245 245 """
246 246
247 247 def __init__(self):
248 248 self._categories = {}
249 249 self._sequences = []
250 250 self._replies = {}
251 251
252 252 def add(self, category, entry, inreplyto=None):
253 253 """add a new record of a given category.
254 254
255 255 The entry can then be retrieved in the list returned by
256 256 self['category']."""
257 257 self._categories.setdefault(category, []).append(entry)
258 258 self._sequences.append((category, entry))
259 259 if inreplyto is not None:
260 260 self.getreplies(inreplyto).add(category, entry)
261 261
262 262 def getreplies(self, partid):
263 263 """get the records that are replies to a specific part"""
264 264 return self._replies.setdefault(partid, unbundlerecords())
265 265
266 266 def __getitem__(self, cat):
267 267 return tuple(self._categories.get(cat, ()))
268 268
269 269 def __iter__(self):
270 270 return iter(self._sequences)
271 271
272 272 def __len__(self):
273 273 return len(self._sequences)
274 274
275 275 def __nonzero__(self):
276 276 return bool(self._sequences)
277 277
278 278 __bool__ = __nonzero__
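
# A minimal usage sketch for unbundlerecords (hypothetical values):
#
#   records = unbundlerecords()
#   records.add('changegroup', {'return': 1})
#   records['changegroup']  # -> ({'return': 1},)
#   list(records)           # -> [('changegroup', {'return': 1})]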
279 279
280 280 class bundleoperation(object):
281 281 """an object that represents a single bundling process
282 282
283 283 Its purpose is to carry unbundle-related objects and states.
284 284
285 285 A new object should be created at the beginning of each bundle processing.
286 286 The object is to be returned by the processing function.
287 287
288 288 The object has very little content now; it will ultimately contain:
289 289 * an access to the repo the bundle is applied to,
290 290 * a ui object,
291 291 * a way to retrieve a transaction to add changes to the repo,
292 292 * a way to record the result of processing each part,
293 293 * a way to construct a bundle response when applicable.
294 294 """
295 295
296 296 def __init__(self, repo, transactiongetter, captureoutput=True):
297 297 self.repo = repo
298 298 self.ui = repo.ui
299 299 self.records = unbundlerecords()
300 300 self.gettransaction = transactiongetter
301 301 self.reply = None
302 302 self.captureoutput = captureoutput
303 303
304 304 class TransactionUnavailable(RuntimeError):
305 305 pass
306 306
307 307 def _notransaction():
308 308 """default method to get a transaction while processing a bundle
309 309
310 310 Raise an exception to highlight the fact that no transaction was expected
311 311 to be created"""
312 312 raise TransactionUnavailable()
313 313
314 314 def applybundle(repo, unbundler, tr, source=None, url=None, **kwargs):
315 315 # transform me into unbundler.apply() as soon as the freeze is lifted
316 316 if isinstance(unbundler, unbundle20):
317 317 tr.hookargs['bundle2'] = '1'
318 318 if source is not None and 'source' not in tr.hookargs:
319 319 tr.hookargs['source'] = source
320 320 if url is not None and 'url' not in tr.hookargs:
321 321 tr.hookargs['url'] = url
322 322 return processbundle(repo, unbundler, lambda: tr)
323 323 else:
324 324 # the transactiongetter won't be used, but we might as well set it
325 325 op = bundleoperation(repo, lambda: tr)
326 326 _processchangegroup(op, unbundler, tr, source, url, **kwargs)
327 327 return op
328 328
329 329 def processbundle(repo, unbundler, transactiongetter=None, op=None):
330 330 """This function process a bundle, apply effect to/from a repo
331 331
332 332 It iterates over each part then searches for and uses the proper handling
333 333 code to process the part. Parts are processed in order.
334 334
335 335 An unknown Mandatory part will abort the process.
336 336
337 337 It is temporarily possible to provide a prebuilt bundleoperation to the
338 338 function. This is used to ensure output is properly propagated in case of
339 339 an error during the unbundling. This output capturing part will likely be
340 340 reworked and this ability will probably go away in the process.
341 341 """
342 342 if op is None:
343 343 if transactiongetter is None:
344 344 transactiongetter = _notransaction
345 345 op = bundleoperation(repo, transactiongetter)
346 346 # todo:
347 347 # - replace this with an init function soon.
348 348 # - exception catching
349 349 unbundler.params
350 350 if repo.ui.debugflag:
351 351 msg = ['bundle2-input-bundle:']
352 352 if unbundler.params:
353 353 msg.append(' %i params' % len(unbundler.params))
354 354 if op.gettransaction is None or op.gettransaction is _notransaction:
355 355 msg.append(' no-transaction')
356 356 else:
357 357 msg.append(' with-transaction')
358 358 msg.append('\n')
359 359 repo.ui.debug(''.join(msg))
360 360 iterparts = enumerate(unbundler.iterparts())
361 361 part = None
362 362 nbpart = 0
363 363 try:
364 364 for nbpart, part in iterparts:
365 365 _processpart(op, part)
366 366 except Exception as exc:
367 367 # Any exceptions seeking to the end of the bundle at this point are
368 368 # almost certainly related to the underlying stream being bad.
369 369 # And, chances are that the exception we're handling is related to
370 370 # getting in that bad state. So, we swallow the seeking error and
371 371 # re-raise the original error.
372 372 seekerror = False
373 373 try:
374 374 for nbpart, part in iterparts:
375 375 # consume the bundle content
376 376 part.seek(0, 2)
377 377 except Exception:
378 378 seekerror = True
379 379
380 380 # Small hack to let caller code distinguish exceptions from bundle2
381 381 # processing from processing the old format. This is mostly
382 382 # needed to handle different return codes to unbundle according to the
383 383 # type of bundle. We should probably clean up or drop this return code
384 384 # craziness in a future version.
385 385 exc.duringunbundle2 = True
386 386 salvaged = []
387 387 replycaps = None
388 388 if op.reply is not None:
389 389 salvaged = op.reply.salvageoutput()
390 390 replycaps = op.reply.capabilities
391 391 exc._replycaps = replycaps
392 392 exc._bundle2salvagedoutput = salvaged
393 393
394 394 # Re-raising from a variable loses the original stack. So only use
395 395 # that form if we need to.
396 396 if seekerror:
397 397 raise exc
398 398 else:
399 399 raise
400 400 finally:
401 401 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
402 402
403 403 return op
404 404
405 405 def _processchangegroup(op, cg, tr, source, url, **kwargs):
406 ret, addednodes = cg.apply(op.repo, tr, source, url, **kwargs)
406 ret = cg.apply(op.repo, tr, source, url, **kwargs)
407 407 op.records.add('changegroup', {
408 408 'return': ret,
409 'addednodes': addednodes,
410 409 })
411 410 return ret
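# Note: with this change a 'changegroup' record only carries the return
# code, e.g. {'return': 1}; the 'addednodes' entry is gone, matching what
# combinechangegroupresults() consumes.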
412 411
413 412 def _processpart(op, part):
414 413 """process a single part from a bundle
415 414
416 415 The part is guaranteed to have been fully consumed when the function exits
417 416 (even if an exception is raised)."""
418 417 status = 'unknown' # used by debug output
419 418 hardabort = False
420 419 try:
421 420 try:
422 421 handler = parthandlermapping.get(part.type)
423 422 if handler is None:
424 423 status = 'unsupported-type'
425 424 raise error.BundleUnknownFeatureError(parttype=part.type)
426 425 indebug(op.ui, 'found a handler for part %r' % part.type)
427 426 unknownparams = part.mandatorykeys - handler.params
428 427 if unknownparams:
429 428 unknownparams = list(unknownparams)
430 429 unknownparams.sort()
431 430 status = 'unsupported-params (%s)' % unknownparams
432 431 raise error.BundleUnknownFeatureError(parttype=part.type,
433 432 params=unknownparams)
434 433 status = 'supported'
435 434 except error.BundleUnknownFeatureError as exc:
436 435 if part.mandatory: # mandatory parts
437 436 raise
438 437 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
439 438 return # skip to part processing
440 439 finally:
441 440 if op.ui.debugflag:
442 441 msg = ['bundle2-input-part: "%s"' % part.type]
443 442 if not part.mandatory:
444 443 msg.append(' (advisory)')
445 444 nbmp = len(part.mandatorykeys)
446 445 nbap = len(part.params) - nbmp
447 446 if nbmp or nbap:
448 447 msg.append(' (params:')
449 448 if nbmp:
450 449 msg.append(' %i mandatory' % nbmp)
451 450 if nbap:
452 451 msg.append(' %i advisory' % nbap)
453 452 msg.append(')')
454 453 msg.append(' %s\n' % status)
455 454 op.ui.debug(''.join(msg))
456 455
457 456 # handler is called outside the above try block so that we don't
458 457 # risk catching KeyErrors from anything other than the
459 458 # parthandlermapping lookup (any KeyError raised by handler()
460 459 # itself represents a defect of a different variety).
461 460 output = None
462 461 if op.captureoutput and op.reply is not None:
463 462 op.ui.pushbuffer(error=True, subproc=True)
464 463 output = ''
465 464 try:
466 465 handler(op, part)
467 466 finally:
468 467 if output is not None:
469 468 output = op.ui.popbuffer()
470 469 if output:
471 470 outpart = op.reply.newpart('output', data=output,
472 471 mandatory=False)
473 472 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
474 473 # If exiting or interrupted, do not attempt to seek the stream in the
475 474 # finally block below. This makes abort faster.
476 475 except (SystemExit, KeyboardInterrupt):
477 476 hardabort = True
478 477 raise
479 478 finally:
480 479 # consume the part content to not corrupt the stream.
481 480 if not hardabort:
482 481 part.seek(0, 2)
483 482
484 483
485 484 def decodecaps(blob):
486 485 """decode a bundle2 caps bytes blob into a dictionary
487 486
488 487 The blob is a list of capabilities (one per line)
489 488 Capabilities may have values using a line of the form::
490 489
491 490 capability=value1,value2,value3
492 491
493 492 The values are always a list."""
494 493 caps = {}
495 494 for line in blob.splitlines():
496 495 if not line:
497 496 continue
498 497 if '=' not in line:
499 498 key, vals = line, ()
500 499 else:
501 500 key, vals = line.split('=', 1)
502 501 vals = vals.split(',')
503 502 key = urlreq.unquote(key)
504 503 vals = [urlreq.unquote(v) for v in vals]
505 504 caps[key] = vals
506 505 return caps
507 506
508 507 def encodecaps(caps):
509 508 """encode a bundle2 caps dictionary into a bytes blob"""
510 509 chunks = []
511 510 for ca in sorted(caps):
512 511 vals = caps[ca]
513 512 ca = urlreq.quote(ca)
514 513 vals = [urlreq.quote(v) for v in vals]
515 514 if vals:
516 515 ca = "%s=%s" % (ca, ','.join(vals))
517 516 chunks.append(ca)
518 517 return '\n'.join(chunks)
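# A round-trip sketch (using capability names defined in this module):
#   encodecaps({'HG20': (), 'digests': ('md5', 'sha1')})
#     -> 'HG20\ndigests=md5,sha1'
#   decodecaps('HG20\ndigests=md5,sha1')
#     -> {'HG20': [], 'digests': ['md5', 'sha1']}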
519 518
520 519 bundletypes = {
521 520 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
522 521 # since the unification ssh accepts a header but there
523 522 # is no capability signaling it.
524 523 "HG20": (), # special-cased below
525 524 "HG10UN": ("HG10UN", 'UN'),
526 525 "HG10BZ": ("HG10", 'BZ'),
527 526 "HG10GZ": ("HG10GZ", 'GZ'),
528 527 }
529 528
530 529 # hgweb uses this list to communicate its preferred type
531 530 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
532 531
533 532 class bundle20(object):
534 533 """represent an outgoing bundle2 container
535 534
536 535 Use the `addparam` method to add stream level parameters, and `newpart` to
537 536 populate it. Then call `getchunks` to retrieve all the binary chunks of
538 537 data that compose the bundle2 container."""
539 538
540 539 _magicstring = 'HG20'
541 540
542 541 def __init__(self, ui, capabilities=()):
543 542 self.ui = ui
544 543 self._params = []
545 544 self._parts = []
546 545 self.capabilities = dict(capabilities)
547 546 self._compengine = util.compengines.forbundletype('UN')
548 547 self._compopts = None
549 548
550 549 def setcompression(self, alg, compopts=None):
551 550 """setup core part compression to <alg>"""
552 551 if alg in (None, 'UN'):
553 552 return
554 553 assert not any(n.lower() == 'compression' for n, v in self._params)
555 554 self.addparam('Compression', alg)
556 555 self._compengine = util.compengines.forbundletype(alg)
557 556 self._compopts = compopts
558 557
559 558 @property
560 559 def nbparts(self):
561 560 """total number of parts added to the bundler"""
562 561 return len(self._parts)
563 562
564 563 # methods used to define the bundle2 content
565 564 def addparam(self, name, value=None):
566 565 """add a stream level parameter"""
567 566 if not name:
568 567 raise ValueError('empty parameter name')
569 568 if name[0] not in string.letters:
570 569 raise ValueError('non letter first character: %r' % name)
571 570 self._params.append((name, value))
572 571
573 572 def addpart(self, part):
574 573 """add a new part to the bundle2 container
575 574
576 575 Parts contains the actual applicative payload."""
577 576 assert part.id is None
578 577 part.id = len(self._parts) # very cheap counter
579 578 self._parts.append(part)
580 579
581 580 def newpart(self, typeid, *args, **kwargs):
582 581 """create a new part and add it to the containers
583 582
584 583 As the part is directly added to the containers. For now, this means
585 584 that any failure to properly initialize the part after calling
586 585 ``newpart`` should result in a failure of the whole bundling process.
587 586
588 587 You can still fall back to manually create and add if you need better
589 588 control."""
590 589 part = bundlepart(typeid, *args, **kwargs)
591 590 self.addpart(part)
592 591 return part
593 592
594 593 # methods used to generate the bundle2 stream
595 594 def getchunks(self):
596 595 if self.ui.debugflag:
597 596 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
598 597 if self._params:
599 598 msg.append(' (%i params)' % len(self._params))
600 599 msg.append(' %i parts total\n' % len(self._parts))
601 600 self.ui.debug(''.join(msg))
602 601 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
603 602 yield self._magicstring
604 603 param = self._paramchunk()
605 604 outdebug(self.ui, 'bundle parameter: %s' % param)
606 605 yield _pack(_fstreamparamsize, len(param))
607 606 if param:
608 607 yield param
609 608 for chunk in self._compengine.compressstream(self._getcorechunk(),
610 609 self._compopts):
611 610 yield chunk
612 611
613 612 def _paramchunk(self):
614 613 """return a encoded version of all stream parameters"""
615 614 blocks = []
616 615 for par, value in self._params:
617 616 par = urlreq.quote(par)
618 617 if value is not None:
619 618 value = urlreq.quote(value)
620 619 par = '%s=%s' % (par, value)
621 620 blocks.append(par)
622 621 return ' '.join(blocks)
623 622
624 623 def _getcorechunk(self):
625 624 """yield chunk for the core part of the bundle
626 625
627 626 (all but headers and parameters)"""
628 627 outdebug(self.ui, 'start of parts')
629 628 for part in self._parts:
630 629 outdebug(self.ui, 'bundle part: "%s"' % part.type)
631 630 for chunk in part.getchunks(ui=self.ui):
632 631 yield chunk
633 632 outdebug(self.ui, 'end of bundle')
634 633 yield _pack(_fpartheadersize, 0)
635 634
636 635
637 636 def salvageoutput(self):
638 637 """return a list with a copy of all output parts in the bundle
639 638
640 639 This is meant to be used during error handling to make sure we preserve
641 640 server output"""
642 641 salvaged = []
643 642 for part in self._parts:
644 643 if part.type.startswith('output'):
645 644 salvaged.append(part.copy())
646 645 return salvaged
647 646
648 647
649 648 class unpackermixin(object):
650 649 """A mixin to extract bytes and struct data from a stream"""
651 650
652 651 def __init__(self, fp):
653 652 self._fp = fp
654 653
655 654 def _unpack(self, format):
656 655 """unpack this struct format from the stream
657 656
658 657 This method is meant for internal usage by the bundle2 protocol only.
659 658 It directly manipulates the low level stream, including bundle2 level
660 659 instructions.
661 660
662 661 Do not use it to implement higher-level logic or methods."""
663 662 data = self._readexact(struct.calcsize(format))
664 663 return _unpack(format, data)
665 664
666 665 def _readexact(self, size):
667 666 """read exactly <size> bytes from the stream
668 667
669 668 This method is meant for internal usage by the bundle2 protocol only.
671 670 It directly manipulates the low level stream, including bundle2 level
672 671 instructions.
672 671
673 672 Do not use it to implement higher-level logic or methods."""
674 673 return changegroup.readexactly(self._fp, size)
675 674
676 675 def getunbundler(ui, fp, magicstring=None):
677 676 """return a valid unbundler object for a given magicstring"""
678 677 if magicstring is None:
679 678 magicstring = changegroup.readexactly(fp, 4)
680 679 magic, version = magicstring[0:2], magicstring[2:4]
681 680 if magic != 'HG':
682 681 ui.debug(
683 682 "error: invalid magic: %r (version %r), should be 'HG'\n"
684 683 % (magic, version))
685 684 raise error.Abort(_('not a Mercurial bundle'))
686 685 unbundlerclass = formatmap.get(version)
687 686 if unbundlerclass is None:
688 687 raise error.Abort(_('unknown bundle version %s') % version)
689 688 unbundler = unbundlerclass(ui, fp)
690 689 indebug(ui, 'start processing of %s stream' % magicstring)
691 690 return unbundler
692 691
693 692 class unbundle20(unpackermixin):
694 693 """interpret a bundle2 stream
695 694
696 695 This class is fed with a binary stream and yields parts through its
697 696 `iterparts` method."""
698 697
699 698 _magicstring = 'HG20'
700 699
701 700 def __init__(self, ui, fp):
702 701 """If header is specified, we do not read it out of the stream."""
703 702 self.ui = ui
704 703 self._compengine = util.compengines.forbundletype('UN')
705 704 self._compressed = None
706 705 super(unbundle20, self).__init__(fp)
707 706
708 707 @util.propertycache
709 708 def params(self):
710 709 """dictionary of stream level parameters"""
711 710 indebug(self.ui, 'reading bundle2 stream parameters')
712 711 params = {}
713 712 paramssize = self._unpack(_fstreamparamsize)[0]
714 713 if paramssize < 0:
715 714 raise error.BundleValueError('negative bundle param size: %i'
716 715 % paramssize)
717 716 if paramssize:
718 717 params = self._readexact(paramssize)
719 718 params = self._processallparams(params)
720 719 return params
721 720
722 721 def _processallparams(self, paramsblock):
723 722 """"""
724 723 params = util.sortdict()
725 724 for p in paramsblock.split(' '):
726 725 p = p.split('=', 1)
727 726 p = [urlreq.unquote(i) for i in p]
728 727 if len(p) < 2:
729 728 p.append(None)
730 729 self._processparam(*p)
731 730 params[p[0]] = p[1]
732 731 return params
733 732
734 733
735 734 def _processparam(self, name, value):
736 735 """process a parameter, applying its effect if needed
737 736
738 737 Parameters starting with a lower case letter are advisory and will be
739 738 ignored when unknown. Those starting with an upper case letter are
740 739 mandatory; this function will raise a BundleUnknownFeatureError when
741 740 they are unknown.
742 741 Note: no options are currently supported. Any input will either be
743 742 ignored or fail.
744 743 """
745 744 if not name:
746 745 raise ValueError('empty parameter name')
747 746 if name[0] not in string.letters:
748 747 raise ValueError('non letter first character: %r' % name)
749 748 try:
750 749 handler = b2streamparamsmap[name.lower()]
751 750 except KeyError:
752 751 if name[0].islower():
753 752 indebug(self.ui, "ignoring unknown parameter %r" % name)
754 753 else:
755 754 raise error.BundleUnknownFeatureError(params=(name,))
756 755 else:
757 756 handler(self, name, value)
758 757
759 758 def _forwardchunks(self):
760 759 """utility to transfer a bundle2 as binary
761 760
762 761 This is made necessary by the fact that the 'getbundle' command over 'ssh'
763 762 has no way to know when the reply ends, relying on the bundle to be
764 763 interpreted to find its end. This is terrible and we are sorry, but we
765 764 needed to move forward to get general delta enabled.
766 765 """
767 766 yield self._magicstring
768 767 assert 'params' not in vars(self)
769 768 paramssize = self._unpack(_fstreamparamsize)[0]
770 769 if paramssize < 0:
771 770 raise error.BundleValueError('negative bundle param size: %i'
772 771 % paramssize)
773 772 yield _pack(_fstreamparamsize, paramssize)
774 773 if paramssize:
775 774 params = self._readexact(paramssize)
776 775 self._processallparams(params)
777 776 yield params
778 777 assert self._compengine.bundletype == 'UN'
779 778 # From there, payload might need to be decompressed
780 779 self._fp = self._compengine.decompressorreader(self._fp)
781 780 emptycount = 0
782 781 while emptycount < 2:
783 782 # so we can brainlessly loop
784 783 assert _fpartheadersize == _fpayloadsize
785 784 size = self._unpack(_fpartheadersize)[0]
786 785 yield _pack(_fpartheadersize, size)
787 786 if size:
788 787 emptycount = 0
789 788 else:
790 789 emptycount += 1
791 790 continue
792 791 if size == flaginterrupt:
793 792 continue
794 793 elif size < 0:
795 794 raise error.BundleValueError('negative chunk size: %i' % size)
796 795 yield self._readexact(size)
797 796
798 797
799 798 def iterparts(self):
800 799 """yield all parts contained in the stream"""
801 800 # make sure params have been loaded
802 801 self.params
803 802 # From there, payload needs to be decompressed
804 803 self._fp = self._compengine.decompressorreader(self._fp)
805 804 indebug(self.ui, 'start extraction of bundle2 parts')
806 805 headerblock = self._readpartheader()
807 806 while headerblock is not None:
808 807 part = unbundlepart(self.ui, headerblock, self._fp)
809 808 yield part
810 809 part.seek(0, 2)
811 810 headerblock = self._readpartheader()
812 811 indebug(self.ui, 'end of bundle2 stream')
813 812
814 813 def _readpartheader(self):
815 814 """reads a part header size and return the bytes blob
816 815
817 816 returns None if empty"""
818 817 headersize = self._unpack(_fpartheadersize)[0]
819 818 if headersize < 0:
820 819 raise error.BundleValueError('negative part header size: %i'
821 820 % headersize)
822 821 indebug(self.ui, 'part header size: %i' % headersize)
823 822 if headersize:
824 823 return self._readexact(headersize)
825 824 return None
826 825
827 826 def compressed(self):
828 827 self.params # load params
829 828 return self._compressed
830 829
831 830 def close(self):
832 831 """close underlying file"""
833 832 if util.safehasattr(self._fp, 'close'):
834 833 return self._fp.close()
835 834
836 835 formatmap = {'20': unbundle20}
837 836
838 837 b2streamparamsmap = {}
839 838
840 839 def b2streamparamhandler(name):
841 840 """register a handler for a stream level parameter"""
842 841 def decorator(func):
843 842 assert name not in formatmap
844 843 b2streamparamsmap[name] = func
845 844 return func
846 845 return decorator
847 846
848 847 @b2streamparamhandler('compression')
849 848 def processcompression(unbundler, param, value):
850 849 """read compression parameter and install payload decompression"""
851 850 if value not in util.compengines.supportedbundletypes:
852 851 raise error.BundleUnknownFeatureError(params=(param,),
853 852 values=(value,))
854 853 unbundler._compengine = util.compengines.forbundletype(value)
855 854 if value is not None:
856 855 unbundler._compressed = True
857 856
858 857 class bundlepart(object):
859 858 """A bundle2 part contains application level payload
860 859
861 860 The part `type` is used to route the part to the application level
862 861 handler.
863 862
864 863 The part payload is contained in ``part.data``. It could be raw bytes or a
865 864 generator of byte chunks.
866 865
867 866 You can add parameters to the part using the ``addparam`` method.
868 867 Parameters can be either mandatory (default) or advisory. Remote side
869 868 should be able to safely ignore the advisory ones.
870 869
871 870 Neither data nor parameters can be modified after the generation has begun.
872 871 """
873 872
874 873 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
875 874 data='', mandatory=True):
876 875 validateparttype(parttype)
877 876 self.id = None
878 877 self.type = parttype
879 878 self._data = data
880 879 self._mandatoryparams = list(mandatoryparams)
881 880 self._advisoryparams = list(advisoryparams)
882 881 # checking for duplicated entries
883 882 self._seenparams = set()
884 883 for pname, __ in self._mandatoryparams + self._advisoryparams:
885 884 if pname in self._seenparams:
886 885 raise error.ProgrammingError('duplicated params: %s' % pname)
887 886 self._seenparams.add(pname)
888 887 # status of the part's generation:
889 888 # - None: not started,
890 889 # - False: currently generated,
891 890 # - True: generation done.
892 891 self._generated = None
893 892 self.mandatory = mandatory
894 893
895 894 def __repr__(self):
896 895 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
897 896 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
898 897 % (cls, id(self), self.id, self.type, self.mandatory))
899 898
900 899 def copy(self):
901 900 """return a copy of the part
902 901
903 902 The new part has the very same content but no partid assigned yet.
904 903 Parts with generated data cannot be copied."""
905 904 assert not util.safehasattr(self.data, 'next')
906 905 return self.__class__(self.type, self._mandatoryparams,
907 906 self._advisoryparams, self._data, self.mandatory)
908 907
909 908 # methods used to define the part content
910 909 @property
911 910 def data(self):
912 911 return self._data
913 912
914 913 @data.setter
915 914 def data(self, data):
916 915 if self._generated is not None:
917 916 raise error.ReadOnlyPartError('part is being generated')
918 917 self._data = data
919 918
920 919 @property
921 920 def mandatoryparams(self):
922 921 # make it an immutable tuple to force people through ``addparam``
923 922 return tuple(self._mandatoryparams)
924 923
925 924 @property
926 925 def advisoryparams(self):
927 926 # make it an immutable tuple to force people through ``addparam``
928 927 return tuple(self._advisoryparams)
929 928
930 929 def addparam(self, name, value='', mandatory=True):
931 930 """add a parameter to the part
932 931
933 932 If 'mandatory' is set to True, the remote handler must claim support
934 933 for this parameter or the unbundling will be aborted.
935 934
936 935 The 'name' and 'value' cannot exceed 255 bytes each.
937 936 """
938 937 if self._generated is not None:
939 938 raise error.ReadOnlyPartError('part is being generated')
940 939 if name in self._seenparams:
941 940 raise ValueError('duplicated params: %s' % name)
942 941 self._seenparams.add(name)
943 942 params = self._advisoryparams
944 943 if mandatory:
945 944 params = self._mandatoryparams
946 945 params.append((name, value))
947 946
948 947 # methods used to generate the bundle2 stream
949 948 def getchunks(self, ui):
950 949 if self._generated is not None:
951 950 raise error.ProgrammingError('part can only be consumed once')
952 951 self._generated = False
953 952
954 953 if ui.debugflag:
955 954 msg = ['bundle2-output-part: "%s"' % self.type]
956 955 if not self.mandatory:
957 956 msg.append(' (advisory)')
958 957 nbmp = len(self.mandatoryparams)
959 958 nbap = len(self.advisoryparams)
960 959 if nbmp or nbap:
961 960 msg.append(' (params:')
962 961 if nbmp:
963 962 msg.append(' %i mandatory' % nbmp)
964 963 if nbap:
965 964 msg.append(' %i advisory' % nbap)
966 965 msg.append(')')
967 966 if not self.data:
968 967 msg.append(' empty payload')
969 968 elif util.safehasattr(self.data, 'next'):
970 969 msg.append(' streamed payload')
971 970 else:
972 971 msg.append(' %i bytes payload' % len(self.data))
973 972 msg.append('\n')
974 973 ui.debug(''.join(msg))
975 974
976 975 #### header
977 976 if self.mandatory:
978 977 parttype = self.type.upper()
979 978 else:
980 979 parttype = self.type.lower()
981 980 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
982 981 ## parttype
983 982 header = [_pack(_fparttypesize, len(parttype)),
984 983 parttype, _pack(_fpartid, self.id),
985 984 ]
986 985 ## parameters
987 986 # count
988 987 manpar = self.mandatoryparams
989 988 advpar = self.advisoryparams
990 989 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
991 990 # size
992 991 parsizes = []
993 992 for key, value in manpar:
994 993 parsizes.append(len(key))
995 994 parsizes.append(len(value))
996 995 for key, value in advpar:
997 996 parsizes.append(len(key))
998 997 parsizes.append(len(value))
999 998 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
1000 999 header.append(paramsizes)
1001 1000 # key, value
1002 1001 for key, value in manpar:
1003 1002 header.append(key)
1004 1003 header.append(value)
1005 1004 for key, value in advpar:
1006 1005 header.append(key)
1007 1006 header.append(value)
1008 1007 ## finalize header
1009 1008 headerchunk = ''.join(header)
1010 1009 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
1011 1010 yield _pack(_fpartheadersize, len(headerchunk))
1012 1011 yield headerchunk
1013 1012 ## payload
1014 1013 try:
1015 1014 for chunk in self._payloadchunks():
1016 1015 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1017 1016 yield _pack(_fpayloadsize, len(chunk))
1018 1017 yield chunk
1019 1018 except GeneratorExit:
1020 1019 # GeneratorExit means that nobody is listening for our
1021 1020 # results anyway, so just bail quickly rather than trying
1022 1021 # to produce an error part.
1023 1022 ui.debug('bundle2-generatorexit\n')
1024 1023 raise
1025 1024 except BaseException as exc:
1026 1025 # backup exception data for later
1027 1026 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1028 1027 % exc)
1029 1028 tb = sys.exc_info()[2]
1030 1029 msg = 'unexpected error: %s' % exc
1031 1030 interpart = bundlepart('error:abort', [('message', msg)],
1032 1031 mandatory=False)
1033 1032 interpart.id = 0
1034 1033 yield _pack(_fpayloadsize, -1)
1035 1034 for chunk in interpart.getchunks(ui=ui):
1036 1035 yield chunk
1037 1036 outdebug(ui, 'closing payload chunk')
1038 1037 # abort current part payload
1039 1038 yield _pack(_fpayloadsize, 0)
1040 1039 pycompat.raisewithtb(exc, tb)
1041 1040 # end of payload
1042 1041 outdebug(ui, 'closing payload chunk')
1043 1042 yield _pack(_fpayloadsize, 0)
1044 1043 self._generated = True
1045 1044
1046 1045 def _payloadchunks(self):
1047 1046 """yield chunks of a the part payload
1048 1047
1049 1048 Exists to handle the different methods to provide data to a part."""
1050 1049 # we only support fixed size data now.
1051 1050 # This will be improved in the future.
1052 1051 if util.safehasattr(self.data, 'next'):
1053 1052 buff = util.chunkbuffer(self.data)
1054 1053 chunk = buff.read(preferedchunksize)
1055 1054 while chunk:
1056 1055 yield chunk
1057 1056 chunk = buff.read(preferedchunksize)
1058 1057 elif len(self.data):
1059 1058 yield self.data
1060 1059
1061 1060
1062 1061 flaginterrupt = -1
1063 1062
1064 1063 class interrupthandler(unpackermixin):
1065 1064 """read one part and process it with restricted capability
1066 1065
1067 1066 This allows transmitting exceptions raised on the producer side during part
1068 1067 iteration while the consumer is reading a part.
1069 1068
1070 1069 Parts processed in this manner only have access to a ui object."""
1071 1070
1072 1071 def __init__(self, ui, fp):
1073 1072 super(interrupthandler, self).__init__(fp)
1074 1073 self.ui = ui
1075 1074
1076 1075 def _readpartheader(self):
1077 1076 """reads a part header size and return the bytes blob
1078 1077
1079 1078 returns None if empty"""
1080 1079 headersize = self._unpack(_fpartheadersize)[0]
1081 1080 if headersize < 0:
1082 1081 raise error.BundleValueError('negative part header size: %i'
1083 1082 % headersize)
1084 1083 indebug(self.ui, 'part header size: %i' % headersize)
1085 1084 if headersize:
1086 1085 return self._readexact(headersize)
1087 1086 return None
1088 1087
1089 1088 def __call__(self):
1090 1089
1091 1090 self.ui.debug('bundle2-input-stream-interrupt:'
1092 1091 ' opening out of band context\n')
1093 1092 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1094 1093 headerblock = self._readpartheader()
1095 1094 if headerblock is None:
1096 1095 indebug(self.ui, 'no part found during interruption.')
1097 1096 return
1098 1097 part = unbundlepart(self.ui, headerblock, self._fp)
1099 1098 op = interruptoperation(self.ui)
1100 1099 _processpart(op, part)
1101 1100 self.ui.debug('bundle2-input-stream-interrupt:'
1102 1101 ' closing out of band context\n')
1103 1102
1104 1103 class interruptoperation(object):
1105 1104 """A limited operation to be use by part handler during interruption
1106 1105
1107 1106 It only have access to an ui object.
1108 1107 """
1109 1108
1110 1109 def __init__(self, ui):
1111 1110 self.ui = ui
1112 1111 self.reply = None
1113 1112 self.captureoutput = False
1114 1113
1115 1114 @property
1116 1115 def repo(self):
1117 1116 raise error.ProgrammingError('no repo access from stream interruption')
1118 1117
1119 1118 def gettransaction(self):
1120 1119 raise TransactionUnavailable('no repo access from stream interruption')
1121 1120
1122 1121 class unbundlepart(unpackermixin):
1123 1122 """a bundle part read from a bundle"""
1124 1123
1125 1124 def __init__(self, ui, header, fp):
1126 1125 super(unbundlepart, self).__init__(fp)
1127 1126 self._seekable = (util.safehasattr(fp, 'seek') and
1128 1127 util.safehasattr(fp, 'tell'))
1129 1128 self.ui = ui
1130 1129 # unbundle state attr
1131 1130 self._headerdata = header
1132 1131 self._headeroffset = 0
1133 1132 self._initialized = False
1134 1133 self.consumed = False
1135 1134 # part data
1136 1135 self.id = None
1137 1136 self.type = None
1138 1137 self.mandatoryparams = None
1139 1138 self.advisoryparams = None
1140 1139 self.params = None
1141 1140 self.mandatorykeys = ()
1142 1141 self._payloadstream = None
1143 1142 self._readheader()
1144 1143 self._mandatory = None
1145 1144 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1146 1145 self._pos = 0
1147 1146
1148 1147 def _fromheader(self, size):
1149 1148 """return the next <size> byte from the header"""
1150 1149 offset = self._headeroffset
1151 1150 data = self._headerdata[offset:(offset + size)]
1152 1151 self._headeroffset = offset + size
1153 1152 return data
1154 1153
1155 1154 def _unpackheader(self, format):
1156 1155 """read given format from header
1157 1156
1158 1157 This automatically computes the size of the format to read.
1159 1158 data = self._fromheader(struct.calcsize(format))
1160 1159 return _unpack(format, data)
1161 1160
1162 1161 def _initparams(self, mandatoryparams, advisoryparams):
1163 1162 """internal function to setup all logic related parameters"""
1164 1163 # make it read only to prevent people touching it by mistake.
1165 1164 self.mandatoryparams = tuple(mandatoryparams)
1166 1165 self.advisoryparams = tuple(advisoryparams)
1167 1166 # user friendly UI
1168 1167 self.params = util.sortdict(self.mandatoryparams)
1169 1168 self.params.update(self.advisoryparams)
1170 1169 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1171 1170
1172 1171 def _payloadchunks(self, chunknum=0):
1173 1172 '''seek to specified chunk and start yielding data'''
1174 1173 if len(self._chunkindex) == 0:
1175 1174 assert chunknum == 0, 'Must start with chunk 0'
1176 1175 self._chunkindex.append((0, self._tellfp()))
1177 1176 else:
1178 1177 assert chunknum < len(self._chunkindex), \
1179 1178 'Unknown chunk %d' % chunknum
1180 1179 self._seekfp(self._chunkindex[chunknum][1])
1181 1180
1182 1181 pos = self._chunkindex[chunknum][0]
1183 1182 payloadsize = self._unpack(_fpayloadsize)[0]
1184 1183 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1185 1184 while payloadsize:
1186 1185 if payloadsize == flaginterrupt:
1187 1186 # interruption detection, the handler will now read a
1188 1187 # single part and process it.
1189 1188 interrupthandler(self.ui, self._fp)()
1190 1189 elif payloadsize < 0:
1191 1190 msg = 'negative payload chunk size: %i' % payloadsize
1192 1191 raise error.BundleValueError(msg)
1193 1192 else:
1194 1193 result = self._readexact(payloadsize)
1195 1194 chunknum += 1
1196 1195 pos += payloadsize
1197 1196 if chunknum == len(self._chunkindex):
1198 1197 self._chunkindex.append((pos, self._tellfp()))
1199 1198 yield result
1200 1199 payloadsize = self._unpack(_fpayloadsize)[0]
1201 1200 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1202 1201
1203 1202 def _findchunk(self, pos):
1204 1203 '''for a given payload position, return a chunk number and offset'''
1205 1204 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1206 1205 if ppos == pos:
1207 1206 return chunk, 0
1208 1207 elif ppos > pos:
1209 1208 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1210 1209 raise ValueError('Unknown chunk')
1211 1210
1212 1211 def _readheader(self):
1213 1212 """read the header and setup the object"""
1214 1213 typesize = self._unpackheader(_fparttypesize)[0]
1215 1214 self.type = self._fromheader(typesize)
1216 1215 indebug(self.ui, 'part type: "%s"' % self.type)
1217 1216 self.id = self._unpackheader(_fpartid)[0]
1218 1217 indebug(self.ui, 'part id: "%s"' % self.id)
1219 1218 # extract mandatory bit from type
1220 1219 self.mandatory = (self.type != self.type.lower())
1221 1220 self.type = self.type.lower()
1222 1221 ## reading parameters
1223 1222 # param count
1224 1223 mancount, advcount = self._unpackheader(_fpartparamcount)
1225 1224 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1226 1225 # param size
1227 1226 fparamsizes = _makefpartparamsizes(mancount + advcount)
1228 1227 paramsizes = self._unpackheader(fparamsizes)
1229 1228 # make it a list of couple again
1230 1229 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1231 1230 # split mandatory from advisory
1232 1231 mansizes = paramsizes[:mancount]
1233 1232 advsizes = paramsizes[mancount:]
1234 1233 # retrieve param value
1235 1234 manparams = []
1236 1235 for key, value in mansizes:
1237 1236 manparams.append((self._fromheader(key), self._fromheader(value)))
1238 1237 advparams = []
1239 1238 for key, value in advsizes:
1240 1239 advparams.append((self._fromheader(key), self._fromheader(value)))
1241 1240 self._initparams(manparams, advparams)
1242 1241 ## part payload
1243 1242 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1244 1243 # we read the data, tell it
1245 1244 self._initialized = True
1246 1245
1247 1246 def read(self, size=None):
1248 1247 """read payload data"""
1249 1248 if not self._initialized:
1250 1249 self._readheader()
1251 1250 if size is None:
1252 1251 data = self._payloadstream.read()
1253 1252 else:
1254 1253 data = self._payloadstream.read(size)
1255 1254 self._pos += len(data)
1256 1255 if size is None or len(data) < size:
1257 1256 if not self.consumed and self._pos:
1258 1257 self.ui.debug('bundle2-input-part: total payload size %i\n'
1259 1258 % self._pos)
1260 1259 self.consumed = True
1261 1260 return data
1262 1261
1263 1262 def tell(self):
1264 1263 return self._pos
1265 1264
1266 1265 def seek(self, offset, whence=0):
1267 1266 if whence == 0:
1268 1267 newpos = offset
1269 1268 elif whence == 1:
1270 1269 newpos = self._pos + offset
1271 1270 elif whence == 2:
1272 1271 if not self.consumed:
1273 1272 self.read()
1274 1273 newpos = self._chunkindex[-1][0] - offset
1275 1274 else:
1276 1275 raise ValueError('Unknown whence value: %r' % (whence,))
1277 1276
1278 1277 if newpos > self._chunkindex[-1][0] and not self.consumed:
1279 1278 self.read()
1280 1279 if not 0 <= newpos <= self._chunkindex[-1][0]:
1281 1280 raise ValueError('Offset out of range')
1282 1281
1283 1282 if self._pos != newpos:
1284 1283 chunk, internaloffset = self._findchunk(newpos)
1285 1284 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1286 1285 adjust = self.read(internaloffset)
1287 1286 if len(adjust) != internaloffset:
1288 1287 raise error.Abort(_('Seek failed\n'))
1289 1288 self._pos = newpos
1290 1289
1291 1290 def _seekfp(self, offset, whence=0):
1292 1291 """move the underlying file pointer
1293 1292
1294 1293 This method is meant for internal usage by the bundle2 protocol only.
1295 1294 It directly manipulates the low level stream, including bundle2 level
1296 1295 instructions.
1297 1296
1298 1297 Do not use it to implement higher-level logic or methods."""
1299 1298 if self._seekable:
1300 1299 return self._fp.seek(offset, whence)
1301 1300 else:
1302 1301 raise NotImplementedError(_('File pointer is not seekable'))
1303 1302
1304 1303 def _tellfp(self):
1305 1304 """return the file offset, or None if file is not seekable
1306 1305
1307 1306 This method is meant for internal usage by the bundle2 protocol only.
1308 1307 It directly manipulates the low level stream, including bundle2 level
1309 1308 instructions.
1310 1309
1311 1310 Do not use it to implement higher-level logic or methods."""
1312 1311 if self._seekable:
1313 1312 try:
1314 1313 return self._fp.tell()
1315 1314 except IOError as e:
1316 1315 if e.errno == errno.ESPIPE:
1317 1316 self._seekable = False
1318 1317 else:
1319 1318 raise
1320 1319 return None
1321 1320
1322 1321 # These are only the static capabilities.
1323 1322 # Check the 'getrepocaps' function for the rest.
1324 1323 capabilities = {'HG20': (),
1325 1324 'error': ('abort', 'unsupportedcontent', 'pushraced',
1326 1325 'pushkey'),
1327 1326 'listkeys': (),
1328 1327 'pushkey': (),
1329 1328 'digests': tuple(sorted(util.DIGESTS.keys())),
1330 1329 'remote-changegroup': ('http', 'https'),
1331 1330 'hgtagsfnodes': (),
1332 1331 }
1333 1332
1334 1333 def getrepocaps(repo, allowpushback=False):
1335 1334 """return the bundle2 capabilities for a given repo
1336 1335
1337 1336 Exists to allow extensions (like evolution) to mutate the capabilities.
1338 1337 """
1339 1338 caps = capabilities.copy()
1340 1339 caps['changegroup'] = tuple(sorted(
1341 1340 changegroup.supportedincomingversions(repo)))
1342 1341 if obsolete.isenabled(repo, obsolete.exchangeopt):
1343 1342 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1344 1343 caps['obsmarkers'] = supportedformat
1345 1344 if allowpushback:
1346 1345 caps['pushback'] = ()
1347 1346 cpmode = repo.ui.config('server', 'concurrent-push-mode')
1348 1347 if cpmode == 'check-related':
1349 1348 caps['checkheads'] = ('related',)
1350 1349 return caps
1351 1350
1352 1351 def bundle2caps(remote):
1353 1352 """return the bundle capabilities of a peer as dict"""
1354 1353 raw = remote.capable('bundle2')
1355 1354 if not raw and raw != '':
1356 1355 return {}
1357 1356 capsblob = urlreq.unquote(remote.capable('bundle2'))
1358 1357 return decodecaps(capsblob)
1359 1358
1360 1359 def obsmarkersversion(caps):
1361 1360 """extract the list of supported obsmarkers versions from a bundle2caps dict
1362 1361 """
1363 1362 obscaps = caps.get('obsmarkers', ())
1364 1363 return [int(c[1:]) for c in obscaps if c.startswith('V')]
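# A usage sketch: obsmarkersversion({'obsmarkers': ('V0', 'V1')}) -> [0, 1]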
1365 1364
1366 1365 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
1367 1366 vfs=None, compression=None, compopts=None):
1368 1367 if bundletype.startswith('HG10'):
1369 1368 cg = changegroup.getchangegroup(repo, source, outgoing, version='01')
1370 1369 return writebundle(ui, cg, filename, bundletype, vfs=vfs,
1371 1370 compression=compression, compopts=compopts)
1372 1371 elif not bundletype.startswith('HG20'):
1373 1372 raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
1374 1373
1375 1374 caps = {}
1376 1375 if 'obsolescence' in opts:
1377 1376 caps['obsmarkers'] = ('V1',)
1378 1377 bundle = bundle20(ui, caps)
1379 1378 bundle.setcompression(compression, compopts)
1380 1379 _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
1381 1380 chunkiter = bundle.getchunks()
1382 1381
1383 1382 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1384 1383
1385 1384 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
1386 1385 # We should eventually reconcile this logic with the one behind
1387 1386 # 'exchange.getbundle2partsgenerator'.
1388 1387 #
1389 1388 # The types of input from 'getbundle' and 'writenewbundle' are a bit
1390 1389 # different right now. So we keep them separated for now for the sake of
1391 1390 # simplicity.
1392 1391
1393 1392 # we always want a changegroup in such bundle
1394 1393 cgversion = opts.get('cg.version')
1395 1394 if cgversion is None:
1396 1395 cgversion = changegroup.safeversion(repo)
1397 1396 cg = changegroup.getchangegroup(repo, source, outgoing,
1398 1397 version=cgversion)
1399 1398 part = bundler.newpart('changegroup', data=cg.getchunks())
1400 1399 part.addparam('version', cg.version)
1401 1400 if 'clcount' in cg.extras:
1402 1401 part.addparam('nbchanges', str(cg.extras['clcount']),
1403 1402 mandatory=False)
1404 1403 if opts.get('phases') and repo.revs('%ln and secret()',
1405 1404 outgoing.missingheads):
1406 1405 part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
1407 1406
1408 1407 addparttagsfnodescache(repo, bundler, outgoing)
1409 1408
1410 1409 if opts.get('obsolescence', False):
1411 1410 obsmarkers = repo.obsstore.relevantmarkers(outgoing.missing)
1412 1411 buildobsmarkerspart(bundler, obsmarkers)
1413 1412
1414 1413 if opts.get('phases', False):
1415 1414 headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
1416 1415 phasedata = []
1417 1416 for phase in phases.allphases:
1418 1417 for head in headsbyphase[phase]:
1419 1418 phasedata.append(_pack(_fphasesentry, phase, head))
1420 1419 bundler.newpart('phase-heads', data=''.join(phasedata))
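# Each phase-heads entry packs to 24 bytes via _fphasesentry ('>i20s'):
# a 4-byte big-endian phase number followed by the 20-byte binary node.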
1421 1420
1422 1421 def addparttagsfnodescache(repo, bundler, outgoing):
1423 1422 # we include the tags fnode cache for the bundle changeset
1424 1423 # (as an optional part)
1425 1424 cache = tags.hgtagsfnodescache(repo.unfiltered())
1426 1425 chunks = []
1427 1426
1428 1427 # .hgtags fnodes are only relevant for head changesets. While we could
1429 1428 # transfer values for all known nodes, there will likely be little to
1430 1429 # no benefit.
1431 1430 #
1432 1431 # We don't bother using a generator to produce output data because
1433 1432 # a) we only have 40 bytes per head and even esoteric numbers of heads
1434 1433 # consume little memory (1M heads is 40MB) b) we don't want to send the
1435 1434 # part if we don't have entries and knowing if we have entries requires
1436 1435 # cache lookups.
1437 1436 for node in outgoing.missingheads:
1438 1437 # Don't compute missing, as this may slow down serving.
1439 1438 fnode = cache.getfnode(node, computemissing=False)
1440 1439 if fnode is not None:
1441 1440 chunks.extend([node, fnode])
1442 1441
1443 1442 if chunks:
1444 1443 bundler.newpart('hgtagsfnodes', data=''.join(chunks))
1445 1444
1446 1445 def buildobsmarkerspart(bundler, markers):
1447 1446 """add an obsmarker part to the bundler with <markers>
1448 1447
1449 1448 No part is created if markers is empty.
1450 1449 Raises ValueError if the bundler doesn't support any known obsmarker format.
1451 1450 """
1452 1451 if not markers:
1453 1452 return None
1454 1453
1455 1454 remoteversions = obsmarkersversion(bundler.capabilities)
1456 1455 version = obsolete.commonversion(remoteversions)
1457 1456 if version is None:
1458 1457 raise ValueError('bundler does not support common obsmarker format')
1459 1458 stream = obsolete.encodemarkers(markers, True, version=version)
1460 1459 return bundler.newpart('obsmarkers', data=stream)
1461 1460
1462 1461 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1463 1462 compopts=None):
1464 1463 """Write a bundle file and return its filename.
1465 1464
1466 1465 Existing files will not be overwritten.
1467 1466 If no filename is specified, a temporary file is created.
1468 1467 bz2 compression can be turned off.
1469 1468 The bundle file will be deleted in case of errors.
1470 1469 """
1471 1470
1472 1471 if bundletype == "HG20":
1473 1472 bundle = bundle20(ui)
1474 1473 bundle.setcompression(compression, compopts)
1475 1474 part = bundle.newpart('changegroup', data=cg.getchunks())
1476 1475 part.addparam('version', cg.version)
1477 1476 if 'clcount' in cg.extras:
1478 1477 part.addparam('nbchanges', str(cg.extras['clcount']),
1479 1478 mandatory=False)
1480 1479 chunkiter = bundle.getchunks()
1481 1480 else:
1482 1481 # compression argument is only for the bundle2 case
1483 1482 assert compression is None
1484 1483 if cg.version != '01':
1485 1484 raise error.Abort(_('old bundle types only support v1 '
1486 1485 'changegroups'))
1487 1486 header, comp = bundletypes[bundletype]
1488 1487 if comp not in util.compengines.supportedbundletypes:
1489 1488 raise error.Abort(_('unknown stream compression type: %s')
1490 1489 % comp)
1491 1490 compengine = util.compengines.forbundletype(comp)
1492 1491 def chunkiter():
1493 1492 yield header
1494 1493 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1495 1494 yield chunk
1496 1495 chunkiter = chunkiter()
1497 1496
1498 1497 # parse the changegroup data, otherwise we will block
1499 1498 # in case of sshrepo because we don't know the end of the stream
1500 1499 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1501 1500
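# A hedged usage sketch for writebundle(); 'ui' and 'cg' stand in for a
# real ui object and a changegroup packer obtained elsewhere:
#
#   fname = writebundle(ui, cg, 'backup.hg', 'HG20', compression='BZ')
#   fname = writebundle(ui, cg, None, 'HG10BZ')   # temp file, v1 bundle
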
1502 1501 def combinechangegroupresults(op):
1503 1502 """logic to combine 0 or more addchangegroup results into one"""
1504 1503 results = [r.get('return', 0)
1505 1504 for r in op.records['changegroup']]
1506 1505 changedheads = 0
1507 1506 result = 1
1508 1507 for ret in results:
1509 1508 # If any changegroup result is 0, return 0
1510 1509 if ret == 0:
1511 1510 result = 0
1512 1511 break
1513 1512 if ret < -1:
1514 1513 changedheads += ret + 1
1515 1514 elif ret > 1:
1516 1515 changedheads += ret - 1
1517 1516 if changedheads > 0:
1518 1517 result = 1 + changedheads
1519 1518 elif changedheads < 0:
1520 1519 result = -1 + changedheads
1521 1520 return result
1522 1521
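# Worked example of the encoding combined above: each result encodes "no
# head change" as 1, "n heads added" as 1 + n, and "n heads removed" as
# -1 - n.  Combining two results of 3 (two added heads each) gives
# changedheads == 4 and thus a final result of 1 + 4 == 5; a single
# result of -3 gives changedheads == -2 and a final result of -3.
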
1523 1522 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
1524 1523 'targetphase'))
1525 1524 def handlechangegroup(op, inpart):
1526 1525 """apply a changegroup part on the repo
1527 1526
1528 1527 This is a very early implementation that will be massively reworked before
1529 1528 being inflicted on any end-user.
1530 1529 """
1531 1530 tr = op.gettransaction()
1532 1531 unpackerversion = inpart.params.get('version', '01')
1533 1532 # We should raise an appropriate exception here
1534 1533 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1535 1534 # the source and url passed here are overwritten by the ones contained in
1536 1535 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1537 1536 nbchangesets = None
1538 1537 if 'nbchanges' in inpart.params:
1539 1538 nbchangesets = int(inpart.params.get('nbchanges'))
1540 1539 if ('treemanifest' in inpart.params and
1541 1540 'treemanifest' not in op.repo.requirements):
1542 1541 if len(op.repo.changelog) != 0:
1543 1542 raise error.Abort(_(
1544 1543 "bundle contains tree manifests, but local repo is "
1545 1544 "non-empty and does not use tree manifests"))
1546 1545 op.repo.requirements.add('treemanifest')
1547 1546 op.repo._applyopenerreqs()
1548 1547 op.repo._writerequirements()
1549 1548 extrakwargs = {}
1550 1549 targetphase = inpart.params.get('targetphase')
1551 1550 if targetphase is not None:
1552 1551 extrakwargs['targetphase'] = int(targetphase)
1553 1552 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
1554 1553 expectedtotal=nbchangesets, **extrakwargs)
1555 1554 if op.reply is not None:
1556 1555 # This is definitely not the final form of this
1557 1556 # return. But one needs to start somewhere.
1558 1557 part = op.reply.newpart('reply:changegroup', mandatory=False)
1559 1558 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1560 1559 part.addparam('return', '%i' % ret, mandatory=False)
1561 1560 assert not inpart.read()
1562 1561
1563 1562 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1564 1563 ['digest:%s' % k for k in util.DIGESTS.keys()])
1565 1564 @parthandler('remote-changegroup', _remotechangegroupparams)
1566 1565 def handleremotechangegroup(op, inpart):
1567 1566 """apply a bundle10 on the repo, given a url and validation information
1568 1567
1569 1568 All the information about the remote bundle to import is given as
1570 1569 parameters. The parameters include:
1571 1570 - url: the url to the bundle10.
1572 1571 - size: the bundle10 file size. It is used to validate that what was
1573 1572 retrieved by the client matches the server's knowledge about the bundle.
1574 1573 - digests: a space separated list of the digest types provided as
1575 1574 parameters.
1576 1575 - digest:<digest-type>: the hexadecimal representation of the digest with
1577 1576 that name. Like the size, it is used to validate that what was retrieved by
1578 1577 the client matches what the server knows about the bundle.
1579 1578
1580 1579 When multiple digest types are given, all of them are checked.
1581 1580 """
1582 1581 try:
1583 1582 raw_url = inpart.params['url']
1584 1583 except KeyError:
1585 1584 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1586 1585 parsed_url = util.url(raw_url)
1587 1586 if parsed_url.scheme not in capabilities['remote-changegroup']:
1588 1587 raise error.Abort(_('remote-changegroup does not support %s urls') %
1589 1588 parsed_url.scheme)
1590 1589
1591 1590 try:
1592 1591 size = int(inpart.params['size'])
1593 1592 except ValueError:
1594 1593 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1595 1594 % 'size')
1596 1595 except KeyError:
1597 1596 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1598 1597
1599 1598 digests = {}
1600 1599 for typ in inpart.params.get('digests', '').split():
1601 1600 param = 'digest:%s' % typ
1602 1601 try:
1603 1602 value = inpart.params[param]
1604 1603 except KeyError:
1605 1604 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1606 1605 param)
1607 1606 digests[typ] = value
1608 1607
1609 1608 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1610 1609
1611 1610 tr = op.gettransaction()
1612 1611 from . import exchange
1613 1612 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1614 1613 if not isinstance(cg, changegroup.cg1unpacker):
1615 1614 raise error.Abort(_('%s: not a bundle version 1.0') %
1616 1615 util.hidepassword(raw_url))
1617 1616 ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
1618 1617 if op.reply is not None:
1619 1618 # This is definitely not the final form of this
1620 1619 # return. But one needs to start somewhere.
1621 1620 part = op.reply.newpart('reply:changegroup')
1622 1621 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1623 1622 part.addparam('return', '%i' % ret, mandatory=False)
1624 1623 try:
1625 1624 real_part.validate()
1626 1625 except error.Abort as e:
1627 1626 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1628 1627 (util.hidepassword(raw_url), str(e)))
1629 1628 assert not inpart.read()
1630 1629
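# A hypothetical sketch (not code from this module) of emitting the part
# handled above: the sender advertises the bundle10 url plus size and
# digest parameters so the receiver can validate what it fetches.
def _remotechangegroupexample(bundler, rawurl, data):
    import hashlib
    part = bundler.newpart('remote-changegroup')
    part.addparam('url', rawurl)
    part.addparam('size', '%d' % len(data))
    part.addparam('digests', 'sha1')
    part.addparam('digest:sha1', hashlib.sha1(data).hexdigest())
    return part
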
1631 1630 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1632 1631 def handlereplychangegroup(op, inpart):
1633 1632 ret = int(inpart.params['return'])
1634 1633 replyto = int(inpart.params['in-reply-to'])
1635 1634 op.records.add('changegroup', {'return': ret}, replyto)
1636 1635
1637 1636 @parthandler('check:heads')
1638 1637 def handlecheckheads(op, inpart):
1639 1638 """check that the heads of the repo did not change
1640 1639
1641 1640 This is used to detect a push race when using unbundle.
1642 1641 This replaces the "heads" argument of unbundle."""
1643 1642 h = inpart.read(20)
1644 1643 heads = []
1645 1644 while len(h) == 20:
1646 1645 heads.append(h)
1647 1646 h = inpart.read(20)
1648 1647 assert not h
1649 1648 # Trigger a transaction so that we are guaranteed to have the lock now.
1650 1649 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1651 1650 op.gettransaction()
1652 1651 if sorted(heads) != sorted(op.repo.heads()):
1653 1652 raise error.PushRaced('repository changed while pushing - '
1654 1653 'please try again')
1655 1654
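# A hypothetical sketch mirroring the reader above: the check:heads
# payload is simply the expected heads concatenated as raw 20-byte nodes.
def _checkheadsexample(bundler, heads):
    assert all(len(h) == 20 for h in heads)
    return bundler.newpart('check:heads', data=''.join(sorted(heads)))
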
1656 1655 @parthandler('check:updated-heads')
1657 1656 def handlecheckupdatedheads(op, inpart):
1658 1657 """check for race on the heads touched by a push
1659 1658
1660 1659 This is similar to 'check:heads' but focuses on the heads actually updated
1661 1660 during the push. Other activity on unrelated heads is
1662 1661 ignored.
1663 1662 
1664 1663 This allows servers with high traffic to avoid push contention as long as
1665 1664 only unrelated parts of the graph are involved."""
1666 1665 h = inpart.read(20)
1667 1666 heads = []
1668 1667 while len(h) == 20:
1669 1668 heads.append(h)
1670 1669 h = inpart.read(20)
1671 1670 assert not h
1672 1671 # trigger a transaction so that we are guaranteed to have the lock now.
1673 1672 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1674 1673 op.gettransaction()
1675 1674
1676 1675 currentheads = set()
1677 1676 for ls in op.repo.branchmap().itervalues():
1678 1677 currentheads.update(ls)
1679 1678
1680 1679 for h in heads:
1681 1680 if h not in currentheads:
1682 1681 raise error.PushRaced('repository changed while pushing - '
1683 1682 'please try again')
1684 1683
1685 1684 @parthandler('output')
1686 1685 def handleoutput(op, inpart):
1687 1686 """forward output captured on the server to the client"""
1688 1687 for line in inpart.read().splitlines():
1689 1688 op.ui.status(_('remote: %s\n') % line)
1690 1689
1691 1690 @parthandler('replycaps')
1692 1691 def handlereplycaps(op, inpart):
1693 1692 """Notify that a reply bundle should be created
1694 1693
1695 1694 The payload contains the capabilities information for the reply"""
1696 1695 caps = decodecaps(inpart.read())
1697 1696 if op.reply is None:
1698 1697 op.reply = bundle20(op.ui, caps)
1699 1698
1700 1699 class AbortFromPart(error.Abort):
1701 1700 """Sub-class of Abort that denotes an error from a bundle2 part."""
1702 1701
1703 1702 @parthandler('error:abort', ('message', 'hint'))
1704 1703 def handleerrorabort(op, inpart):
1705 1704 """Used to transmit abort error over the wire"""
1706 1705 raise AbortFromPart(inpart.params['message'],
1707 1706 hint=inpart.params.get('hint'))
1708 1707
1709 1708 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1710 1709 'in-reply-to'))
1711 1710 def handleerrorpushkey(op, inpart):
1712 1711 """Used to transmit failure of a mandatory pushkey over the wire"""
1713 1712 kwargs = {}
1714 1713 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1715 1714 value = inpart.params.get(name)
1716 1715 if value is not None:
1717 1716 kwargs[name] = value
1718 1717 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1719 1718
1720 1719 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1721 1720 def handleerrorunsupportedcontent(op, inpart):
1722 1721 """Used to transmit unknown content error over the wire"""
1723 1722 kwargs = {}
1724 1723 parttype = inpart.params.get('parttype')
1725 1724 if parttype is not None:
1726 1725 kwargs['parttype'] = parttype
1727 1726 params = inpart.params.get('params')
1728 1727 if params is not None:
1729 1728 kwargs['params'] = params.split('\0')
1730 1729
1731 1730 raise error.BundleUnknownFeatureError(**kwargs)
1732 1731
1733 1732 @parthandler('error:pushraced', ('message',))
1734 1733 def handleerrorpushraced(op, inpart):
1735 1734 """Used to transmit push race error over the wire"""
1736 1735 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1737 1736
1738 1737 @parthandler('listkeys', ('namespace',))
1739 1738 def handlelistkeys(op, inpart):
1740 1739 """retrieve pushkey namespace content stored in a bundle2"""
1741 1740 namespace = inpart.params['namespace']
1742 1741 r = pushkey.decodekeys(inpart.read())
1743 1742 op.records.add('listkeys', (namespace, r))
1744 1743
1745 1744 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1746 1745 def handlepushkey(op, inpart):
1747 1746 """process a pushkey request"""
1748 1747 dec = pushkey.decode
1749 1748 namespace = dec(inpart.params['namespace'])
1750 1749 key = dec(inpart.params['key'])
1751 1750 old = dec(inpart.params['old'])
1752 1751 new = dec(inpart.params['new'])
1753 1752 # Grab the transaction to ensure that we have the lock before performing the
1754 1753 # pushkey.
1755 1754 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1756 1755 op.gettransaction()
1757 1756 ret = op.repo.pushkey(namespace, key, old, new)
1758 1757 record = {'namespace': namespace,
1759 1758 'key': key,
1760 1759 'old': old,
1761 1760 'new': new}
1762 1761 op.records.add('pushkey', record)
1763 1762 if op.reply is not None:
1764 1763 rpart = op.reply.newpart('reply:pushkey')
1765 1764 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1766 1765 rpart.addparam('return', '%i' % ret, mandatory=False)
1767 1766 if inpart.mandatory and not ret:
1768 1767 kwargs = {}
1769 1768 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1770 1769 if key in inpart.params:
1771 1770 kwargs[key] = inpart.params[key]
1772 1771 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1773 1772
1774 1773 def _readphaseheads(inpart):
1775 1774 headsbyphase = [[] for i in phases.allphases]
1776 1775 entrysize = struct.calcsize(_fphasesentry)
1777 1776 while True:
1778 1777 entry = inpart.read(entrysize)
1779 1778 if len(entry) < entrysize:
1780 1779 if entry:
1781 1780 raise error.Abort(_('bad phase-heads bundle part'))
1782 1781 break
1783 1782 phase, node = struct.unpack(_fphasesentry, entry)
1784 1783 headsbyphase[phase].append(node)
1785 1784 return headsbyphase
1786 1785
1787 1786 @parthandler('phase-heads')
1788 1787 def handlephases(op, inpart):
1789 1788 """apply phases from bundle part to repo"""
1790 1789 headsbyphase = _readphaseheads(inpart)
1791 1790 phases.updatephases(op.repo.unfiltered(), op.gettransaction(), headsbyphase)
1792 1791
1793 1792 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1794 1793 def handlepushkeyreply(op, inpart):
1795 1794 """retrieve the result of a pushkey request"""
1796 1795 ret = int(inpart.params['return'])
1797 1796 partid = int(inpart.params['in-reply-to'])
1798 1797 op.records.add('pushkey', {'return': ret}, partid)
1799 1798
1800 1799 @parthandler('obsmarkers')
1801 1800 def handleobsmarker(op, inpart):
1802 1801 """add a stream of obsmarkers to the repo"""
1803 1802 tr = op.gettransaction()
1804 1803 markerdata = inpart.read()
1805 1804 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1806 1805 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1807 1806 % len(markerdata))
1808 1807 # The mergemarkers call will crash if marker creation is not enabled.
1809 1808 # we want to avoid this if the part is advisory.
1810 1809 if not inpart.mandatory and op.repo.obsstore.readonly:
1811 1810 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1812 1811 return
1813 1812 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1814 1813 op.repo.invalidatevolatilesets()
1815 1814 if new:
1816 1815 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1817 1816 op.records.add('obsmarkers', {'new': new})
1818 1817 scmutil.registersummarycallback(op.repo, tr)
1819 1818 if op.reply is not None:
1820 1819 rpart = op.reply.newpart('reply:obsmarkers')
1821 1820 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1822 1821 rpart.addparam('new', '%i' % new, mandatory=False)
1823 1822
1824 1823
1825 1824 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1826 1825 def handleobsmarkerreply(op, inpart):
1827 1826 """retrieve the result of an obsmarkers request"""
1828 1827 ret = int(inpart.params['new'])
1829 1828 partid = int(inpart.params['in-reply-to'])
1830 1829 op.records.add('obsmarkers', {'new': ret}, partid)
1831 1830
1832 1831 @parthandler('hgtagsfnodes')
1833 1832 def handlehgtagsfnodes(op, inpart):
1834 1833 """Applies .hgtags fnodes cache entries to the local repo.
1835 1834
1836 1835 Payload is pairs of 20 byte changeset nodes and filenodes.
1837 1836 """
1838 1837 # Grab the transaction so we ensure that we have the lock at this point.
1839 1838 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1840 1839 op.gettransaction()
1841 1840 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1842 1841
1843 1842 count = 0
1844 1843 while True:
1845 1844 node = inpart.read(20)
1846 1845 fnode = inpart.read(20)
1847 1846 if len(node) < 20 or len(fnode) < 20:
1848 1847 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1849 1848 break
1850 1849 cache.setfnode(node, fnode)
1851 1850 count += 1
1852 1851
1853 1852 cache.write()
1854 1853 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
@@ -1,1009 +1,1009 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
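# A hypothetical sketch (not part of the original module) of the cg1
# delta header framing: four fixed 20-byte fields -- node, p1, p2 and
# the linknode that ties the revision back to a changeset.
def _cg1deltaheaderexample(headerdata):
    assert len(headerdata) == struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER)
    node, p1, p2, linknode = struct.unpack(_CHANGEGROUPV1_DELTA_HEADER,
                                           headerdata)
    return node, p1, p2, linknode
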
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
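# Worked example of the framing helpers above, as a hypothetical helper:
# the int32 length prefix counts itself, so a 5-byte payload is framed
# with length 9, and a zero-length chunk terminates a group.
def _chunkframingexample():
    framed = chunkheader(5) + "hello" + closechunk()
    assert framed[:4] == struct.pack(">l", 9)
    assert framed[-4:] == "\0\0\0\0"
    return framed
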
63 63 def writechunks(ui, chunks, filename, vfs=None):
64 64 """Write chunks to a file and return its filename.
65 65
66 66 The stream is assumed to be a bundle file.
67 67 Existing files will not be overwritten.
68 68 If no filename is specified, a temporary file is created.
69 69 """
70 70 fh = None
71 71 cleanup = None
72 72 try:
73 73 if filename:
74 74 if vfs:
75 75 fh = vfs.open(filename, "wb")
76 76 else:
77 77 # Increase default buffer size because default is usually
78 78 # small (4k is common on Linux).
79 79 fh = open(filename, "wb", 131072)
80 80 else:
81 81 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
82 82 fh = os.fdopen(fd, pycompat.sysstr("wb"))
83 83 cleanup = filename
84 84 for c in chunks:
85 85 fh.write(c)
86 86 cleanup = None
87 87 return filename
88 88 finally:
89 89 if fh is not None:
90 90 fh.close()
91 91 if cleanup is not None:
92 92 if filename and vfs:
93 93 vfs.unlink(cleanup)
94 94 else:
95 95 os.unlink(cleanup)
96 96
97 97 class cg1unpacker(object):
98 98 """Unpacker for cg1 changegroup streams.
99 99
100 100 A changegroup unpacker handles the framing of the revision data in
101 101 the wire format. Most consumers will want to use the apply()
102 102 method to add the changes from the changegroup to a repository.
103 103
104 104 If you're forwarding a changegroup unmodified to another consumer,
105 105 use getchunks(), which returns an iterator of changegroup
106 106 chunks. This is mostly useful for cases where you need to know the
107 107 data stream has ended by observing the end of the changegroup.
108 108
109 109 deltachunk() is useful only if you're applying delta data. Most
110 110 consumers should prefer apply() instead.
111 111
112 112 A few other public methods exist. Those are used only for
113 113 bundlerepo and some debug commands - their use is discouraged.
114 114 """
115 115 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
116 116 deltaheadersize = struct.calcsize(deltaheader)
117 117 version = '01'
118 118 _grouplistcount = 1 # One list of files after the manifests
119 119
120 120 def __init__(self, fh, alg, extras=None):
121 121 if alg is None:
122 122 alg = 'UN'
123 123 if alg not in util.compengines.supportedbundletypes:
124 124 raise error.Abort(_('unknown stream compression type: %s')
125 125 % alg)
126 126 if alg == 'BZ':
127 127 alg = '_truncatedBZ'
128 128
129 129 compengine = util.compengines.forbundletype(alg)
130 130 self._stream = compengine.decompressorreader(fh)
131 131 self._type = alg
132 132 self.extras = extras or {}
133 133 self.callback = None
134 134
135 135 # These methods (compressed, read, seek, tell) all appear to only
136 136 # be used by bundlerepo, but it's a little hard to tell.
137 137 def compressed(self):
138 138 return self._type is not None and self._type != 'UN'
139 139 def read(self, l):
140 140 return self._stream.read(l)
141 141 def seek(self, pos):
142 142 return self._stream.seek(pos)
143 143 def tell(self):
144 144 return self._stream.tell()
145 145 def close(self):
146 146 return self._stream.close()
147 147
148 148 def _chunklength(self):
149 149 d = readexactly(self._stream, 4)
150 150 l = struct.unpack(">l", d)[0]
151 151 if l <= 4:
152 152 if l:
153 153 raise error.Abort(_("invalid chunk length %d") % l)
154 154 return 0
155 155 if self.callback:
156 156 self.callback()
157 157 return l - 4
158 158
159 159 def changelogheader(self):
160 160 """v10 does not have a changelog header chunk"""
161 161 return {}
162 162
163 163 def manifestheader(self):
164 164 """v10 does not have a manifest header chunk"""
165 165 return {}
166 166
167 167 def filelogheader(self):
168 168 """return the header of the filelogs chunk, v10 only has the filename"""
169 169 l = self._chunklength()
170 170 if not l:
171 171 return {}
172 172 fname = readexactly(self._stream, l)
173 173 return {'filename': fname}
174 174
175 175 def _deltaheader(self, headertuple, prevnode):
176 176 node, p1, p2, cs = headertuple
177 177 if prevnode is None:
178 178 deltabase = p1
179 179 else:
180 180 deltabase = prevnode
181 181 flags = 0
182 182 return node, p1, p2, deltabase, cs, flags
183 183
184 184 def deltachunk(self, prevnode):
185 185 l = self._chunklength()
186 186 if not l:
187 187 return {}
188 188 headerdata = readexactly(self._stream, self.deltaheadersize)
189 189 header = struct.unpack(self.deltaheader, headerdata)
190 190 delta = readexactly(self._stream, l - self.deltaheadersize)
191 191 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
192 192 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
193 193 'deltabase': deltabase, 'delta': delta, 'flags': flags}
194 194
195 195 def getchunks(self):
196 196 """returns all the chunks contained in the bundle
197 197 
198 198 Used when you need to forward the binary stream to a file or another
199 199 network API. To do so, it parses the changegroup data; otherwise it would
200 200 block in case of sshrepo because it doesn't know the end of the stream.
201 201 """
202 202 # an empty chunkgroup is the end of the changegroup
203 203 # a changegroup has at least 2 chunkgroups (changelog and manifest).
204 204 # after that, changegroup versions 1 and 2 have a series of groups
205 205 # with one group per file. changegroup 3 has a series of directory
206 206 # manifests before the files.
207 207 count = 0
208 208 emptycount = 0
209 209 while emptycount < self._grouplistcount:
210 210 empty = True
211 211 count += 1
212 212 while True:
213 213 chunk = getchunk(self)
214 214 if not chunk:
215 215 if empty and count > 2:
216 216 emptycount += 1
217 217 break
218 218 empty = False
219 219 yield chunkheader(len(chunk))
220 220 pos = 0
221 221 while pos < len(chunk):
222 222 next = pos + 2**20
223 223 yield chunk[pos:next]
224 224 pos = next
225 225 yield closechunk()
226 226
227 227 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
228 228 # We know that we'll never have more manifests than we had
229 229 # changesets.
230 230 self.callback = prog(_('manifests'), numchanges)
231 231 # no need to check for empty manifest group here:
232 232 # if the result of the merge of 1 and 2 is the same in 3 and 4,
233 233 # no new manifest will be created and the manifest group will
234 234 # be empty during the pull
235 235 self.manifestheader()
236 236 repo.manifestlog._revlog.addgroup(self, revmap, trp)
237 237 repo.ui.progress(_('manifests'), None)
238 238 self.callback = None
239 239
240 240 def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
241 241 expectedtotal=None):
242 242 """Add the changegroup returned by source.read() to this repo.
243 243 srctype is a string like 'push', 'pull', or 'unbundle'. url is
244 244 the URL of the repo where this changegroup is coming from.
245 245
246 246 Return an integer summarizing the change to this repo:
247 247 - nothing changed or no source: 0
248 248 - more heads than before: 1+added heads (2..n)
249 249 - fewer heads than before: -1-removed heads (-2..-n)
250 250 - number of heads stays the same: 1
251 251 """
252 252 repo = repo.unfiltered()
253 253 def csmap(x):
254 254 repo.ui.debug("add changeset %s\n" % short(x))
255 255 return len(cl)
256 256
257 257 def revmap(x):
258 258 return cl.rev(x)
259 259
260 260 changesets = files = revisions = 0
261 261
262 262 try:
263 263 # The transaction may already carry source information. In this
264 264 # case we use the top level data. We overwrite the argument
265 265 # because we need to use the top level values (if they exist)
266 266 # in this function.
267 267 srctype = tr.hookargs.setdefault('source', srctype)
268 268 url = tr.hookargs.setdefault('url', url)
269 269 repo.hook('prechangegroup', throw=True, **tr.hookargs)
270 270
271 271 # write changelog data to temp files so concurrent readers
272 272 # will not see an inconsistent view
273 273 cl = repo.changelog
274 274 cl.delayupdate(tr)
275 275 oldheads = set(cl.heads())
276 276
277 277 trp = weakref.proxy(tr)
278 278 # pull off the changeset group
279 279 repo.ui.status(_("adding changesets\n"))
280 280 clstart = len(cl)
281 281 class prog(object):
282 282 def __init__(self, step, total):
283 283 self._step = step
284 284 self._total = total
285 285 self._count = 1
286 286 def __call__(self):
287 287 repo.ui.progress(self._step, self._count, unit=_('chunks'),
288 288 total=self._total)
289 289 self._count += 1
290 290 self.callback = prog(_('changesets'), expectedtotal)
291 291
292 292 efiles = set()
293 293 def onchangelog(cl, node):
294 294 efiles.update(cl.readfiles(node))
295 295
296 296 self.changelogheader()
297 297 cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
298 298 efiles = len(efiles)
299 299
300 300 if not cgnodes:
301 301 repo.ui.develwarn('applied empty changegroup',
302 302 config='empty-changegroup')
303 303 clend = len(cl)
304 304 changesets = clend - clstart
305 305 repo.ui.progress(_('changesets'), None)
306 306 self.callback = None
307 307
308 308 # pull off the manifest group
309 309 repo.ui.status(_("adding manifests\n"))
310 310 self._unpackmanifests(repo, revmap, trp, prog, changesets)
311 311
312 312 needfiles = {}
313 313 if repo.ui.configbool('server', 'validate'):
314 314 cl = repo.changelog
315 315 ml = repo.manifestlog
316 316 # validate incoming csets have their manifests
317 317 for cset in xrange(clstart, clend):
318 318 mfnode = cl.changelogrevision(cset).manifest
319 319 mfest = ml[mfnode].readdelta()
320 320 # store file nodes we must see
321 321 for f, n in mfest.iteritems():
322 322 needfiles.setdefault(f, set()).add(n)
323 323
324 324 # process the files
325 325 repo.ui.status(_("adding file changes\n"))
326 326 newrevs, newfiles = _addchangegroupfiles(
327 327 repo, self, revmap, trp, efiles, needfiles)
328 328 revisions += newrevs
329 329 files += newfiles
330 330
331 331 deltaheads = 0
332 332 if oldheads:
333 333 heads = cl.heads()
334 334 deltaheads = len(heads) - len(oldheads)
335 335 for h in heads:
336 336 if h not in oldheads and repo[h].closesbranch():
337 337 deltaheads -= 1
338 338 htext = ""
339 339 if deltaheads:
340 340 htext = _(" (%+d heads)") % deltaheads
341 341
342 342 repo.ui.status(_("added %d changesets"
343 343 " with %d changes to %d files%s\n")
344 344 % (changesets, revisions, files, htext))
345 345 repo.invalidatevolatilesets()
346 346
347 347 if changesets > 0:
348 348 if 'node' not in tr.hookargs:
349 349 tr.hookargs['node'] = hex(cl.node(clstart))
350 350 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
351 351 hookargs = dict(tr.hookargs)
352 352 else:
353 353 hookargs = dict(tr.hookargs)
354 354 hookargs['node'] = hex(cl.node(clstart))
355 355 hookargs['node_last'] = hex(cl.node(clend - 1))
356 356 repo.hook('pretxnchangegroup', throw=True, **hookargs)
357 357
358 358 added = [cl.node(r) for r in xrange(clstart, clend)]
359 359 phaseall = None
360 360 if srctype in ('push', 'serve'):
361 361 # Old servers can not push the boundary themselves.
362 362 # New servers won't push the boundary if the changeset already
363 363 # exists locally as secret
364 364 #
365 365 # We should not use 'added' here but the list of all changes in
366 366 # the bundle
367 367 if repo.publishing():
368 368 targetphase = phaseall = phases.public
369 369 else:
370 370 # closer target phase computation
371 371
372 372 # Those changesets have been pushed from the
373 373 # outside, their phases are going to be pushed
374 374 # alongside. Therefore `targetphase` is
375 375 # ignored.
376 376 targetphase = phaseall = phases.draft
377 377 if added:
378 378 phases.registernew(repo, tr, targetphase, added)
379 379 if phaseall is not None:
380 380 phases.advanceboundary(repo, tr, phaseall, cgnodes)
381 381
382 382 if changesets > 0:
383 383
384 384 def runhooks():
385 385 # These hooks run when the lock releases, not when the
386 386 # transaction closes. So it's possible for the changelog
387 387 # to have changed since we last saw it.
388 388 if clstart >= len(repo):
389 389 return
390 390
391 391 repo.hook("changegroup", **hookargs)
392 392
393 393 for n in added:
394 394 args = hookargs.copy()
395 395 args['node'] = hex(n)
396 396 del args['node_last']
397 397 repo.hook("incoming", **args)
398 398
399 399 newheads = [h for h in repo.heads()
400 400 if h not in oldheads]
401 401 repo.ui.log("incoming",
402 402 "%s incoming changes - new heads: %s\n",
403 403 len(added),
404 404 ', '.join([hex(c[:6]) for c in newheads]))
405 405
406 406 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
407 407 lambda tr: repo._afterlock(runhooks))
408 408 finally:
409 409 repo.ui.flush()
410 410 # never return 0 here:
411 411 if deltaheads < 0:
412 412 ret = deltaheads - 1
413 413 else:
414 414 ret = deltaheads + 1
415 return ret, added
415 return ret
416 416
417 417 class cg2unpacker(cg1unpacker):
418 418 """Unpacker for cg2 streams.
419 419
420 420 cg2 streams add support for generaldelta, so the delta header
421 421 format is slightly different. All other features about the data
422 422 remain the same.
423 423 """
424 424 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
425 425 deltaheadersize = struct.calcsize(deltaheader)
426 426 version = '02'
427 427
428 428 def _deltaheader(self, headertuple, prevnode):
429 429 node, p1, p2, deltabase, cs = headertuple
430 430 flags = 0
431 431 return node, p1, p2, deltabase, cs, flags
432 432
433 433 class cg3unpacker(cg2unpacker):
434 434 """Unpacker for cg3 streams.
435 435
436 436 cg3 streams add support for exchanging treemanifests and revlog
437 437 flags. It adds the revlog flags to the delta header and an empty chunk
438 438 separating manifests and files.
439 439 """
440 440 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
441 441 deltaheadersize = struct.calcsize(deltaheader)
442 442 version = '03'
443 443 _grouplistcount = 2 # One list of manifests and one list of files
444 444
445 445 def _deltaheader(self, headertuple, prevnode):
446 446 node, p1, p2, deltabase, cs, flags = headertuple
447 447 return node, p1, p2, deltabase, cs, flags
448 448
449 449 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
450 450 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
451 451 numchanges)
452 452 for chunkdata in iter(self.filelogheader, {}):
453 453 # If we get here, there are directory manifests in the changegroup
454 454 d = chunkdata["filename"]
455 455 repo.ui.debug("adding %s revisions\n" % d)
456 456 dirlog = repo.manifestlog._revlog.dirlog(d)
457 457 if not dirlog.addgroup(self, revmap, trp):
458 458 raise error.Abort(_("received dir revlog group is empty"))
459 459
460 460 class headerlessfixup(object):
461 461 def __init__(self, fh, h):
462 462 self._h = h
463 463 self._fh = fh
464 464 def read(self, n):
465 465 if self._h:
466 466 d, self._h = self._h[:n], self._h[n:]
467 467 if len(d) < n:
468 468 d += readexactly(self._fh, n - len(d))
469 469 return d
470 470 return readexactly(self._fh, n)
471 471
472 472 class cg1packer(object):
473 473 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
474 474 version = '01'
475 475 def __init__(self, repo, bundlecaps=None):
476 476 """Given a source repo, construct a bundler.
477 477
478 478 bundlecaps is optional and can be used to specify the set of
479 479 capabilities which can be used to build the bundle. While bundlecaps is
480 480 unused in core Mercurial, extensions rely on this feature to communicate
481 481 capabilities to customize the changegroup packer.
482 482 """
483 483 # Set of capabilities we can use to build the bundle.
484 484 if bundlecaps is None:
485 485 bundlecaps = set()
486 486 self._bundlecaps = bundlecaps
487 487 # experimental config: bundle.reorder
488 488 reorder = repo.ui.config('bundle', 'reorder')
489 489 if reorder == 'auto':
490 490 reorder = None
491 491 else:
492 492 reorder = util.parsebool(reorder)
493 493 self._repo = repo
494 494 self._reorder = reorder
495 495 self._progress = repo.ui.progress
496 496 if self._repo.ui.verbose and not self._repo.ui.debugflag:
497 497 self._verbosenote = self._repo.ui.note
498 498 else:
499 499 self._verbosenote = lambda s: None
500 500
501 501 def close(self):
502 502 return closechunk()
503 503
504 504 def fileheader(self, fname):
505 505 return chunkheader(len(fname)) + fname
506 506
507 507 # Extracted both for clarity and for overriding in extensions.
508 508 def _sortgroup(self, revlog, nodelist, lookup):
509 509 """Sort nodes for change group and turn them into revnums."""
510 510 # for generaldelta revlogs, we linearize the revs; this will both be
511 511 # much quicker and generate a much smaller bundle
512 512 if (revlog._generaldelta and self._reorder is None) or self._reorder:
513 513 dag = dagutil.revlogdag(revlog)
514 514 return dag.linearize(set(revlog.rev(n) for n in nodelist))
515 515 else:
516 516 return sorted([revlog.rev(n) for n in nodelist])
517 517
518 518 def group(self, nodelist, revlog, lookup, units=None):
519 519 """Calculate a delta group, yielding a sequence of changegroup chunks
520 520 (strings).
521 521
522 522 Given a list of changeset revs, return a set of deltas and
523 523 metadata corresponding to nodes. The first delta is
524 524 first parent(nodelist[0]) -> nodelist[0], the receiver is
525 525 guaranteed to have this parent as it has all history before
526 526 these changesets. If the first parent is nullrev, the
527 527 changegroup starts with a full revision.
528 528 
529 529 If units is not None, progress detail will be generated; units specifies
530 530 the type of revlog that is touched (changelog, manifest, etc.).
531 531 """
532 532 # if we don't have any revisions touched by these changesets, bail
533 533 if len(nodelist) == 0:
534 534 yield self.close()
535 535 return
536 536
537 537 revs = self._sortgroup(revlog, nodelist, lookup)
538 538
539 539 # add the parent of the first rev
540 540 p = revlog.parentrevs(revs[0])[0]
541 541 revs.insert(0, p)
542 542
543 543 # build deltas
544 544 total = len(revs) - 1
545 545 msgbundling = _('bundling')
546 546 for r in xrange(len(revs) - 1):
547 547 if units is not None:
548 548 self._progress(msgbundling, r + 1, unit=units, total=total)
549 549 prev, curr = revs[r], revs[r + 1]
550 550 linknode = lookup(revlog.node(curr))
551 551 for c in self.revchunk(revlog, curr, prev, linknode):
552 552 yield c
553 553
554 554 if units is not None:
555 555 self._progress(msgbundling, None)
556 556 yield self.close()
557 557
558 558 # filter any nodes that claim to be part of the known set
559 559 def prune(self, revlog, missing, commonrevs):
560 560 rr, rl = revlog.rev, revlog.linkrev
561 561 return [n for n in missing if rl(rr(n)) not in commonrevs]
562 562
563 563 def _packmanifests(self, dir, mfnodes, lookuplinknode):
564 564 """Pack flat manifests into a changegroup stream."""
565 565 assert not dir
566 566 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
567 567 lookuplinknode, units=_('manifests')):
568 568 yield chunk
569 569
570 570 def _manifestsdone(self):
571 571 return ''
572 572
573 573 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
574 574 '''yield a sequence of changegroup chunks (strings)'''
575 575 repo = self._repo
576 576 cl = repo.changelog
577 577
578 578 clrevorder = {}
579 579 mfs = {} # needed manifests
580 580 fnodes = {} # needed file nodes
581 581 changedfiles = set()
582 582
583 583 # Callback for the changelog, used to collect changed files and manifest
584 584 # nodes.
585 585 # Returns the linkrev node (identity in the changelog case).
586 586 def lookupcl(x):
587 587 c = cl.read(x)
588 588 clrevorder[x] = len(clrevorder)
589 589 n = c[0]
590 590 # record the first changeset introducing this manifest version
591 591 mfs.setdefault(n, x)
592 592 # Record a complete list of potentially-changed files in
593 593 # this manifest.
594 594 changedfiles.update(c[3])
595 595 return x
596 596
597 597 self._verbosenote(_('uncompressed size of bundle content:\n'))
598 598 size = 0
599 599 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
600 600 size += len(chunk)
601 601 yield chunk
602 602 self._verbosenote(_('%8.i (changelog)\n') % size)
603 603
604 604 # We need to make sure that the linkrev in the changegroup refers to
605 605 # the first changeset that introduced the manifest or file revision.
606 606 # The fastpath is usually safer than the slowpath, because the filelogs
607 607 # are walked in revlog order.
608 608 #
609 609 # When taking the slowpath with reorder=None and the manifest revlog
610 610 # uses generaldelta, the manifest may be walked in the "wrong" order.
611 611 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
612 612 # cc0ff93d0c0c).
613 613 #
614 614 # When taking the fastpath, we are only vulnerable to reordering
615 615 # of the changelog itself. The changelog never uses generaldelta, so
616 616 # it is only reordered when reorder=True. To handle this case, we
617 617 # simply take the slowpath, which already has the 'clrevorder' logic.
618 618 # This was also fixed in cc0ff93d0c0c.
619 619 fastpathlinkrev = fastpathlinkrev and not self._reorder
620 620 # Treemanifests don't work correctly with fastpathlinkrev
621 621 # either, because we don't discover which directory nodes to
622 622 # send along with files. This could probably be fixed.
623 623 fastpathlinkrev = fastpathlinkrev and (
624 624 'treemanifest' not in repo.requirements)
625 625
626 626 for chunk in self.generatemanifests(commonrevs, clrevorder,
627 627 fastpathlinkrev, mfs, fnodes):
628 628 yield chunk
629 629 mfs.clear()
630 630 clrevs = set(cl.rev(x) for x in clnodes)
631 631
632 632 if not fastpathlinkrev:
633 633 def linknodes(unused, fname):
634 634 return fnodes.get(fname, {})
635 635 else:
636 636 cln = cl.node
637 637 def linknodes(filerevlog, fname):
638 638 llr = filerevlog.linkrev
639 639 fln = filerevlog.node
640 640 revs = ((r, llr(r)) for r in filerevlog)
641 641 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
642 642
643 643 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
644 644 source):
645 645 yield chunk
646 646
647 647 yield self.close()
648 648
649 649 if clnodes:
650 650 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
651 651
652 652 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
653 653 fnodes):
654 654 repo = self._repo
655 655 mfl = repo.manifestlog
656 656 dirlog = mfl._revlog.dirlog
657 657 tmfnodes = {'': mfs}
658 658
659 659 # Callback for the manifest, used to collect linkrevs for filelog
660 660 # revisions.
661 661 # Returns the linkrev node (collected in lookupcl).
662 662 def makelookupmflinknode(dir):
663 663 if fastpathlinkrev:
664 664 assert not dir
665 665 return mfs.__getitem__
666 666
667 667 def lookupmflinknode(x):
668 668 """Callback for looking up the linknode for manifests.
669 669
670 670 Returns the linkrev node for the specified manifest.
671 671
672 672 SIDE EFFECT:
673 673
674 674 1) fclnodes gets populated with the list of relevant
675 675 file nodes if we're not using fastpathlinkrev
676 676 2) When treemanifests are in use, collects treemanifest nodes
677 677 to send
678 678
679 679 Note that this means manifests must be completely sent to
680 680 the client before you can trust the list of files and
681 681 treemanifests to send.
682 682 """
683 683 clnode = tmfnodes[dir][x]
684 684 mdata = mfl.get(dir, x).readfast(shallow=True)
685 685 for p, n, fl in mdata.iterentries():
686 686 if fl == 't': # subdirectory manifest
687 687 subdir = dir + p + '/'
688 688 tmfclnodes = tmfnodes.setdefault(subdir, {})
689 689 tmfclnode = tmfclnodes.setdefault(n, clnode)
690 690 if clrevorder[clnode] < clrevorder[tmfclnode]:
691 691 tmfclnodes[n] = clnode
692 692 else:
693 693 f = dir + p
694 694 fclnodes = fnodes.setdefault(f, {})
695 695 fclnode = fclnodes.setdefault(n, clnode)
696 696 if clrevorder[clnode] < clrevorder[fclnode]:
697 697 fclnodes[n] = clnode
698 698 return clnode
699 699 return lookupmflinknode
700 700
701 701 size = 0
702 702 while tmfnodes:
703 703 dir = min(tmfnodes)
704 704 nodes = tmfnodes[dir]
705 705 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
706 706 if not dir or prunednodes:
707 707 for x in self._packmanifests(dir, prunednodes,
708 708 makelookupmflinknode(dir)):
709 709 size += len(x)
710 710 yield x
711 711 del tmfnodes[dir]
712 712 self._verbosenote(_('%8.i (manifests)\n') % size)
713 713 yield self._manifestsdone()
714 714
715 715 # The 'source' parameter is useful for extensions
716 716 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
717 717 repo = self._repo
718 718 progress = self._progress
719 719 msgbundling = _('bundling')
720 720
721 721 total = len(changedfiles)
722 722 # for progress output
723 723 msgfiles = _('files')
724 724 for i, fname in enumerate(sorted(changedfiles)):
725 725 filerevlog = repo.file(fname)
726 726 if not filerevlog:
727 727 raise error.Abort(_("empty or missing revlog for %s") % fname)
728 728
729 729 linkrevnodes = linknodes(filerevlog, fname)
730 730 # Lookup for filenodes, we collected the linkrev nodes above in the
731 731 # fastpath case and with lookupmf in the slowpath case.
732 732 def lookupfilelog(x):
733 733 return linkrevnodes[x]
734 734
735 735 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
736 736 if filenodes:
737 737 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
738 738 total=total)
739 739 h = self.fileheader(fname)
740 740 size = len(h)
741 741 yield h
742 742 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
743 743 size += len(chunk)
744 744 yield chunk
745 745 self._verbosenote(_('%8.i %s\n') % (size, fname))
746 746 progress(msgbundling, None)
747 747
748 748 def deltaparent(self, revlog, rev, p1, p2, prev):
749 749 return prev
750 750
751 751 def revchunk(self, revlog, rev, prev, linknode):
752 752 node = revlog.node(rev)
753 753 p1, p2 = revlog.parentrevs(rev)
754 754 base = self.deltaparent(revlog, rev, p1, p2, prev)
755 755
756 756 prefix = ''
757 757 if revlog.iscensored(base) or revlog.iscensored(rev):
758 758 try:
759 759 delta = revlog.revision(node, raw=True)
760 760 except error.CensoredNodeError as e:
761 761 delta = e.tombstone
762 762 if base == nullrev:
763 763 prefix = mdiff.trivialdiffheader(len(delta))
764 764 else:
765 765 baselen = revlog.rawsize(base)
766 766 prefix = mdiff.replacediffheader(baselen, len(delta))
767 767 elif base == nullrev:
768 768 delta = revlog.revision(node, raw=True)
769 769 prefix = mdiff.trivialdiffheader(len(delta))
770 770 else:
771 771 delta = revlog.revdiff(base, rev)
772 772 p1n, p2n = revlog.parents(node)
773 773 basenode = revlog.node(base)
774 774 flags = revlog.flags(rev)
775 775 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
776 776 meta += prefix
777 777 l = len(meta) + len(delta)
778 778 yield chunkheader(l)
779 779 yield meta
780 780 yield delta
781 781 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
782 782 # do nothing with basenode, it is implicitly the previous one in HG10
783 783 # do nothing with flags, it is implicitly 0 for cg1 and cg2
784 784 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
785 785
786 786 class cg2packer(cg1packer):
787 787 version = '02'
788 788 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
789 789
790 790 def __init__(self, repo, bundlecaps=None):
791 791 super(cg2packer, self).__init__(repo, bundlecaps)
792 792 if self._reorder is None:
793 793 # Since generaldelta is directly supported by cg2, reordering
794 794 # generally doesn't help, so we disable it by default (treating
795 795 # bundle.reorder=auto just like bundle.reorder=False).
796 796 self._reorder = False
797 797
798 798 def deltaparent(self, revlog, rev, p1, p2, prev):
799 799 dp = revlog.deltaparent(rev)
800 800 if dp == nullrev and revlog.storedeltachains:
801 801 # Avoid sending full revisions when delta parent is null. Pick prev
802 802 # in that case. It's tempting to pick p1 in this case, as p1 will
803 803 # be smaller in the common case. However, computing a delta against
804 804 # p1 may require resolving the raw text of p1, which could be
805 805 # expensive. The revlog caches should have prev cached, meaning
806 806 # less CPU for changegroup generation. There is likely room to add
807 807 # a flag and/or config option to control this behavior.
808 808 return prev
809 809 elif dp == nullrev:
810 810 # revlog is configured to use full snapshot for a reason,
811 811 # stick to full snapshot.
812 812 return nullrev
813 813 elif dp not in (p1, p2, prev):
814 814 # Pick prev when we can't be sure remote has the base revision.
815 815 return prev
816 816 else:
817 817 return dp
818 818
819 819 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
820 820 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
821 821 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
822 822
823 823 class cg3packer(cg2packer):
824 824 version = '03'
825 825 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
826 826
827 827 def _packmanifests(self, dir, mfnodes, lookuplinknode):
828 828 if dir:
829 829 yield self.fileheader(dir)
830 830
831 831 dirlog = self._repo.manifestlog._revlog.dirlog(dir)
832 832 for chunk in self.group(mfnodes, dirlog, lookuplinknode,
833 833 units=_('manifests')):
834 834 yield chunk
835 835
836 836 def _manifestsdone(self):
837 837 return self.close()
838 838
839 839 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
840 840 return struct.pack(
841 841 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
842 842
843 843 _packermap = {'01': (cg1packer, cg1unpacker),
844 844 # cg2 adds support for exchanging generaldelta
845 845 '02': (cg2packer, cg2unpacker),
846 846 # cg3 adds support for exchanging revlog flags and treemanifests
847 847 '03': (cg3packer, cg3unpacker),
848 848 }
849 849
850 850 def allsupportedversions(repo):
851 851 versions = set(_packermap.keys())
852 852 if not (repo.ui.configbool('experimental', 'changegroup3') or
853 853 repo.ui.configbool('experimental', 'treemanifest') or
854 854 'treemanifest' in repo.requirements):
855 855 versions.discard('03')
856 856 return versions
857 857
858 858 # Changegroup versions that can be applied to the repo
859 859 def supportedincomingversions(repo):
860 860 return allsupportedversions(repo)
861 861
862 862 # Changegroup versions that can be created from the repo
863 863 def supportedoutgoingversions(repo):
864 864 versions = allsupportedversions(repo)
865 865 if 'treemanifest' in repo.requirements:
866 866 # Versions 01 and 02 support only flat manifests and it's just too
867 867 # expensive to convert between the flat manifest and tree manifest on
868 868 # the fly. Since tree manifests are hashed differently, all of history
869 869 # would have to be converted. Instead, we simply don't even pretend to
870 870 # support versions 01 and 02.
871 871 versions.discard('01')
872 872 versions.discard('02')
873 873 return versions
874 874
875 875 def safeversion(repo):
876 876 # Finds the smallest version that it's safe to assume clients of the repo
877 877 # will support. For example, all hg versions that support generaldelta also
878 878 # support changegroup 02.
879 879 versions = supportedoutgoingversions(repo)
880 880 if 'generaldelta' in repo.requirements:
881 881 versions.discard('01')
882 882 assert versions
883 883 return min(versions)
884 884
885 885 def getbundler(version, repo, bundlecaps=None):
886 886 assert version in supportedoutgoingversions(repo)
887 887 return _packermap[version][0](repo, bundlecaps)
888 888
889 889 def getunbundler(version, fh, alg, extras=None):
890 890 return _packermap[version][1](fh, alg, extras=extras)
891 891
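# A hedged usage sketch of the lookup helpers above; 'repo' and 'fh'
# stand in for a real repository and stream:
#
#   version = safeversion(repo)                  # e.g. '01' or '02'
#   bundler = getbundler(version, repo)
#   unbundler = getunbundler(version, fh, 'UN')  # 'UN' = uncompressed
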
892 892 def _changegroupinfo(repo, nodes, source):
893 893 if repo.ui.verbose or source == 'bundle':
894 894 repo.ui.status(_("%d changesets found\n") % len(nodes))
895 895 if repo.ui.debugflag:
896 896 repo.ui.debug("list of changesets:\n")
897 897 for node in nodes:
898 898 repo.ui.debug("%s\n" % hex(node))
899 899
900 900 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
901 901 repo = repo.unfiltered()
902 902 commonrevs = outgoing.common
903 903 csets = outgoing.missing
904 904 heads = outgoing.missingheads
905 905 # We go through the fast path if we get told to, or if all (unfiltered)
906 906 # heads have been requested (since we then know all their linkrevs will
907 907 # be pulled by the client).
908 908 heads.sort()
909 909 fastpathlinkrev = fastpath or (
910 910 repo.filtername is None and heads == sorted(repo.heads()))
911 911
912 912 repo.hook('preoutgoing', throw=True, source=source)
913 913 _changegroupinfo(repo, csets, source)
914 914 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
915 915
916 916 def getsubset(repo, outgoing, bundler, source, fastpath=False):
917 917 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
918 918 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
919 919 {'clcount': len(outgoing.missing)})
920 920
921 921 def changegroupsubset(repo, roots, heads, source, version='01'):
922 922 """Compute a changegroup consisting of all the nodes that are
923 923 descendants of any of the roots and ancestors of any of the heads.
924 924 Return a chunkbuffer object whose read() method will return
925 925 successive changegroup chunks.
926 926
927 927 It is fairly complex as determining which filenodes and which
928 928 manifest nodes need to be included for the changeset to be complete
929 929 is non-trivial.
930 930
931 931 Another wrinkle is doing the reverse, figuring out which changeset in
932 932 the changegroup a particular filenode or manifestnode belongs to.
933 933 """
934 934 outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
935 935 bundler = getbundler(version, repo)
936 936 return getsubset(repo, outgoing, bundler, source)
937 937
938 938 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
939 939 version='01'):
940 940 """Like getbundle, but taking a discovery.outgoing as an argument.
941 941
942 942 This is only implemented for local repos and reuses potentially
943 943 precomputed sets in outgoing. Returns a raw changegroup generator."""
944 944 if not outgoing.missing:
945 945 return None
946 946 bundler = getbundler(version, repo, bundlecaps)
947 947 return getsubsetraw(repo, outgoing, bundler, source)
948 948
949 949 def getchangegroup(repo, source, outgoing, bundlecaps=None,
950 950 version='01'):
951 951 """Like getbundle, but taking a discovery.outgoing as an argument.
952 952
953 953 This is only implemented for local repos and reuses potentially
954 954 precomputed sets in outgoing."""
955 955 if not outgoing.missing:
956 956 return None
957 957 bundler = getbundler(version, repo, bundlecaps)
958 958 return getsubset(repo, outgoing, bundler, source)
959 959
960 960 def getlocalchangegroup(repo, *args, **kwargs):
961 961 repo.ui.deprecwarn('getlocalchangegroup is deprecated, use getchangegroup',
962 962 '4.3')
963 963 return getchangegroup(repo, *args, **kwargs)
964 964
965 965 def changegroup(repo, basenodes, source):
966 966 # to avoid a race we use changegroupsubset() (issue1320)
967 967 return changegroupsubset(repo, basenodes, repo.heads(), source)
968 968
969 969 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
970 970 revisions = 0
971 971 files = 0
972 972 for chunkdata in iter(source.filelogheader, {}):
973 973 files += 1
974 974 f = chunkdata["filename"]
975 975 repo.ui.debug("adding %s revisions\n" % f)
976 976 repo.ui.progress(_('files'), files, unit=_('files'),
977 977 total=expectedfiles)
978 978 fl = repo.file(f)
979 979 o = len(fl)
980 980 try:
981 981 if not fl.addgroup(source, revmap, trp):
982 982 raise error.Abort(_("received file revlog group is empty"))
983 983 except error.CensoredBaseError as e:
984 984 raise error.Abort(_("received delta base is censored: %s") % e)
985 985 revisions += len(fl) - o
986 986 if f in needfiles:
987 987 needs = needfiles[f]
988 988 for new in xrange(o, len(fl)):
989 989 n = fl.node(new)
990 990 if n in needs:
991 991 needs.remove(n)
992 992 else:
993 993 raise error.Abort(
994 994 _("received spurious file revlog entry"))
995 995 if not needs:
996 996 del needfiles[f]
997 997 repo.ui.progress(_('files'), None)
998 998
999 999 for f, needs in needfiles.iteritems():
1000 1000 fl = repo.file(f)
1001 1001 for n in needs:
1002 1002 try:
1003 1003 fl.rev(n)
1004 1004 except error.LookupError:
1005 1005 raise error.Abort(
1006 1006 _('missing file data for %s:%s - run hg verify') %
1007 1007 (f, hex(n)))
1008 1008
1009 1009 return revisions, files