changegroup: introduce cg2packer/unpacker...
Sune Foldager
r23181:832b7ef2 default
@@ -1,798 +1,832 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
16 17
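Note: the only wire-format change in this commit is the extra 20-byte field in the version-2 delta header. A minimal standalone sketch of the difference (Python 3 for clarity; not Mercurial's own parsing code):

    import struct

    V1 = "20s20s20s20s"      # node, p1, p2, cs (linknode); delta base implicit
    V2 = "20s20s20s20s20s"   # node, p1, p2, deltabase, cs (linknode)

    assert struct.calcsize(V1) == 80
    assert struct.calcsize(V2) == 100

    # pack/unpack order matches cg2packer.builddeltaheader below
    node, p1, p2, base, link = (bytes([i]) * 20 for i in range(5))
    hdr = struct.pack(V2, node, p1, p2, base, link)
    assert struct.unpack(V2, hdr) == (node, p1, p2, base, link)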
17 18 def readexactly(stream, n):
18 19 '''read n bytes from stream.read and abort if less was available'''
19 20 s = stream.read(n)
20 21 if len(s) < n:
21 22 raise util.Abort(_("stream ended unexpectedly"
22 23 " (got %d bytes, expected %d)")
23 24 % (len(s), n))
24 25 return s
25 26
26 27 def getchunk(stream):
27 28 """return the next chunk from stream as a string"""
28 29 d = readexactly(stream, 4)
29 30 l = struct.unpack(">l", d)[0]
30 31 if l <= 4:
31 32 if l:
32 33 raise util.Abort(_("invalid chunk length %d") % l)
33 34 return ""
34 35 return readexactly(stream, l - 4)
35 36
36 37 def chunkheader(length):
37 38 """return a changegroup chunk header (string)"""
38 39 return struct.pack(">l", length + 4)
39 40
40 41 def closechunk():
41 42 """return a changegroup chunk header (string) for a zero-length chunk"""
42 43 return struct.pack(">l", 0)
43 44
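As the helpers above imply, every chunk on the wire is a 4-byte big-endian length (counting the header itself) followed by the payload, and a zero-length chunk ends a group. A self-contained round-trip sketch of that framing, mirroring getchunk/chunkheader/closechunk with Python 3 bytes:

    import io, struct

    def putchunk(payload):
        # chunkheader(len(payload)) + payload; length includes the header
        return struct.pack(">l", len(payload) + 4) + payload

    def takechunk(stream):
        # mirrors getchunk(): a length <= 4 means an empty, terminating chunk
        l = struct.unpack(">l", stream.read(4))[0]
        return stream.read(l - 4) if l > 4 else b""

    buf = io.BytesIO(putchunk(b"hello") + struct.pack(">l", 0))  # closechunk()
    assert takechunk(buf) == b"hello"
    assert takechunk(buf) == b""   # end of this chunk group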
44 45 class nocompress(object):
45 46 def compress(self, x):
46 47 return x
47 48 def flush(self):
48 49 return ""
49 50
50 51 bundletypes = {
51 52 "": ("", nocompress), # only when using unbundle on ssh and old http servers
52 53 # since the unification ssh accepts a header but there
53 54 # is no capability signaling it.
54 55 "HG10UN": ("HG10UN", nocompress),
55 56 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 57 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 58 }
58 59
59 60 # hgweb uses this list to communicate its preferred type
60 61 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
61 62
62 63 def writebundle(cg, filename, bundletype, vfs=None):
63 64 """Write a bundle file and return its filename.
64 65
65 66 Existing files will not be overwritten.
66 67 If no filename is specified, a temporary file is created.
67 68 bz2 compression can be turned off.
68 69 The bundle file will be deleted in case of errors.
69 70 """
70 71
71 72 fh = None
72 73 cleanup = None
73 74 try:
74 75 if filename:
75 76 if vfs:
76 77 fh = vfs.open(filename, "wb")
77 78 else:
78 79 fh = open(filename, "wb")
79 80 else:
80 81 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
81 82 fh = os.fdopen(fd, "wb")
82 83 cleanup = filename
83 84
84 85 header, compressor = bundletypes[bundletype]
85 86 fh.write(header)
86 87 z = compressor()
87 88
88 89 # parse the changegroup data, otherwise we will block
89 90 # in case of sshrepo because we don't know the end of the stream
90 91
91 92 # an empty chunkgroup is the end of the changegroup
92 93 # a changegroup has at least 2 chunkgroups (changelog and manifest).
93 94 # after that, an empty chunkgroup is the end of the changegroup
94 95 for chunk in cg.getchunks():
95 96 fh.write(z.compress(chunk))
96 97 fh.write(z.flush())
97 98 cleanup = None
98 99 return filename
99 100 finally:
100 101 if fh is not None:
101 102 fh.close()
102 103 if cleanup is not None:
103 104 if filename and vfs:
104 105 vfs.unlink(cleanup)
105 106 else:
106 107 os.unlink(cleanup)
107 108
108 109 def decompressor(fh, alg):
109 110 if alg == 'UN':
110 111 return fh
111 112 elif alg == 'GZ':
112 113 def generator(f):
113 114 zd = zlib.decompressobj()
114 115 for chunk in util.filechunkiter(f):
115 116 yield zd.decompress(chunk)
116 117 elif alg == 'BZ':
117 118 def generator(f):
118 119 zd = bz2.BZ2Decompressor()
119 120 zd.decompress("BZ")
120 121 for chunk in util.filechunkiter(f, 4096):
121 122 yield zd.decompress(chunk)
122 123 else:
123 124 raise util.Abort("unknown bundle compression '%s'" % alg)
124 125 return util.chunkbuffer(generator(fh))
125 126
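The 'BZ' branch primes the decompressor with the two bytes "BZ" because an HG10BZ bundle stores only "HG10" on disk: the bz2 stream's own magic supplies the trailing "BZ" of the type string, and the type detection that selected this branch has already consumed it. A standalone round-trip illustrating that (Python 3 sketch):

    import bz2, io

    payload = b"some changegroup data"
    comp = bz2.BZ2Compressor()
    body = comp.compress(payload) + comp.flush()  # bz2 stream starts b"BZh..."

    fh = io.BytesIO(b"HG10" + body)               # what HG10BZ writes on disk
    assert fh.read(6) == b"HG10BZ"                # type sniffing eats the "BZ"

    zd = bz2.BZ2Decompressor()
    zd.decompress(b"BZ")                          # re-feed what sniffing consumed
    assert zd.decompress(fh.read()) == payload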
126 127 class cg1unpacker(object):
127 128 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
128 129 deltaheadersize = struct.calcsize(deltaheader)
129 130 def __init__(self, fh, alg):
130 131 self._stream = decompressor(fh, alg)
131 132 self._type = alg
132 133 self.callback = None
133 134 def compressed(self):
134 135 return self._type != 'UN'
135 136 def read(self, l):
136 137 return self._stream.read(l)
137 138 def seek(self, pos):
138 139 return self._stream.seek(pos)
139 140 def tell(self):
140 141 return self._stream.tell()
141 142 def close(self):
142 143 return self._stream.close()
143 144
144 145 def chunklength(self):
145 146 d = readexactly(self._stream, 4)
146 147 l = struct.unpack(">l", d)[0]
147 148 if l <= 4:
148 149 if l:
149 150 raise util.Abort(_("invalid chunk length %d") % l)
150 151 return 0
151 152 if self.callback:
152 153 self.callback()
153 154 return l - 4
154 155
155 156 def changelogheader(self):
156 157 """v10 does not have a changelog header chunk"""
157 158 return {}
158 159
159 160 def manifestheader(self):
160 161 """v10 does not have a manifest header chunk"""
161 162 return {}
162 163
163 164 def filelogheader(self):
164 165 """return the header of the filelogs chunk, v10 only has the filename"""
165 166 l = self.chunklength()
166 167 if not l:
167 168 return {}
168 169 fname = readexactly(self._stream, l)
169 170 return {'filename': fname}
170 171
171 172 def _deltaheader(self, headertuple, prevnode):
172 173 node, p1, p2, cs = headertuple
173 174 if prevnode is None:
174 175 deltabase = p1
175 176 else:
176 177 deltabase = prevnode
177 178 return node, p1, p2, deltabase, cs
178 179
179 180 def deltachunk(self, prevnode):
180 181 l = self.chunklength()
181 182 if not l:
182 183 return {}
183 184 headerdata = readexactly(self._stream, self.deltaheadersize)
184 185 header = struct.unpack(self.deltaheader, headerdata)
185 186 delta = readexactly(self._stream, l - self.deltaheadersize)
186 187 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
187 188 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
188 189 'deltabase': deltabase, 'delta': delta}
189 190
190 191 def getchunks(self):
191 192 """returns all the chunks contains in the bundle
192 193
193 194 Used when you need to forward the binary stream to a file or another
194 195 network API. To do so, it parse the changegroup data, otherwise it will
195 196 block in case of sshrepo because it don't know the end of the stream.
196 197 """
197 198 # an empty chunkgroup is the end of the changegroup
198 199 # a changegroup has at least 2 chunkgroups (changelog and manifest).
199 200 # after that, an empty chunkgroup is the end of the changegroup
200 201 empty = False
201 202 count = 0
202 203 while not empty or count <= 2:
203 204 empty = True
204 205 count += 1
205 206 while True:
206 207 chunk = getchunk(self)
207 208 if not chunk:
208 209 break
209 210 empty = False
210 211 yield chunkheader(len(chunk))
211 212 pos = 0
212 213 while pos < len(chunk):
213 214 next = pos + 2**20
214 215 yield chunk[pos:next]
215 216 pos = next
216 217 yield closechunk()
217 218
219 class cg2unpacker(cg1unpacker):
220 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
221 deltaheadersize = struct.calcsize(deltaheader)
222
223 def _deltaheader(self, headertuple, prevnode):
224 node, p1, p2, deltabase, cs = headertuple
225 return node, p1, p2, deltabase, cs
226
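The only behavioral difference between the two unpackers is where the delta base comes from: v1 infers it (p1 for the first chunk of a group, else the previous node), while v2 reads it from the header. A side-by-side restatement of the two rules as plain functions:

    def v1_deltabase(p1, prevnode):
        # cg1unpacker._deltaheader: base is implicit in chunk order
        return p1 if prevnode is None else prevnode

    def v2_deltabase(deltabase, prevnode):
        # cg2unpacker._deltaheader: base travels in the header; prevnode unused
        return deltabase

    assert v1_deltabase(b"\x01" * 20, None) == b"\x01" * 20
    assert v1_deltabase(b"\x01" * 20, b"\x02" * 20) == b"\x02" * 20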
218 227 class headerlessfixup(object):
219 228 def __init__(self, fh, h):
220 229 self._h = h
221 230 self._fh = fh
222 231 def read(self, n):
223 232 if self._h:
224 233 d, self._h = self._h[:n], self._h[n:]
225 234 if len(d) < n:
226 235 d += readexactly(self._fh, n - len(d))
227 236 return d
228 237 return readexactly(self._fh, n)
229 238
230 239 class cg1packer(object):
231 240 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
232 241 def __init__(self, repo, bundlecaps=None):
233 242 """Given a source repo, construct a bundler.
234 243
235 244 bundlecaps is optional and can be used to specify the set of
236 245 capabilities which can be used to build the bundle.
237 246 """
238 247 # Set of capabilities we can use to build the bundle.
239 248 if bundlecaps is None:
240 249 bundlecaps = set()
241 250 self._bundlecaps = bundlecaps
242 251 self._changelog = repo.changelog
243 252 self._manifest = repo.manifest
244 253 reorder = repo.ui.config('bundle', 'reorder', 'auto')
245 254 if reorder == 'auto':
246 255 reorder = None
247 256 else:
248 257 reorder = util.parsebool(reorder)
249 258 self._repo = repo
250 259 self._reorder = reorder
251 260 self._progress = repo.ui.progress
252 261 def close(self):
253 262 return closechunk()
254 263
255 264 def fileheader(self, fname):
256 265 return chunkheader(len(fname)) + fname
257 266
258 267 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
259 268 """Calculate a delta group, yielding a sequence of changegroup chunks
260 269 (strings).
261 270
262 271 Given a list of changeset revs, return a set of deltas and
263 272 metadata corresponding to nodes. The first delta is
264 273 first parent(nodelist[0]) -> nodelist[0], the receiver is
265 274 guaranteed to have this parent as it has all history before
266 275 these changesets. In the case firstparent is nullrev the
267 276 changegroup starts with a full revision.
268 277
269 278 If units is not None, progress detail will be generated; units specifies
270 279 the type of revlog that is touched (changelog, manifest, etc.).
271 280 """
272 281 # if we don't have any revisions touched by these changesets, bail
273 282 if len(nodelist) == 0:
274 283 yield self.close()
275 284 return
276 285
277 286 # for generaldelta revlogs, we linearize the revs; this will both be
278 287 # much quicker and generate a much smaller bundle
279 288 if (revlog._generaldelta and reorder is not False) or reorder:
280 289 dag = dagutil.revlogdag(revlog)
281 290 revs = set(revlog.rev(n) for n in nodelist)
282 291 revs = dag.linearize(revs)
283 292 else:
284 293 revs = sorted([revlog.rev(n) for n in nodelist])
285 294
286 295 # add the parent of the first rev
287 296 p = revlog.parentrevs(revs[0])[0]
288 297 revs.insert(0, p)
289 298
290 299 # build deltas
291 300 total = len(revs) - 1
292 301 msgbundling = _('bundling')
293 302 for r in xrange(len(revs) - 1):
294 303 if units is not None:
295 304 self._progress(msgbundling, r + 1, unit=units, total=total)
296 305 prev, curr = revs[r], revs[r + 1]
297 306 linknode = lookup(revlog.node(curr))
298 307 for c in self.revchunk(revlog, curr, prev, linknode):
299 308 yield c
300 309
301 310 yield self.close()
302 311
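Concretely, after linearizing the revs and prepending the first rev's parent, group() emits one delta per adjacent pair in revs. A toy model of that chain (revision numbers are made up):

    revs = [4, 6, 7]          # hypothetical revs touched by the changesets
    revs.insert(0, 2)         # parent of the first rev (may be nullrev == -1)
    chain = [(revs[r], revs[r + 1]) for r in range(len(revs) - 1)]
    assert chain == [(2, 4), (4, 6), (6, 7)]   # each pair: one revchunk delta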
303 312 # filter any nodes that claim to be part of the known set
304 313 def prune(self, revlog, missing, commonrevs, source):
305 314 rr, rl = revlog.rev, revlog.linkrev
306 315 return [n for n in missing if rl(rr(n)) not in commonrevs]
307 316
308 317 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
309 318 '''yield a sequence of changegroup chunks (strings)'''
310 319 repo = self._repo
311 320 cl = self._changelog
312 321 mf = self._manifest
313 322 reorder = self._reorder
314 323 progress = self._progress
315 324
316 325 # for progress output
317 326 msgbundling = _('bundling')
318 327
319 328 mfs = {} # needed manifests
320 329 fnodes = {} # needed file nodes
321 330 changedfiles = set()
322 331
323 332 # Callback for the changelog, used to collect changed files and manifest
324 333 # nodes.
325 334 # Returns the linkrev node (identity in the changelog case).
326 335 def lookupcl(x):
327 336 c = cl.read(x)
328 337 changedfiles.update(c[3])
329 338 # record the first changeset introducing this manifest version
330 339 mfs.setdefault(c[0], x)
331 340 return x
332 341
333 342 # Callback for the manifest, used to collect linkrevs for filelog
334 343 # revisions.
335 344 # Returns the linkrev node (collected in lookupcl).
336 345 def lookupmf(x):
337 346 clnode = mfs[x]
338 347 if not fastpathlinkrev:
339 348 mdata = mf.readfast(x)
340 349 for f, n in mdata.iteritems():
341 350 if f in changedfiles:
342 351 # record the first changeset introducing this filelog
343 352 # version
344 353 fnodes[f].setdefault(n, clnode)
345 354 return clnode
346 355
347 356 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
348 357 reorder=reorder):
349 358 yield chunk
350 359 progress(msgbundling, None)
351 360
352 361 for f in changedfiles:
353 362 fnodes[f] = {}
354 363 mfnodes = self.prune(mf, mfs, commonrevs, source)
355 364 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
356 365 reorder=reorder):
357 366 yield chunk
358 367 progress(msgbundling, None)
359 368
360 369 mfs.clear()
361 370 needed = set(cl.rev(x) for x in clnodes)
362 371
363 372 def linknodes(filerevlog, fname):
364 373 if fastpathlinkrev:
365 374 llr = filerevlog.linkrev
366 375 def genfilenodes():
367 376 for r in filerevlog:
368 377 linkrev = llr(r)
369 378 if linkrev in needed:
370 379 yield filerevlog.node(r), cl.node(linkrev)
371 380 fnodes[fname] = dict(genfilenodes())
372 381 return fnodes.get(fname, {})
373 382
374 383 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
375 384 source):
376 385 yield chunk
377 386
378 387 yield self.close()
379 388 progress(msgbundling, None)
380 389
381 390 if clnodes:
382 391 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
383 392
384 393 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
385 394 repo = self._repo
386 395 progress = self._progress
387 396 reorder = self._reorder
388 397 msgbundling = _('bundling')
389 398
390 399 total = len(changedfiles)
391 400 # for progress output
392 401 msgfiles = _('files')
393 402 for i, fname in enumerate(sorted(changedfiles)):
394 403 filerevlog = repo.file(fname)
395 404 if not filerevlog:
396 405 raise util.Abort(_("empty or missing revlog for %s") % fname)
397 406
398 407 linkrevnodes = linknodes(filerevlog, fname)
399 408 # Lookup table for filenodes; we collected the linkrev nodes above in the
400 409 # fastpath case and with lookupmf in the slowpath case.
401 410 def lookupfilelog(x):
402 411 return linkrevnodes[x]
403 412
404 413 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
405 414 if filenodes:
406 415 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
407 416 total=total)
408 417 yield self.fileheader(fname)
409 418 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
410 419 reorder=reorder):
411 420 yield chunk
412 421
422 def deltaparent(self, revlog, rev, p1, p2, prev):
423 return prev
424
413 425 def revchunk(self, revlog, rev, prev, linknode):
414 426 node = revlog.node(rev)
415 427 p1, p2 = revlog.parentrevs(rev)
416 base = prev
428 base = self.deltaparent(revlog, rev, p1, p2, prev)
417 429
418 430 prefix = ''
419 431 if base == nullrev:
420 432 delta = revlog.revision(node)
421 433 prefix = mdiff.trivialdiffheader(len(delta))
422 434 else:
423 435 delta = revlog.revdiff(base, rev)
424 436 p1n, p2n = revlog.parents(node)
425 437 basenode = revlog.node(base)
426 438 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
427 439 meta += prefix
428 440 l = len(meta) + len(delta)
429 441 yield chunkheader(l)
430 442 yield meta
431 443 yield delta
432 444 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
433 445 # do nothing with basenode, it is implicitly the previous one in HG10
434 446 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
435 447
436 packermap = {'01': (cg1packer, cg1unpacker)}
448 class cg2packer(cg1packer):
449
450 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
451
452 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
453 if (revlog._generaldelta and reorder is not True):
454 reorder = False
455 return cg1packer.group(self, nodelist, revlog, lookup,
456 units=units, reorder=reorder)
457
458 def deltaparent(self, revlog, rev, p1, p2, prev):
459 dp = revlog.deltaparent(rev)
460 # avoid storing full revisions; pick prev in those cases
461 # also pick prev when we can't be sure remote has dp
462 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
463 return prev
464 return dp
465
466 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
467 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
468
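The deltaparent() override is the payoff of the new format: with an explicit deltabase on the wire, the packer can reuse the delta a generaldelta revlog already stores, as long as the receiver is guaranteed to have the base (a parent of the rev, or the previous rev in the group). A standalone restatement of that rule:

    nullrev = -1

    def choose_base(dp, p1, p2, prev):
        # mirrors cg2packer.deltaparent: dp is the revlog's stored delta parent
        if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
            return prev      # full revision stored, or remote may lack dp
        return dp            # reuse the stored delta as-is

    assert choose_base(5, 5, nullrev, 4) == 5   # dp is p1: reusable
    assert choose_base(2, 5, nullrev, 4) == 4   # dp unknown to receiver: prev
    assert choose_base(nullrev, 5, nullrev, 4) == 4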
469 packermap = {'01': (cg1packer, cg1unpacker),
470 '02': (cg2packer, cg2unpacker)}
437 471
438 472 def _changegroupinfo(repo, nodes, source):
439 473 if repo.ui.verbose or source == 'bundle':
440 474 repo.ui.status(_("%d changesets found\n") % len(nodes))
441 475 if repo.ui.debugflag:
442 476 repo.ui.debug("list of changesets:\n")
443 477 for node in nodes:
444 478 repo.ui.debug("%s\n" % hex(node))
445 479
446 480 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
447 481 repo = repo.unfiltered()
448 482 commonrevs = outgoing.common
449 483 csets = outgoing.missing
450 484 heads = outgoing.missingheads
451 485 # We go through the fast path if we get told to, or if all (unfiltered)
452 486 # heads have been requested (since we then know all the linkrevs will
453 487 # be pulled by the client).
454 488 heads.sort()
455 489 fastpathlinkrev = fastpath or (
456 490 repo.filtername is None and heads == sorted(repo.heads()))
457 491
458 492 repo.hook('preoutgoing', throw=True, source=source)
459 493 _changegroupinfo(repo, csets, source)
460 494 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
461 495
462 496 def getsubset(repo, outgoing, bundler, source, fastpath=False):
463 497 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
464 498 return cg1unpacker(util.chunkbuffer(gengroup), 'UN')
465 499
466 500 def changegroupsubset(repo, roots, heads, source):
467 501 """Compute a changegroup consisting of all the nodes that are
468 502 descendants of any of the roots and ancestors of any of the heads.
469 503 Return a chunkbuffer object whose read() method will return
470 504 successive changegroup chunks.
471 505
472 506 It is fairly complex as determining which filenodes and which
473 507 manifest nodes need to be included for the changeset to be complete
474 508 is non-trivial.
475 509
476 510 Another wrinkle is doing the reverse, figuring out which changeset in
477 511 the changegroup a particular filenode or manifestnode belongs to.
478 512 """
479 513 cl = repo.changelog
480 514 if not roots:
481 515 roots = [nullid]
482 516 # TODO: remove call to nodesbetween.
483 517 csets, roots, heads = cl.nodesbetween(roots, heads)
484 518 discbases = []
485 519 for n in roots:
486 520 discbases.extend([p for p in cl.parents(n) if p != nullid])
487 521 outgoing = discovery.outgoing(cl, discbases, heads)
488 522 bundler = cg1packer(repo)
489 523 return getsubset(repo, outgoing, bundler, source)
490 524
491 525 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
492 526 version='01'):
493 527 """Like getbundle, but taking a discovery.outgoing as an argument.
494 528
495 529 This is only implemented for local repos and reuses potentially
496 530 precomputed sets in outgoing. Returns a raw changegroup generator."""
497 531 if not outgoing.missing:
498 532 return None
499 533 bundler = packermap[version][0](repo, bundlecaps)
500 534 return getsubsetraw(repo, outgoing, bundler, source)
501 535
502 536 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
503 537 """Like getbundle, but taking a discovery.outgoing as an argument.
504 538
505 539 This is only implemented for local repos and reuses potentially
506 540 precomputed sets in outgoing."""
507 541 if not outgoing.missing:
508 542 return None
509 543 bundler = cg1packer(repo, bundlecaps)
510 544 return getsubset(repo, outgoing, bundler, source)
511 545
512 546 def _computeoutgoing(repo, heads, common):
513 547 """Computes which revs are outgoing given a set of common
514 548 and a set of heads.
515 549
516 550 This is a separate function so extensions can have access to
517 551 the logic.
518 552
519 553 Returns a discovery.outgoing object.
520 554 """
521 555 cl = repo.changelog
522 556 if common:
523 557 hasnode = cl.hasnode
524 558 common = [n for n in common if hasnode(n)]
525 559 else:
526 560 common = [nullid]
527 561 if not heads:
528 562 heads = cl.heads()
529 563 return discovery.outgoing(cl, common, heads)
530 564
531 565 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
532 566 version='01'):
533 567 """Like changegroupsubset, but returns the set difference between the
534 568 ancestors of heads and the ancestors common.
535 569
536 570 If heads is None, use the local heads. If common is None, use [nullid].
537 571
538 572 If version is None, use a version '01' changegroup.
539 573
540 574 The nodes in common might not all be known locally due to the way the
541 575 current discovery protocol works. Returns a raw changegroup generator.
542 576 """
543 577 outgoing = _computeoutgoing(repo, heads, common)
544 578 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
545 579 version=version)
546 580
547 581 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
548 582 """Like changegroupsubset, but returns the set difference between the
549 583 ancestors of heads and the ancestors common.
550 584
551 585 If heads is None, use the local heads. If common is None, use [nullid].
552 586
553 587 The nodes in common might not all be known locally due to the way the
554 588 current discovery protocol works.
555 589 """
556 590 outgoing = _computeoutgoing(repo, heads, common)
557 591 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
558 592
559 593 def changegroup(repo, basenodes, source):
560 594 # to avoid a race we use changegroupsubset() (issue1320)
561 595 return changegroupsubset(repo, basenodes, repo.heads(), source)
562 596
563 597 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
564 598 revisions = 0
565 599 files = 0
566 600 while True:
567 601 chunkdata = source.filelogheader()
568 602 if not chunkdata:
569 603 break
570 604 f = chunkdata["filename"]
571 605 repo.ui.debug("adding %s revisions\n" % f)
572 606 pr()
573 607 fl = repo.file(f)
574 608 o = len(fl)
575 609 if not fl.addgroup(source, revmap, trp):
576 610 raise util.Abort(_("received file revlog group is empty"))
577 611 revisions += len(fl) - o
578 612 files += 1
579 613 if f in needfiles:
580 614 needs = needfiles[f]
581 615 for new in xrange(o, len(fl)):
582 616 n = fl.node(new)
583 617 if n in needs:
584 618 needs.remove(n)
585 619 else:
586 620 raise util.Abort(
587 621 _("received spurious file revlog entry"))
588 622 if not needs:
589 623 del needfiles[f]
590 624 repo.ui.progress(_('files'), None)
591 625
592 626 for f, needs in needfiles.iteritems():
593 627 fl = repo.file(f)
594 628 for n in needs:
595 629 try:
596 630 fl.rev(n)
597 631 except error.LookupError:
598 632 raise util.Abort(
599 633 _('missing file data for %s:%s - run hg verify') %
600 634 (f, hex(n)))
601 635
602 636 return revisions, files
603 637
604 638 def addchangegroup(repo, source, srctype, url, emptyok=False,
605 639 targetphase=phases.draft):
606 640 """Add the changegroup returned by source.read() to this repo.
607 641 srctype is a string like 'push', 'pull', or 'unbundle'. url is
608 642 the URL of the repo where this changegroup is coming from.
609 643
610 644 Return an integer summarizing the change to this repo:
611 645 - nothing changed or no source: 0
612 646 - more heads than before: 1+added heads (2..n)
613 647 - fewer heads than before: -1-removed heads (-2..-n)
614 648 - number of heads stays the same: 1
615 649 """
616 650 repo = repo.unfiltered()
617 651 def csmap(x):
618 652 repo.ui.debug("add changeset %s\n" % short(x))
619 653 return len(cl)
620 654
621 655 def revmap(x):
622 656 return cl.rev(x)
623 657
624 658 if not source:
625 659 return 0
626 660
627 661 changesets = files = revisions = 0
628 662 efiles = set()
629 663
630 664 # write changelog data to temp files so concurrent readers will not see
631 665 # inconsistent view
632 666 cl = repo.changelog
633 667 cl.delayupdate()
634 668 oldheads = cl.heads()
635 669
636 670 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
637 671 # The transaction could have been created before and already carries source
638 672 # information. In this case we use the top level data. We overwrite the
639 673 # argument because we need to use the top level value (if they exist) in
640 674 # this function.
641 675 srctype = tr.hookargs.setdefault('source', srctype)
642 676 url = tr.hookargs.setdefault('url', url)
643 677 try:
644 678 repo.hook('prechangegroup', throw=True, **tr.hookargs)
645 679
646 680 trp = weakref.proxy(tr)
647 681 # pull off the changeset group
648 682 repo.ui.status(_("adding changesets\n"))
649 683 clstart = len(cl)
650 684 class prog(object):
651 685 step = _('changesets')
652 686 count = 1
653 687 ui = repo.ui
654 688 total = None
655 689 def __call__(repo):
656 690 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
657 691 total=repo.total)
658 692 repo.count += 1
659 693 pr = prog()
660 694 source.callback = pr
661 695
662 696 source.changelogheader()
663 697 srccontent = cl.addgroup(source, csmap, trp)
664 698 if not (srccontent or emptyok):
665 699 raise util.Abort(_("received changelog group is empty"))
666 700 clend = len(cl)
667 701 changesets = clend - clstart
668 702 for c in xrange(clstart, clend):
669 703 efiles.update(repo[c].files())
670 704 efiles = len(efiles)
671 705 repo.ui.progress(_('changesets'), None)
672 706
673 707 # pull off the manifest group
674 708 repo.ui.status(_("adding manifests\n"))
675 709 pr.step = _('manifests')
676 710 pr.count = 1
677 711 pr.total = changesets # manifests <= changesets
678 712 # no need to check for empty manifest group here:
679 713 # if the result of the merge of 1 and 2 is the same in 3 and 4,
680 714 # no new manifest will be created and the manifest group will
681 715 # be empty during the pull
682 716 source.manifestheader()
683 717 repo.manifest.addgroup(source, revmap, trp)
684 718 repo.ui.progress(_('manifests'), None)
685 719
686 720 needfiles = {}
687 721 if repo.ui.configbool('server', 'validate', default=False):
688 722 # validate incoming csets have their manifests
689 723 for cset in xrange(clstart, clend):
690 724 mfest = repo.changelog.read(repo.changelog.node(cset))[0]
691 725 mfest = repo.manifest.readdelta(mfest)
692 726 # store file nodes we must see
693 727 for f, n in mfest.iteritems():
694 728 needfiles.setdefault(f, set()).add(n)
695 729
696 730 # process the files
697 731 repo.ui.status(_("adding file changes\n"))
698 732 pr.step = _('files')
699 733 pr.count = 1
700 734 pr.total = efiles
701 735 source.callback = None
702 736
703 737 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
704 738 needfiles)
705 739 revisions += newrevs
706 740 files += newfiles
707 741
708 742 dh = 0
709 743 if oldheads:
710 744 heads = cl.heads()
711 745 dh = len(heads) - len(oldheads)
712 746 for h in heads:
713 747 if h not in oldheads and repo[h].closesbranch():
714 748 dh -= 1
715 749 htext = ""
716 750 if dh:
717 751 htext = _(" (%+d heads)") % dh
718 752
719 753 repo.ui.status(_("added %d changesets"
720 754 " with %d changes to %d files%s\n")
721 755 % (changesets, revisions, files, htext))
722 756 repo.invalidatevolatilesets()
723 757
724 758 if changesets > 0:
725 759 p = lambda: cl.writepending() and repo.root or ""
726 760 if 'node' not in tr.hookargs:
727 761 tr.hookargs['node'] = hex(cl.node(clstart))
728 762 hookargs = dict(tr.hookargs)
729 763 else:
730 764 hookargs = dict(tr.hookargs)
731 765 hookargs['node'] = hex(cl.node(clstart))
732 766 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
733 767
734 768 added = [cl.node(r) for r in xrange(clstart, clend)]
735 769 publishing = repo.ui.configbool('phases', 'publish', True)
736 770 if srctype in ('push', 'serve'):
737 771 # Old servers can not push the boundary themselves.
738 772 # New servers won't push the boundary if changeset already
739 773 # exists locally as secret
740 774 #
741 775 # We should not use added here but the list of all changes in
742 776 # the bundle
743 777 if publishing:
744 778 phases.advanceboundary(repo, tr, phases.public, srccontent)
745 779 else:
746 780 # Those changesets have been pushed from the outside, their
747 781 # phases are going to be pushed alongside. Therefore
748 782 # `targetphase` is ignored.
749 783 phases.advanceboundary(repo, tr, phases.draft, srccontent)
750 784 phases.retractboundary(repo, tr, phases.draft, added)
751 785 elif srctype != 'strip':
752 786 # publishing only alters behavior during push
753 787 #
754 788 # strip should not touch boundary at all
755 789 phases.retractboundary(repo, tr, targetphase, added)
756 790
757 791 # make changelog see real files again
758 792 cl.finalize(trp)
759 793
760 794 tr.close()
761 795
762 796 if changesets > 0:
763 797 if srctype != 'strip':
764 798 # During strip, branchcache is invalid but the coming call to
765 799 # `destroyed` will repair it.
766 800 # In other cases we can safely update the cache on disk.
767 801 branchmap.updatecache(repo.filtered('served'))
768 802
769 803 def runhooks():
770 804 # These hooks run when the lock releases, not when the
771 805 # transaction closes. So it's possible for the changelog
772 806 # to have changed since we last saw it.
773 807 if clstart >= len(repo):
774 808 return
775 809
776 810 # forcefully update the on-disk branch cache
777 811 repo.ui.debug("updating the branch cache\n")
778 812 repo.hook("changegroup", **hookargs)
779 813
780 814 for n in added:
781 815 args = hookargs.copy()
782 816 args['node'] = hex(n)
783 817 repo.hook("incoming", **args)
784 818
785 819 newheads = [h for h in repo.heads() if h not in oldheads]
786 820 repo.ui.log("incoming",
787 821 "%s incoming changes - new heads: %s\n",
788 822 len(added),
789 823 ', '.join([hex(c[:6]) for c in newheads]))
790 824 repo._afterlock(runhooks)
791 825
792 826 finally:
793 827 tr.release()
794 828 # never return 0 here:
795 829 if dh < 0:
796 830 return dh - 1
797 831 else:
798 832 return dh + 1
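The final dh adjustment implements the return convention from the docstring: 0 is reserved for "nothing changed or no source", so a head delta of zero maps to 1, positive deltas to dh + 1, negative to dh - 1. For instance:

    def retcode(dh):
        # never return 0 here: 0 means "nothing changed or no source"
        return dh - 1 if dh < 0 else dh + 1

    assert retcode(0) == 1     # same number of heads
    assert retcode(2) == 3     # two heads added
    assert retcode(-1) == -2   # one head removed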