generaldelta: mark experimental reordering option
Matt Mackall
r25831:578fc979 default
@@ -1,891 +1,892 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
18 18 def readexactly(stream, n):
19 19 '''read n bytes from stream.read and abort if less was available'''
20 20 s = stream.read(n)
21 21 if len(s) < n:
22 22 raise util.Abort(_("stream ended unexpectedly"
23 23 " (got %d bytes, expected %d)")
24 24 % (len(s), n))
25 25 return s
26 26
27 27 def getchunk(stream):
28 28 """return the next chunk from stream as a string"""
29 29 d = readexactly(stream, 4)
30 30 l = struct.unpack(">l", d)[0]
31 31 if l <= 4:
32 32 if l:
33 33 raise util.Abort(_("invalid chunk length %d") % l)
34 34 return ""
35 35 return readexactly(stream, l - 4)
36 36
37 37 def chunkheader(length):
38 38 """return a changegroup chunk header (string)"""
39 39 return struct.pack(">l", length + 4)
40 40
41 41 def closechunk():
42 42 """return a changegroup chunk header (string) for a zero-length chunk"""
43 43 return struct.pack(">l", 0)
44 44
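# A minimal worked example of the framing above (hedged sketch, not part of
# the original changeset): the 4-byte big-endian length prefix counts
# itself, so a 5-byte payload gets a header of 9, and a zero header closes
# a chunk group.
#
#   >>> import struct
#   >>> chunkheader(5) == struct.pack(">l", 9)
#   True
#   >>> struct.unpack(">l", closechunk())[0]
#   0
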
45 45 def combineresults(results):
46 46 """logic to combine 0 or more addchangegroup results into one"""
47 47 changedheads = 0
48 48 result = 1
49 49 for ret in results:
50 50 # If any changegroup result is 0, return 0
51 51 if ret == 0:
52 52 result = 0
53 53 break
54 54 if ret < -1:
55 55 changedheads += ret + 1
56 56 elif ret > 1:
57 57 changedheads += ret - 1
58 58 if changedheads > 0:
59 59 result = 1 + changedheads
60 60 elif changedheads < 0:
61 61 result = -1 + changedheads
62 62 return result
63 63
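# Worked example (hypothetical return values): combining [3, 1, -2] --
# 3 adds two heads, 1 leaves the head count unchanged, -2 removes one
# head -- gives changedheads = 2 + 0 - 1 = 1, so combineresults returns 2.
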
64 64 class nocompress(object):
65 65 def compress(self, x):
66 66 return x
67 67 def flush(self):
68 68 return ""
69 69
70 70 bundletypes = {
71 71 "": ("", nocompress), # only when using unbundle on ssh and old http servers
72 72 # since the unification, ssh accepts a header but there
73 73 # is no capability signaling it.
74 74 "HG20": (), # special-cased below
75 75 "HG10UN": ("HG10UN", nocompress),
76 76 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
77 77 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
78 78 }
79 79
80 80 # hgweb uses this list to communicate its preferred type
81 81 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
82 82
83 83 def writebundle(ui, cg, filename, bundletype, vfs=None):
84 84 """Write a bundle file and return its filename.
85 85
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 bz2 compression can be turned off.
89 89 The bundle file will be deleted in case of errors.
90 90 """
91 91
92 92 fh = None
93 93 cleanup = None
94 94 try:
95 95 if filename:
96 96 if vfs:
97 97 fh = vfs.open(filename, "wb")
98 98 else:
99 99 fh = open(filename, "wb")
100 100 else:
101 101 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
102 102 fh = os.fdopen(fd, "wb")
103 103 cleanup = filename
104 104
105 105 if bundletype == "HG20":
106 106 import bundle2
107 107 bundle = bundle2.bundle20(ui)
108 108 part = bundle.newpart('changegroup', data=cg.getchunks())
109 109 part.addparam('version', cg.version)
110 110 z = nocompress()
111 111 chunkiter = bundle.getchunks()
112 112 else:
113 113 if cg.version != '01':
114 114 raise util.Abort(_('old bundle types only supports v1 '
115 115 'changegroups'))
116 116 header, compressor = bundletypes[bundletype]
117 117 fh.write(header)
118 118 z = compressor()
119 119 chunkiter = cg.getchunks()
120 120
121 121 # parse the changegroup data, otherwise we will block
122 122 # in case of sshrepo because we don't know the end of the stream
123 123
124 124 # an empty chunkgroup is the end of the changegroup
125 125 # a changegroup has at least 2 chunkgroups (changelog and manifest).
126 126 # after that, an empty chunkgroup is the end of the changegroup
127 127 for chunk in chunkiter:
128 128 fh.write(z.compress(chunk))
129 129 fh.write(z.flush())
130 130 cleanup = None
131 131 return filename
132 132 finally:
133 133 if fh is not None:
134 134 fh.close()
135 135 if cleanup is not None:
136 136 if filename and vfs:
137 137 vfs.unlink(cleanup)
138 138 else:
139 139 os.unlink(cleanup)
140 140
141 141 def decompressor(fh, alg):
142 142 if alg == 'UN':
143 143 return fh
144 144 elif alg == 'GZ':
145 145 def generator(f):
146 146 zd = zlib.decompressobj()
147 147 for chunk in util.filechunkiter(f):
148 148 yield zd.decompress(chunk)
149 149 elif alg == 'BZ':
150 150 def generator(f):
151 151 zd = bz2.BZ2Decompressor()
152 152 zd.decompress("BZ")
153 153 for chunk in util.filechunkiter(f, 4096):
154 154 yield zd.decompress(chunk)
155 155 else:
156 156 raise util.Abort("unknown bundle compression '%s'" % alg)
157 157 return util.chunkbuffer(generator(fh))
158 158
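# Why the bz2 branch replays a literal "BZ" first: per the bundletypes
# table above, bz2 bundles are written with the 4-byte header "HG10"
# followed by the raw bz2 stream ("BZh..."), so the 6-byte header read
# ("HG10" + "BZ") swallows the stream's own magic, which must be fed back
# into the decompressor. Mapping sketch, restating the code above:
#
#   "HG10UN" -> alg 'UN' (no compression)
#   "HG10GZ" -> alg 'GZ' (zlib)
#   "HG10BZ" -> alg 'BZ' (bz2, magic replayed)
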
159 159 class cg1unpacker(object):
160 160 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
161 161 deltaheadersize = struct.calcsize(deltaheader)
162 162 version = '01'
163 163 def __init__(self, fh, alg):
164 164 self._stream = decompressor(fh, alg)
165 165 self._type = alg
166 166 self.callback = None
167 167 def compressed(self):
168 168 return self._type != 'UN'
169 169 def read(self, l):
170 170 return self._stream.read(l)
171 171 def seek(self, pos):
172 172 return self._stream.seek(pos)
173 173 def tell(self):
174 174 return self._stream.tell()
175 175 def close(self):
176 176 return self._stream.close()
177 177
178 178 def chunklength(self):
179 179 d = readexactly(self._stream, 4)
180 180 l = struct.unpack(">l", d)[0]
181 181 if l <= 4:
182 182 if l:
183 183 raise util.Abort(_("invalid chunk length %d") % l)
184 184 return 0
185 185 if self.callback:
186 186 self.callback()
187 187 return l - 4
188 188
189 189 def changelogheader(self):
190 190 """v10 does not have a changelog header chunk"""
191 191 return {}
192 192
193 193 def manifestheader(self):
194 194 """v10 does not have a manifest header chunk"""
195 195 return {}
196 196
197 197 def filelogheader(self):
198 198 """return the header of the filelogs chunk; v10 only has the filename"""
199 199 l = self.chunklength()
200 200 if not l:
201 201 return {}
202 202 fname = readexactly(self._stream, l)
203 203 return {'filename': fname}
204 204
205 205 def _deltaheader(self, headertuple, prevnode):
206 206 node, p1, p2, cs = headertuple
207 207 if prevnode is None:
208 208 deltabase = p1
209 209 else:
210 210 deltabase = prevnode
211 211 return node, p1, p2, deltabase, cs
212 212
213 213 def deltachunk(self, prevnode):
214 214 l = self.chunklength()
215 215 if not l:
216 216 return {}
217 217 headerdata = readexactly(self._stream, self.deltaheadersize)
218 218 header = struct.unpack(self.deltaheader, headerdata)
219 219 delta = readexactly(self._stream, l - self.deltaheadersize)
220 220 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
221 221 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
222 222 'deltabase': deltabase, 'delta': delta}
223 223
224 224 def getchunks(self):
225 225 """returns all the chunks contained in the bundle
226 226
227 227 Used when you need to forward the binary stream to a file or another
228 228 network API. To do so, it parses the changegroup data, otherwise it will
229 229 block in case of sshrepo because it doesn't know the end of the stream.
230 230 """
231 231 # an empty chunkgroup is the end of the changegroup
232 232 # a changegroup has at least 2 chunkgroups (changelog and manifest).
233 233 # after that, an empty chunkgroup is the end of the changegroup
234 234 empty = False
235 235 count = 0
236 236 while not empty or count <= 2:
237 237 empty = True
238 238 count += 1
239 239 while True:
240 240 chunk = getchunk(self)
241 241 if not chunk:
242 242 break
243 243 empty = False
244 244 yield chunkheader(len(chunk))
245 245 pos = 0
246 246 while pos < len(chunk):
247 247 next = pos + 2**20
248 248 yield chunk[pos:next]
249 249 pos = next
250 250 yield closechunk()
251 251
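# Overall stream layout implied by the loop above (sketch):
#
#   changelog delta chunks ...                    empty chunk
#   manifest delta chunks ...                     empty chunk
#   for each file: filename chunk, delta chunks   empty chunk
#   empty chunk                                   (end of changegroup)
#
# which is why the loop keeps reading until it has seen an empty group
# after the two mandatory groups (changelog and manifest).
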
252 252 class cg2unpacker(cg1unpacker):
253 253 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
254 254 deltaheadersize = struct.calcsize(deltaheader)
255 255 version = '02'
256 256
257 257 def _deltaheader(self, headertuple, prevnode):
258 258 node, p1, p2, deltabase, cs = headertuple
259 259 return node, p1, p2, deltabase, cs
260 260
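# Header size difference between the two formats (doctest-style sketch):
#
#   >>> import struct
#   >>> struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER)   # node, p1, p2, cs
#   80
#   >>> struct.calcsize(_CHANGEGROUPV2_DELTA_HEADER)   # + explicit deltabase
#   100
#
# In v1 the delta base is implicit (the previous node, or p1 for the first
# chunk); v2 spends 20 extra bytes to name it explicitly.
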
261 261 class headerlessfixup(object):
262 262 def __init__(self, fh, h):
263 263 self._h = h
264 264 self._fh = fh
265 265 def read(self, n):
266 266 if self._h:
267 267 d, self._h = self._h[:n], self._h[n:]
268 268 if len(d) < n:
269 269 d += readexactly(self._fh, n - len(d))
270 270 return d
271 271 return readexactly(self._fh, n)
272 272
273 273 class cg1packer(object):
274 274 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
275 275 version = '01'
276 276 def __init__(self, repo, bundlecaps=None):
277 277 """Given a source repo, construct a bundler.
278 278
279 279 bundlecaps is optional and can be used to specify the set of
280 280 capabilities which can be used to build the bundle.
281 281 """
282 282 # Set of capabilities we can use to build the bundle.
283 283 if bundlecaps is None:
284 284 bundlecaps = set()
285 285 self._bundlecaps = bundlecaps
286 # experimental config: bundle.reorder
286 287 reorder = repo.ui.config('bundle', 'reorder', 'auto')
287 288 if reorder == 'auto':
288 289 reorder = None
289 290 else:
290 291 reorder = util.parsebool(reorder)
291 292 self._repo = repo
292 293 self._reorder = reorder
293 294 self._progress = repo.ui.progress
294 295 if self._repo.ui.verbose and not self._repo.ui.debugflag:
295 296 self._verbosenote = self._repo.ui.note
296 297 else:
297 298 self._verbosenote = lambda s: None
298 299
299 300 def close(self):
300 301 return closechunk()
301 302
302 303 def fileheader(self, fname):
303 304 return chunkheader(len(fname)) + fname
304 305
305 306 def group(self, nodelist, revlog, lookup, units=None):
306 307 """Calculate a delta group, yielding a sequence of changegroup chunks
307 308 (strings).
308 309
309 310 Given a list of changeset revs, return a set of deltas and
310 311 metadata corresponding to nodes. The first delta is
311 312 first parent(nodelist[0]) -> nodelist[0], the receiver is
312 313 guaranteed to have this parent as it has all history before
313 314 these changesets. In the case firstparent is nullrev the
314 315 changegroup starts with a full revision.
315 316
316 317 If units is not None, progress detail will be generated; units specifies
317 318 the type of revlog that is touched (changelog, manifest, etc.).
318 319 """
319 320 # if we don't have any revisions touched by these changesets, bail
320 321 if len(nodelist) == 0:
321 322 yield self.close()
322 323 return
323 324
324 325 # for generaldelta revlogs, we linearize the revs; this will both be
325 326 # much quicker and generate a much smaller bundle
326 327 if (revlog._generaldelta and self._reorder is None) or self._reorder:
327 328 dag = dagutil.revlogdag(revlog)
328 329 revs = set(revlog.rev(n) for n in nodelist)
329 330 revs = dag.linearize(revs)
330 331 else:
331 332 revs = sorted([revlog.rev(n) for n in nodelist])
332 333
333 334 # add the parent of the first rev
334 335 p = revlog.parentrevs(revs[0])[0]
335 336 revs.insert(0, p)
336 337
337 338 # build deltas
338 339 total = len(revs) - 1
339 340 msgbundling = _('bundling')
340 341 for r in xrange(len(revs) - 1):
341 342 if units is not None:
342 343 self._progress(msgbundling, r + 1, unit=units, total=total)
343 344 prev, curr = revs[r], revs[r + 1]
344 345 linknode = lookup(revlog.node(curr))
345 346 for c in self.revchunk(revlog, curr, prev, linknode):
346 347 yield c
347 348
348 349 if units is not None:
349 350 self._progress(msgbundling, None)
350 351 yield self.close()
351 352
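# How the experimental bundle.reorder option (read in __init__ above) maps
# onto the branch at the top of group() -- a sketch, not normative docs:
#
#   bundle.reorder = 'auto' -> self._reorder is None -> linearize only when
#                              the revlog uses generaldelta
#   bundle.reorder = true   -> always linearize revs through the DAG
#   bundle.reorder = false  -> plain sorted revision order
#
# (cg2packer below additionally forces 'auto' to False, since generaldelta
# is natively supported by that format.)
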
352 353 # filter any nodes that claim to be part of the known set
353 354 def prune(self, revlog, missing, commonrevs):
354 355 rr, rl = revlog.rev, revlog.linkrev
355 356 return [n for n in missing if rl(rr(n)) not in commonrevs]
356 357
357 358 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
358 359 '''yield a sequence of changegroup chunks (strings)'''
359 360 repo = self._repo
360 361 cl = repo.changelog
361 362 ml = repo.manifest
362 363
363 364 clrevorder = {}
364 365 mfs = {} # needed manifests
365 366 fnodes = {} # needed file nodes
366 367 changedfiles = set()
367 368
368 369 # Callback for the changelog, used to collect changed files and manifest
369 370 # nodes.
370 371 # Returns the linkrev node (identity in the changelog case).
371 372 def lookupcl(x):
372 373 c = cl.read(x)
373 374 clrevorder[x] = len(clrevorder)
374 375 changedfiles.update(c[3])
375 376 # record the first changeset introducing this manifest version
376 377 mfs.setdefault(c[0], x)
377 378 return x
378 379
379 380 self._verbosenote(_('uncompressed size of bundle content:\n'))
380 381 size = 0
381 382 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
382 383 size += len(chunk)
383 384 yield chunk
384 385 self._verbosenote(_('%8.i (changelog)\n') % size)
385 386
386 387 # We need to make sure that the linkrev in the changegroup refers to
387 388 # the first changeset that introduced the manifest or file revision.
388 389 # The fastpath is usually safer than the slowpath, because the filelogs
389 390 # are walked in revlog order.
390 391 #
391 392 # When taking the slowpath with reorder=None and the manifest revlog
392 393 # uses generaldelta, the manifest may be walked in the "wrong" order.
393 394 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
394 395 # cc0ff93d0c0c).
395 396 #
396 397 # When taking the fastpath, we are only vulnerable to reordering
397 398 # of the changelog itself. The changelog never uses generaldelta, so
398 399 # it is only reordered when reorder=True. To handle this case, we
399 400 # simply take the slowpath, which already has the 'clrevorder' logic.
400 401 # This was also fixed in cc0ff93d0c0c.
401 402 fastpathlinkrev = fastpathlinkrev and not self._reorder
402 403 # Callback for the manifest, used to collect linkrevs for filelog
403 404 # revisions.
404 405 # Returns the linkrev node (collected in lookupcl).
405 406 def lookupmf(x):
406 407 clnode = mfs[x]
407 408 if not fastpathlinkrev:
408 409 mdata = ml.readfast(x)
409 410 for f, n in mdata.iteritems():
410 411 if f in changedfiles:
411 412 # record the first changeset introducing this filelog
412 413 # version
413 414 fclnodes = fnodes.setdefault(f, {})
414 415 fclnode = fclnodes.setdefault(n, clnode)
415 416 if clrevorder[clnode] < clrevorder[fclnode]:
416 417 fclnodes[n] = clnode
417 418 return clnode
418 419
419 420 mfnodes = self.prune(ml, mfs, commonrevs)
420 421 size = 0
421 422 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
422 423 size += len(chunk)
423 424 yield chunk
424 425 self._verbosenote(_('%8.i (manifests)\n') % size)
425 426
426 427 mfs.clear()
427 428 clrevs = set(cl.rev(x) for x in clnodes)
428 429
429 430 def linknodes(filerevlog, fname):
430 431 if fastpathlinkrev:
431 432 llr = filerevlog.linkrev
432 433 def genfilenodes():
433 434 for r in filerevlog:
434 435 linkrev = llr(r)
435 436 if linkrev in clrevs:
436 437 yield filerevlog.node(r), cl.node(linkrev)
437 438 return dict(genfilenodes())
438 439 return fnodes.get(fname, {})
439 440
440 441 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
441 442 source):
442 443 yield chunk
443 444
444 445 yield self.close()
445 446
446 447 if clnodes:
447 448 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
448 449
449 450 # The 'source' parameter is useful for extensions
450 451 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
451 452 repo = self._repo
452 453 progress = self._progress
453 454 msgbundling = _('bundling')
454 455
455 456 total = len(changedfiles)
456 457 # for progress output
457 458 msgfiles = _('files')
458 459 for i, fname in enumerate(sorted(changedfiles)):
459 460 filerevlog = repo.file(fname)
460 461 if not filerevlog:
461 462 raise util.Abort(_("empty or missing revlog for %s") % fname)
462 463
463 464 linkrevnodes = linknodes(filerevlog, fname)
464 465 # Lookup function for filenodes; we collected the linkrev nodes above in the
465 466 # fastpath case and with lookupmf in the slowpath case.
466 467 def lookupfilelog(x):
467 468 return linkrevnodes[x]
468 469
469 470 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
470 471 if filenodes:
471 472 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
472 473 total=total)
473 474 h = self.fileheader(fname)
474 475 size = len(h)
475 476 yield h
476 477 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
477 478 size += len(chunk)
478 479 yield chunk
479 480 self._verbosenote(_('%8.i %s\n') % (size, fname))
480 481 progress(msgbundling, None)
481 482
482 483 def deltaparent(self, revlog, rev, p1, p2, prev):
483 484 return prev
484 485
485 486 def revchunk(self, revlog, rev, prev, linknode):
486 487 node = revlog.node(rev)
487 488 p1, p2 = revlog.parentrevs(rev)
488 489 base = self.deltaparent(revlog, rev, p1, p2, prev)
489 490
490 491 prefix = ''
491 492 if revlog.iscensored(base) or revlog.iscensored(rev):
492 493 try:
493 494 delta = revlog.revision(node)
494 495 except error.CensoredNodeError as e:
495 496 delta = e.tombstone
496 497 if base == nullrev:
497 498 prefix = mdiff.trivialdiffheader(len(delta))
498 499 else:
499 500 baselen = revlog.rawsize(base)
500 501 prefix = mdiff.replacediffheader(baselen, len(delta))
501 502 elif base == nullrev:
502 503 delta = revlog.revision(node)
503 504 prefix = mdiff.trivialdiffheader(len(delta))
504 505 else:
505 506 delta = revlog.revdiff(base, rev)
506 507 p1n, p2n = revlog.parents(node)
507 508 basenode = revlog.node(base)
508 509 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
509 510 meta += prefix
510 511 l = len(meta) + len(delta)
511 512 yield chunkheader(l)
512 513 yield meta
513 514 yield delta
514 515 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
515 516 # do nothing with basenode, it is implicitly the previous one in HG10
516 517 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
517 518
518 519 class cg2packer(cg1packer):
519 520 version = '02'
520 521 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
521 522
522 523 def __init__(self, repo, bundlecaps=None):
523 524 super(cg2packer, self).__init__(repo, bundlecaps)
524 525 if self._reorder is None:
525 526 # Since generaldelta is directly supported by cg2, reordering
526 527 # generally doesn't help, so we disable it by default (treating
527 528 # bundle.reorder=auto just like bundle.reorder=False).
528 529 self._reorder = False
529 530
530 531 def deltaparent(self, revlog, rev, p1, p2, prev):
531 532 dp = revlog.deltaparent(rev)
532 533 # avoid storing full revisions; pick prev in those cases
533 534 # also pick prev when we can't be sure remote has dp
534 535 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
535 536 return prev
536 537 return dp
537 538
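# Decision sketch for the wire delta base chosen above:
#
#   dp == nullrev (stored as a full snapshot)      -> diff against prev
#   dp not in (p1, p2, prev) (remote may lack it)  -> diff against prev
#   otherwise                                      -> reuse the stored dp
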
538 539 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
539 540 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
540 541
541 542 packermap = {'01': (cg1packer, cg1unpacker),
542 543 '02': (cg2packer, cg2unpacker)}
543 544
544 545 def _changegroupinfo(repo, nodes, source):
545 546 if repo.ui.verbose or source == 'bundle':
546 547 repo.ui.status(_("%d changesets found\n") % len(nodes))
547 548 if repo.ui.debugflag:
548 549 repo.ui.debug("list of changesets:\n")
549 550 for node in nodes:
550 551 repo.ui.debug("%s\n" % hex(node))
551 552
552 553 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
553 554 repo = repo.unfiltered()
554 555 commonrevs = outgoing.common
555 556 csets = outgoing.missing
556 557 heads = outgoing.missingheads
557 558 # We go through the fast path if we get told to, or if all (unfiltered)
558 559 # heads have been requested (since we then know that all linkrevs will
559 560 # be pulled by the client).
560 561 heads.sort()
561 562 fastpathlinkrev = fastpath or (
562 563 repo.filtername is None and heads == sorted(repo.heads()))
563 564
564 565 repo.hook('preoutgoing', throw=True, source=source)
565 566 _changegroupinfo(repo, csets, source)
566 567 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
567 568
568 569 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
569 570 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
570 571 return packermap[version][1](util.chunkbuffer(gengroup), 'UN')
571 572
572 573 def changegroupsubset(repo, roots, heads, source, version='01'):
573 574 """Compute a changegroup consisting of all the nodes that are
574 575 descendants of any of the roots and ancestors of any of the heads.
575 576 Return a chunkbuffer object whose read() method will return
576 577 successive changegroup chunks.
577 578
578 579 It is fairly complex as determining which filenodes and which
579 580 manifest nodes need to be included for the changeset to be complete
580 581 is non-trivial.
581 582
582 583 Another wrinkle is doing the reverse, figuring out which changeset in
583 584 the changegroup a particular filenode or manifestnode belongs to.
584 585 """
585 586 cl = repo.changelog
586 587 if not roots:
587 588 roots = [nullid]
588 589 discbases = []
589 590 for n in roots:
590 591 discbases.extend([p for p in cl.parents(n) if p != nullid])
591 592 # TODO: remove call to nodesbetween.
592 593 csets, roots, heads = cl.nodesbetween(roots, heads)
593 594 included = set(csets)
594 595 discbases = [n for n in discbases if n not in included]
595 596 outgoing = discovery.outgoing(cl, discbases, heads)
596 597 bundler = packermap[version][0](repo)
597 598 return getsubset(repo, outgoing, bundler, source, version=version)
598 599
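# Hypothetical usage sketch (rootnode, headnode and fh are stand-ins; the
# remaining names are defined in this module): build a v2 changegroup for
# everything between two nodes and forward the raw stream.
#
#   cg = changegroupsubset(repo, [rootnode], [headnode], 'bundle',
#                          version='02')
#   for chunk in cg.getchunks():
#       fh.write(chunk)
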
599 600 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
600 601 version='01'):
601 602 """Like getbundle, but taking a discovery.outgoing as an argument.
602 603
603 604 This is only implemented for local repos and reuses potentially
604 605 precomputed sets in outgoing. Returns a raw changegroup generator."""
605 606 if not outgoing.missing:
606 607 return None
607 608 bundler = packermap[version][0](repo, bundlecaps)
608 609 return getsubsetraw(repo, outgoing, bundler, source)
609 610
610 611 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
611 612 """Like getbundle, but taking a discovery.outgoing as an argument.
612 613
613 614 This is only implemented for local repos and reuses potentially
614 615 precomputed sets in outgoing."""
615 616 if not outgoing.missing:
616 617 return None
617 618 bundler = cg1packer(repo, bundlecaps)
618 619 return getsubset(repo, outgoing, bundler, source)
619 620
620 621 def computeoutgoing(repo, heads, common):
621 622 """Computes which revs are outgoing given a set of common
622 623 and a set of heads.
623 624
624 625 This is a separate function so extensions can have access to
625 626 the logic.
626 627
627 628 Returns a discovery.outgoing object.
628 629 """
629 630 cl = repo.changelog
630 631 if common:
631 632 hasnode = cl.hasnode
632 633 common = [n for n in common if hasnode(n)]
633 634 else:
634 635 common = [nullid]
635 636 if not heads:
636 637 heads = cl.heads()
637 638 return discovery.outgoing(cl, common, heads)
638 639
639 640 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
640 641 """Like changegroupsubset, but returns the set difference between the
641 642 ancestors of heads and the ancestors of common.
642 643
643 644 If heads is None, use the local heads. If common is None, use [nullid].
644 645
645 646 The nodes in common might not all be known locally due to the way the
646 647 current discovery protocol works.
647 648 """
648 649 outgoing = computeoutgoing(repo, heads, common)
649 650 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
650 651
651 652 def changegroup(repo, basenodes, source):
652 653 # to avoid a race we use changegroupsubset() (issue1320)
653 654 return changegroupsubset(repo, basenodes, repo.heads(), source)
654 655
655 656 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
656 657 revisions = 0
657 658 files = 0
658 659 while True:
659 660 chunkdata = source.filelogheader()
660 661 if not chunkdata:
661 662 break
662 663 f = chunkdata["filename"]
663 664 repo.ui.debug("adding %s revisions\n" % f)
664 665 pr()
665 666 fl = repo.file(f)
666 667 o = len(fl)
667 668 try:
668 669 if not fl.addgroup(source, revmap, trp):
669 670 raise util.Abort(_("received file revlog group is empty"))
670 671 except error.CensoredBaseError as e:
671 672 raise util.Abort(_("received delta base is censored: %s") % e)
672 673 revisions += len(fl) - o
673 674 files += 1
674 675 if f in needfiles:
675 676 needs = needfiles[f]
676 677 for new in xrange(o, len(fl)):
677 678 n = fl.node(new)
678 679 if n in needs:
679 680 needs.remove(n)
680 681 else:
681 682 raise util.Abort(
682 683 _("received spurious file revlog entry"))
683 684 if not needs:
684 685 del needfiles[f]
685 686 repo.ui.progress(_('files'), None)
686 687
687 688 for f, needs in needfiles.iteritems():
688 689 fl = repo.file(f)
689 690 for n in needs:
690 691 try:
691 692 fl.rev(n)
692 693 except error.LookupError:
693 694 raise util.Abort(
694 695 _('missing file data for %s:%s - run hg verify') %
695 696 (f, hex(n)))
696 697
697 698 return revisions, files
698 699
699 700 def addchangegroup(repo, source, srctype, url, emptyok=False,
700 701 targetphase=phases.draft, expectedtotal=None):
701 702 """Add the changegroup returned by source.read() to this repo.
702 703 srctype is a string like 'push', 'pull', or 'unbundle'. url is
703 704 the URL of the repo where this changegroup is coming from.
704 705
705 706 Return an integer summarizing the change to this repo:
706 707 - nothing changed or no source: 0
707 708 - more heads than before: 1+added heads (2..n)
708 709 - fewer heads than before: -1-removed heads (-2..-n)
709 710 - number of heads stays the same: 1
710 711 """
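# Worked examples of the return encoding (hypothetical pulls): a group
# that adds two new heads returns 1 + 2 = 3; one that merges a head away
# returns -1 - 1 = -2; an unchanged head count returns 1. combineresults()
# above folds several such values back into a single summary.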
711 712 repo = repo.unfiltered()
712 713 def csmap(x):
713 714 repo.ui.debug("add changeset %s\n" % short(x))
714 715 return len(cl)
715 716
716 717 def revmap(x):
717 718 return cl.rev(x)
718 719
719 720 if not source:
720 721 return 0
721 722
722 723 changesets = files = revisions = 0
723 724
724 725 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
725 726 # The transaction could have been created before and already carries source
726 727 # information. In this case we use the top level data. We overwrite the
727 728 # argument because we need to use the top level values (if they exist) in
728 729 # this function.
729 730 srctype = tr.hookargs.setdefault('source', srctype)
730 731 url = tr.hookargs.setdefault('url', url)
731 732
732 733 # write changelog data to temp files so concurrent readers will not see
733 734 # inconsistent view
734 735 cl = repo.changelog
735 736 cl.delayupdate(tr)
736 737 oldheads = cl.heads()
737 738 try:
738 739 repo.hook('prechangegroup', throw=True, **tr.hookargs)
739 740
740 741 trp = weakref.proxy(tr)
741 742 # pull off the changeset group
742 743 repo.ui.status(_("adding changesets\n"))
743 744 clstart = len(cl)
744 745 class prog(object):
745 746 def __init__(self, step, total):
746 747 self._step = step
747 748 self._total = total
748 749 self._count = 1
749 750 def __call__(self):
750 751 repo.ui.progress(self._step, self._count, unit=_('chunks'),
751 752 total=self._total)
752 753 self._count += 1
753 754 source.callback = prog(_('changesets'), expectedtotal)
754 755
755 756 efiles = set()
756 757 def onchangelog(cl, node):
757 758 efiles.update(cl.read(node)[3])
758 759
759 760 source.changelogheader()
760 761 srccontent = cl.addgroup(source, csmap, trp,
761 762 addrevisioncb=onchangelog)
762 763 efiles = len(efiles)
763 764
764 765 if not (srccontent or emptyok):
765 766 raise util.Abort(_("received changelog group is empty"))
766 767 clend = len(cl)
767 768 changesets = clend - clstart
768 769 repo.ui.progress(_('changesets'), None)
769 770
770 771 # pull off the manifest group
771 772 repo.ui.status(_("adding manifests\n"))
772 773 # manifests <= changesets
773 774 source.callback = prog(_('manifests'), changesets)
774 775 # no need to check for empty manifest group here:
775 776 # if the result of the merge of 1 and 2 is the same in 3 and 4,
776 777 # no new manifest will be created and the manifest group will
777 778 # be empty during the pull
778 779 source.manifestheader()
779 780 repo.manifest.addgroup(source, revmap, trp)
780 781 repo.ui.progress(_('manifests'), None)
781 782
782 783 needfiles = {}
783 784 if repo.ui.configbool('server', 'validate', default=False):
784 785 # validate incoming csets have their manifests
785 786 for cset in xrange(clstart, clend):
786 787 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
787 788 mfest = repo.manifest.readdelta(mfnode)
788 789 # store file nodes we must see
789 790 for f, n in mfest.iteritems():
790 791 needfiles.setdefault(f, set()).add(n)
791 792
792 793 # process the files
793 794 repo.ui.status(_("adding file changes\n"))
794 795 source.callback = None
795 796 pr = prog(_('files'), efiles)
796 797 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
797 798 needfiles)
798 799 revisions += newrevs
799 800 files += newfiles
800 801
801 802 dh = 0
802 803 if oldheads:
803 804 heads = cl.heads()
804 805 dh = len(heads) - len(oldheads)
805 806 for h in heads:
806 807 if h not in oldheads and repo[h].closesbranch():
807 808 dh -= 1
808 809 htext = ""
809 810 if dh:
810 811 htext = _(" (%+d heads)") % dh
811 812
812 813 repo.ui.status(_("added %d changesets"
813 814 " with %d changes to %d files%s\n")
814 815 % (changesets, revisions, files, htext))
815 816 repo.invalidatevolatilesets()
816 817
817 818 if changesets > 0:
818 819 p = lambda: tr.writepending() and repo.root or ""
819 820 if 'node' not in tr.hookargs:
820 821 tr.hookargs['node'] = hex(cl.node(clstart))
821 822 hookargs = dict(tr.hookargs)
822 823 else:
823 824 hookargs = dict(tr.hookargs)
824 825 hookargs['node'] = hex(cl.node(clstart))
825 826 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
826 827
827 828 added = [cl.node(r) for r in xrange(clstart, clend)]
828 829 publishing = repo.publishing()
829 830 if srctype in ('push', 'serve'):
830 831 # Old servers cannot push the boundary themselves.
831 832 # New servers won't push the boundary if the changeset already
832 833 # exists locally as secret
833 834 #
834 835 # We should not use added here but the list of all changes in
835 836 # the bundle
836 837 if publishing:
837 838 phases.advanceboundary(repo, tr, phases.public, srccontent)
838 839 else:
839 840 # Those changesets have been pushed from the outside, their
840 841 # phases are going to be pushed alongside. Therefore
841 842 # `targetphase` is ignored.
842 843 phases.advanceboundary(repo, tr, phases.draft, srccontent)
843 844 phases.retractboundary(repo, tr, phases.draft, added)
844 845 elif srctype != 'strip':
845 846 # publishing only alters behavior during push
846 847 #
847 848 # strip should not touch boundary at all
848 849 phases.retractboundary(repo, tr, targetphase, added)
849 850
850 851 if changesets > 0:
851 852 if srctype != 'strip':
852 853 # During strip, the branchcache is invalid but the upcoming call to
853 854 # `destroyed` will repair it.
854 855 # In other cases we can safely update the cache on disk.
855 856 branchmap.updatecache(repo.filtered('served'))
856 857
857 858 def runhooks():
858 859 # These hooks run when the lock releases, not when the
859 860 # transaction closes. So it's possible for the changelog
860 861 # to have changed since we last saw it.
861 862 if clstart >= len(repo):
862 863 return
863 864
864 865 # forcefully update the on-disk branch cache
865 866 repo.ui.debug("updating the branch cache\n")
866 867 repo.hook("changegroup", **hookargs)
867 868
868 869 for n in added:
869 870 args = hookargs.copy()
870 871 args['node'] = hex(n)
871 872 repo.hook("incoming", **args)
872 873
873 874 newheads = [h for h in repo.heads() if h not in oldheads]
874 875 repo.ui.log("incoming",
875 876 "%s incoming changes - new heads: %s\n",
876 877 len(added),
877 878 ', '.join([hex(c[:6]) for c in newheads]))
878 879
879 880 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
880 881 lambda tr: repo._afterlock(runhooks))
881 882
882 883 tr.close()
883 884
884 885 finally:
885 886 tr.release()
886 887 repo.ui.flush()
887 888 # never return 0 here:
888 889 if dh < 0:
889 890 return dh - 1
890 891 else:
891 892 return dh + 1