##// END OF EJS Templates
changegroup.cg2packer: lookup 'group' via inheritance chain...
Siddharth Agarwal -
r23243:c5843268 default
parent child Browse files
Show More
@@ -1,831 +1,831
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
18 18 def readexactly(stream, n):
19 19 '''read n bytes from stream.read and abort if less was available'''
20 20 s = stream.read(n)
21 21 if len(s) < n:
22 22 raise util.Abort(_("stream ended unexpectedly"
23 23 " (got %d bytes, expected %d)")
24 24 % (len(s), n))
25 25 return s
26 26
27 27 def getchunk(stream):
28 28 """return the next chunk from stream as a string"""
29 29 d = readexactly(stream, 4)
30 30 l = struct.unpack(">l", d)[0]
31 31 if l <= 4:
32 32 if l:
33 33 raise util.Abort(_("invalid chunk length %d") % l)
34 34 return ""
35 35 return readexactly(stream, l - 4)
36 36
37 37 def chunkheader(length):
38 38 """return a changegroup chunk header (string)"""
39 39 return struct.pack(">l", length + 4)
40 40
41 41 def closechunk():
42 42 """return a changegroup chunk header (string) for a zero-length chunk"""
43 43 return struct.pack(">l", 0)
44 44
45 45 class nocompress(object):
46 46 def compress(self, x):
47 47 return x
48 48 def flush(self):
49 49 return ""
50 50
51 51 bundletypes = {
52 52 "": ("", nocompress), # only when using unbundle on ssh and old http servers
53 53 # since the unification ssh accepts a header but there
54 54 # is no capability signaling it.
55 55 "HG10UN": ("HG10UN", nocompress),
56 56 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
57 57 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
58 58 }
59 59
60 60 # hgweb uses this list to communicate its preferred type
61 61 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
62 62
63 63 def writebundle(cg, filename, bundletype, vfs=None):
64 64 """Write a bundle file and return its filename.
65 65
66 66 Existing files will not be overwritten.
67 67 If no filename is specified, a temporary file is created.
68 68 bz2 compression can be turned off.
69 69 The bundle file will be deleted in case of errors.
70 70 """
71 71
72 72 fh = None
73 73 cleanup = None
74 74 try:
75 75 if filename:
76 76 if vfs:
77 77 fh = vfs.open(filename, "wb")
78 78 else:
79 79 fh = open(filename, "wb")
80 80 else:
81 81 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
82 82 fh = os.fdopen(fd, "wb")
83 83 cleanup = filename
84 84
85 85 header, compressor = bundletypes[bundletype]
86 86 fh.write(header)
87 87 z = compressor()
88 88
89 89 # parse the changegroup data, otherwise we will block
90 90 # in case of sshrepo because we don't know the end of the stream
91 91
92 92 # an empty chunkgroup is the end of the changegroup
93 93 # a changegroup has at least 2 chunkgroups (changelog and manifest).
94 94 # after that, an empty chunkgroup is the end of the changegroup
95 95 for chunk in cg.getchunks():
96 96 fh.write(z.compress(chunk))
97 97 fh.write(z.flush())
98 98 cleanup = None
99 99 return filename
100 100 finally:
101 101 if fh is not None:
102 102 fh.close()
103 103 if cleanup is not None:
104 104 if filename and vfs:
105 105 vfs.unlink(cleanup)
106 106 else:
107 107 os.unlink(cleanup)
108 108
109 109 def decompressor(fh, alg):
110 110 if alg == 'UN':
111 111 return fh
112 112 elif alg == 'GZ':
113 113 def generator(f):
114 114 zd = zlib.decompressobj()
115 115 for chunk in util.filechunkiter(f):
116 116 yield zd.decompress(chunk)
117 117 elif alg == 'BZ':
118 118 def generator(f):
119 119 zd = bz2.BZ2Decompressor()
120 120 zd.decompress("BZ")
121 121 for chunk in util.filechunkiter(f, 4096):
122 122 yield zd.decompress(chunk)
123 123 else:
124 124 raise util.Abort("unknown bundle compression '%s'" % alg)
125 125 return util.chunkbuffer(generator(fh))
126 126
127 127 class cg1unpacker(object):
128 128 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
129 129 deltaheadersize = struct.calcsize(deltaheader)
130 130 def __init__(self, fh, alg):
131 131 self._stream = decompressor(fh, alg)
132 132 self._type = alg
133 133 self.callback = None
134 134 def compressed(self):
135 135 return self._type != 'UN'
136 136 def read(self, l):
137 137 return self._stream.read(l)
138 138 def seek(self, pos):
139 139 return self._stream.seek(pos)
140 140 def tell(self):
141 141 return self._stream.tell()
142 142 def close(self):
143 143 return self._stream.close()
144 144
145 145 def chunklength(self):
146 146 d = readexactly(self._stream, 4)
147 147 l = struct.unpack(">l", d)[0]
148 148 if l <= 4:
149 149 if l:
150 150 raise util.Abort(_("invalid chunk length %d") % l)
151 151 return 0
152 152 if self.callback:
153 153 self.callback()
154 154 return l - 4
155 155
156 156 def changelogheader(self):
157 157 """v10 does not have a changelog header chunk"""
158 158 return {}
159 159
160 160 def manifestheader(self):
161 161 """v10 does not have a manifest header chunk"""
162 162 return {}
163 163
164 164 def filelogheader(self):
165 165 """return the header of the filelogs chunk, v10 only has the filename"""
166 166 l = self.chunklength()
167 167 if not l:
168 168 return {}
169 169 fname = readexactly(self._stream, l)
170 170 return {'filename': fname}
171 171
172 172 def _deltaheader(self, headertuple, prevnode):
173 173 node, p1, p2, cs = headertuple
174 174 if prevnode is None:
175 175 deltabase = p1
176 176 else:
177 177 deltabase = prevnode
178 178 return node, p1, p2, deltabase, cs
179 179
180 180 def deltachunk(self, prevnode):
181 181 l = self.chunklength()
182 182 if not l:
183 183 return {}
184 184 headerdata = readexactly(self._stream, self.deltaheadersize)
185 185 header = struct.unpack(self.deltaheader, headerdata)
186 186 delta = readexactly(self._stream, l - self.deltaheadersize)
187 187 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
188 188 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
189 189 'deltabase': deltabase, 'delta': delta}
190 190
191 191 def getchunks(self):
192 192 """returns all the chunks contains in the bundle
193 193
194 194 Used when you need to forward the binary stream to a file or another
195 195 network API. To do so, it parse the changegroup data, otherwise it will
196 196 block in case of sshrepo because it don't know the end of the stream.
197 197 """
198 198 # an empty chunkgroup is the end of the changegroup
199 199 # a changegroup has at least 2 chunkgroups (changelog and manifest).
200 200 # after that, an empty chunkgroup is the end of the changegroup
201 201 empty = False
202 202 count = 0
203 203 while not empty or count <= 2:
204 204 empty = True
205 205 count += 1
206 206 while True:
207 207 chunk = getchunk(self)
208 208 if not chunk:
209 209 break
210 210 empty = False
211 211 yield chunkheader(len(chunk))
212 212 pos = 0
213 213 while pos < len(chunk):
214 214 next = pos + 2**20
215 215 yield chunk[pos:next]
216 216 pos = next
217 217 yield closechunk()
218 218
219 219 class cg2unpacker(cg1unpacker):
220 220 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
221 221 deltaheadersize = struct.calcsize(deltaheader)
222 222
223 223 def _deltaheader(self, headertuple, prevnode):
224 224 node, p1, p2, deltabase, cs = headertuple
225 225 return node, p1, p2, deltabase, cs
226 226
227 227 class headerlessfixup(object):
228 228 def __init__(self, fh, h):
229 229 self._h = h
230 230 self._fh = fh
231 231 def read(self, n):
232 232 if self._h:
233 233 d, self._h = self._h[:n], self._h[n:]
234 234 if len(d) < n:
235 235 d += readexactly(self._fh, n - len(d))
236 236 return d
237 237 return readexactly(self._fh, n)
238 238
239 239 class cg1packer(object):
240 240 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
241 241 def __init__(self, repo, bundlecaps=None):
242 242 """Given a source repo, construct a bundler.
243 243
244 244 bundlecaps is optional and can be used to specify the set of
245 245 capabilities which can be used to build the bundle.
246 246 """
247 247 # Set of capabilities we can use to build the bundle.
248 248 if bundlecaps is None:
249 249 bundlecaps = set()
250 250 self._bundlecaps = bundlecaps
251 251 self._changelog = repo.changelog
252 252 self._manifest = repo.manifest
253 253 reorder = repo.ui.config('bundle', 'reorder', 'auto')
254 254 if reorder == 'auto':
255 255 reorder = None
256 256 else:
257 257 reorder = util.parsebool(reorder)
258 258 self._repo = repo
259 259 self._reorder = reorder
260 260 self._progress = repo.ui.progress
261 261 def close(self):
262 262 return closechunk()
263 263
264 264 def fileheader(self, fname):
265 265 return chunkheader(len(fname)) + fname
266 266
267 267 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
268 268 """Calculate a delta group, yielding a sequence of changegroup chunks
269 269 (strings).
270 270
271 271 Given a list of changeset revs, return a set of deltas and
272 272 metadata corresponding to nodes. The first delta is
273 273 first parent(nodelist[0]) -> nodelist[0], the receiver is
274 274 guaranteed to have this parent as it has all history before
275 275 these changesets. In the case firstparent is nullrev the
276 276 changegroup starts with a full revision.
277 277
278 278 If units is not None, progress detail will be generated, units specifies
279 279 the type of revlog that is touched (changelog, manifest, etc.).
280 280 """
281 281 # if we don't have any revisions touched by these changesets, bail
282 282 if len(nodelist) == 0:
283 283 yield self.close()
284 284 return
285 285
286 286 # for generaldelta revlogs, we linearize the revs; this will both be
287 287 # much quicker and generate a much smaller bundle
288 288 if (revlog._generaldelta and reorder is not False) or reorder:
289 289 dag = dagutil.revlogdag(revlog)
290 290 revs = set(revlog.rev(n) for n in nodelist)
291 291 revs = dag.linearize(revs)
292 292 else:
293 293 revs = sorted([revlog.rev(n) for n in nodelist])
294 294
295 295 # add the parent of the first rev
296 296 p = revlog.parentrevs(revs[0])[0]
297 297 revs.insert(0, p)
298 298
299 299 # build deltas
300 300 total = len(revs) - 1
301 301 msgbundling = _('bundling')
302 302 for r in xrange(len(revs) - 1):
303 303 if units is not None:
304 304 self._progress(msgbundling, r + 1, unit=units, total=total)
305 305 prev, curr = revs[r], revs[r + 1]
306 306 linknode = lookup(revlog.node(curr))
307 307 for c in self.revchunk(revlog, curr, prev, linknode):
308 308 yield c
309 309
310 310 yield self.close()
311 311
312 312 # filter any nodes that claim to be part of the known set
313 313 def prune(self, revlog, missing, commonrevs, source):
314 314 rr, rl = revlog.rev, revlog.linkrev
315 315 return [n for n in missing if rl(rr(n)) not in commonrevs]
316 316
317 317 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
318 318 '''yield a sequence of changegroup chunks (strings)'''
319 319 repo = self._repo
320 320 cl = self._changelog
321 321 mf = self._manifest
322 322 reorder = self._reorder
323 323 progress = self._progress
324 324
325 325 # for progress output
326 326 msgbundling = _('bundling')
327 327
328 328 mfs = {} # needed manifests
329 329 fnodes = {} # needed file nodes
330 330 changedfiles = set()
331 331
332 332 # Callback for the changelog, used to collect changed files and manifest
333 333 # nodes.
334 334 # Returns the linkrev node (identity in the changelog case).
335 335 def lookupcl(x):
336 336 c = cl.read(x)
337 337 changedfiles.update(c[3])
338 338 # record the first changeset introducing this manifest version
339 339 mfs.setdefault(c[0], x)
340 340 return x
341 341
342 342 # Callback for the manifest, used to collect linkrevs for filelog
343 343 # revisions.
344 344 # Returns the linkrev node (collected in lookupcl).
345 345 def lookupmf(x):
346 346 clnode = mfs[x]
347 347 if not fastpathlinkrev:
348 348 mdata = mf.readfast(x)
349 349 for f, n in mdata.iteritems():
350 350 if f in changedfiles:
351 351 # record the first changeset introducing this filelog
352 352 # version
353 353 fnodes[f].setdefault(n, clnode)
354 354 return clnode
355 355
356 356 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
357 357 reorder=reorder):
358 358 yield chunk
359 359 progress(msgbundling, None)
360 360
361 361 for f in changedfiles:
362 362 fnodes[f] = {}
363 363 mfnodes = self.prune(mf, mfs, commonrevs, source)
364 364 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
365 365 reorder=reorder):
366 366 yield chunk
367 367 progress(msgbundling, None)
368 368
369 369 mfs.clear()
370 370 needed = set(cl.rev(x) for x in clnodes)
371 371
372 372 def linknodes(filerevlog, fname):
373 373 if fastpathlinkrev:
374 374 llr = filerevlog.linkrev
375 375 def genfilenodes():
376 376 for r in filerevlog:
377 377 linkrev = llr(r)
378 378 if linkrev in needed:
379 379 yield filerevlog.node(r), cl.node(linkrev)
380 380 fnodes[fname] = dict(genfilenodes())
381 381 return fnodes.get(fname, {})
382 382
383 383 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
384 384 source):
385 385 yield chunk
386 386
387 387 yield self.close()
388 388 progress(msgbundling, None)
389 389
390 390 if clnodes:
391 391 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
392 392
393 393 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
394 394 repo = self._repo
395 395 progress = self._progress
396 396 reorder = self._reorder
397 397 msgbundling = _('bundling')
398 398
399 399 total = len(changedfiles)
400 400 # for progress output
401 401 msgfiles = _('files')
402 402 for i, fname in enumerate(sorted(changedfiles)):
403 403 filerevlog = repo.file(fname)
404 404 if not filerevlog:
405 405 raise util.Abort(_("empty or missing revlog for %s") % fname)
406 406
407 407 linkrevnodes = linknodes(filerevlog, fname)
408 408 # Lookup for filenodes, we collected the linkrev nodes above in the
409 409 # fastpath case and with lookupmf in the slowpath case.
410 410 def lookupfilelog(x):
411 411 return linkrevnodes[x]
412 412
413 413 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
414 414 if filenodes:
415 415 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
416 416 total=total)
417 417 yield self.fileheader(fname)
418 418 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
419 419 reorder=reorder):
420 420 yield chunk
421 421
422 422 def deltaparent(self, revlog, rev, p1, p2, prev):
423 423 return prev
424 424
425 425 def revchunk(self, revlog, rev, prev, linknode):
426 426 node = revlog.node(rev)
427 427 p1, p2 = revlog.parentrevs(rev)
428 428 base = self.deltaparent(revlog, rev, p1, p2, prev)
429 429
430 430 prefix = ''
431 431 if base == nullrev:
432 432 delta = revlog.revision(node)
433 433 prefix = mdiff.trivialdiffheader(len(delta))
434 434 else:
435 435 delta = revlog.revdiff(base, rev)
436 436 p1n, p2n = revlog.parents(node)
437 437 basenode = revlog.node(base)
438 438 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
439 439 meta += prefix
440 440 l = len(meta) + len(delta)
441 441 yield chunkheader(l)
442 442 yield meta
443 443 yield delta
444 444 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
445 445 # do nothing with basenode, it is implicitly the previous one in HG10
446 446 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
447 447
448 448 class cg2packer(cg1packer):
449 449
450 450 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
451 451
452 452 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
453 453 if (revlog._generaldelta and reorder is not True):
454 454 reorder = False
455 return cg1packer.group(self, nodelist, revlog, lookup,
455 return super(cg2packer, self).group(nodelist, revlog, lookup,
456 456 units=units, reorder=reorder)
457 457
458 458 def deltaparent(self, revlog, rev, p1, p2, prev):
459 459 dp = revlog.deltaparent(rev)
460 460 # avoid storing full revisions; pick prev in those cases
461 461 # also pick prev when we can't be sure remote has dp
462 462 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
463 463 return prev
464 464 return dp
465 465
466 466 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
467 467 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
468 468
469 469 packermap = {'01': (cg1packer, cg1unpacker),
470 470 '02': (cg2packer, cg2unpacker)}
471 471
472 472 def _changegroupinfo(repo, nodes, source):
473 473 if repo.ui.verbose or source == 'bundle':
474 474 repo.ui.status(_("%d changesets found\n") % len(nodes))
475 475 if repo.ui.debugflag:
476 476 repo.ui.debug("list of changesets:\n")
477 477 for node in nodes:
478 478 repo.ui.debug("%s\n" % hex(node))
479 479
480 480 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
481 481 repo = repo.unfiltered()
482 482 commonrevs = outgoing.common
483 483 csets = outgoing.missing
484 484 heads = outgoing.missingheads
485 485 # We go through the fast path if we get told to, or if all (unfiltered
486 486 # heads have been requested (since we then know there all linkrevs will
487 487 # be pulled by the client).
488 488 heads.sort()
489 489 fastpathlinkrev = fastpath or (
490 490 repo.filtername is None and heads == sorted(repo.heads()))
491 491
492 492 repo.hook('preoutgoing', throw=True, source=source)
493 493 _changegroupinfo(repo, csets, source)
494 494 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
495 495
496 496 def getsubset(repo, outgoing, bundler, source, fastpath=False):
497 497 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
498 498 return cg1unpacker(util.chunkbuffer(gengroup), 'UN')
499 499
500 500 def changegroupsubset(repo, roots, heads, source):
501 501 """Compute a changegroup consisting of all the nodes that are
502 502 descendants of any of the roots and ancestors of any of the heads.
503 503 Return a chunkbuffer object whose read() method will return
504 504 successive changegroup chunks.
505 505
506 506 It is fairly complex as determining which filenodes and which
507 507 manifest nodes need to be included for the changeset to be complete
508 508 is non-trivial.
509 509
510 510 Another wrinkle is doing the reverse, figuring out which changeset in
511 511 the changegroup a particular filenode or manifestnode belongs to.
512 512 """
513 513 cl = repo.changelog
514 514 if not roots:
515 515 roots = [nullid]
516 516 # TODO: remove call to nodesbetween.
517 517 csets, roots, heads = cl.nodesbetween(roots, heads)
518 518 discbases = []
519 519 for n in roots:
520 520 discbases.extend([p for p in cl.parents(n) if p != nullid])
521 521 outgoing = discovery.outgoing(cl, discbases, heads)
522 522 bundler = cg1packer(repo)
523 523 return getsubset(repo, outgoing, bundler, source)
524 524
525 525 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
526 526 version='01'):
527 527 """Like getbundle, but taking a discovery.outgoing as an argument.
528 528
529 529 This is only implemented for local repos and reuses potentially
530 530 precomputed sets in outgoing. Returns a raw changegroup generator."""
531 531 if not outgoing.missing:
532 532 return None
533 533 bundler = packermap[version][0](repo, bundlecaps)
534 534 return getsubsetraw(repo, outgoing, bundler, source)
535 535
536 536 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
537 537 """Like getbundle, but taking a discovery.outgoing as an argument.
538 538
539 539 This is only implemented for local repos and reuses potentially
540 540 precomputed sets in outgoing."""
541 541 if not outgoing.missing:
542 542 return None
543 543 bundler = cg1packer(repo, bundlecaps)
544 544 return getsubset(repo, outgoing, bundler, source)
545 545
546 546 def _computeoutgoing(repo, heads, common):
547 547 """Computes which revs are outgoing given a set of common
548 548 and a set of heads.
549 549
550 550 This is a separate function so extensions can have access to
551 551 the logic.
552 552
553 553 Returns a discovery.outgoing object.
554 554 """
555 555 cl = repo.changelog
556 556 if common:
557 557 hasnode = cl.hasnode
558 558 common = [n for n in common if hasnode(n)]
559 559 else:
560 560 common = [nullid]
561 561 if not heads:
562 562 heads = cl.heads()
563 563 return discovery.outgoing(cl, common, heads)
564 564
565 565 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
566 566 version='01'):
567 567 """Like changegroupsubset, but returns the set difference between the
568 568 ancestors of heads and the ancestors common.
569 569
570 570 If heads is None, use the local heads. If common is None, use [nullid].
571 571
572 572 If version is None, use a version '1' changegroup.
573 573
574 574 The nodes in common might not all be known locally due to the way the
575 575 current discovery protocol works. Returns a raw changegroup generator.
576 576 """
577 577 outgoing = _computeoutgoing(repo, heads, common)
578 578 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
579 579 version=version)
580 580
581 581 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
582 582 """Like changegroupsubset, but returns the set difference between the
583 583 ancestors of heads and the ancestors common.
584 584
585 585 If heads is None, use the local heads. If common is None, use [nullid].
586 586
587 587 The nodes in common might not all be known locally due to the way the
588 588 current discovery protocol works.
589 589 """
590 590 outgoing = _computeoutgoing(repo, heads, common)
591 591 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
592 592
593 593 def changegroup(repo, basenodes, source):
594 594 # to avoid a race we use changegroupsubset() (issue1320)
595 595 return changegroupsubset(repo, basenodes, repo.heads(), source)
596 596
597 597 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
598 598 revisions = 0
599 599 files = 0
600 600 while True:
601 601 chunkdata = source.filelogheader()
602 602 if not chunkdata:
603 603 break
604 604 f = chunkdata["filename"]
605 605 repo.ui.debug("adding %s revisions\n" % f)
606 606 pr()
607 607 fl = repo.file(f)
608 608 o = len(fl)
609 609 if not fl.addgroup(source, revmap, trp):
610 610 raise util.Abort(_("received file revlog group is empty"))
611 611 revisions += len(fl) - o
612 612 files += 1
613 613 if f in needfiles:
614 614 needs = needfiles[f]
615 615 for new in xrange(o, len(fl)):
616 616 n = fl.node(new)
617 617 if n in needs:
618 618 needs.remove(n)
619 619 else:
620 620 raise util.Abort(
621 621 _("received spurious file revlog entry"))
622 622 if not needs:
623 623 del needfiles[f]
624 624 repo.ui.progress(_('files'), None)
625 625
626 626 for f, needs in needfiles.iteritems():
627 627 fl = repo.file(f)
628 628 for n in needs:
629 629 try:
630 630 fl.rev(n)
631 631 except error.LookupError:
632 632 raise util.Abort(
633 633 _('missing file data for %s:%s - run hg verify') %
634 634 (f, hex(n)))
635 635
636 636 return revisions, files
637 637
638 638 def addchangegroup(repo, source, srctype, url, emptyok=False,
639 639 targetphase=phases.draft):
640 640 """Add the changegroup returned by source.read() to this repo.
641 641 srctype is a string like 'push', 'pull', or 'unbundle'. url is
642 642 the URL of the repo where this changegroup is coming from.
643 643
644 644 Return an integer summarizing the change to this repo:
645 645 - nothing changed or no source: 0
646 646 - more heads than before: 1+added heads (2..n)
647 647 - fewer heads than before: -1-removed heads (-2..-n)
648 648 - number of heads stays the same: 1
649 649 """
650 650 repo = repo.unfiltered()
651 651 def csmap(x):
652 652 repo.ui.debug("add changeset %s\n" % short(x))
653 653 return len(cl)
654 654
655 655 def revmap(x):
656 656 return cl.rev(x)
657 657
658 658 if not source:
659 659 return 0
660 660
661 661 changesets = files = revisions = 0
662 662 efiles = set()
663 663
664 664 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
665 665 # The transaction could have been created before and already carries source
666 666 # information. In this case we use the top level data. We overwrite the
667 667 # argument because we need to use the top level value (if they exist) in
668 668 # this function.
669 669 srctype = tr.hookargs.setdefault('source', srctype)
670 670 url = tr.hookargs.setdefault('url', url)
671 671
672 672 # write changelog data to temp files so concurrent readers will not see
673 673 # inconsistent view
674 674 cl = repo.changelog
675 675 cl.delayupdate(tr)
676 676 oldheads = cl.heads()
677 677 try:
678 678 repo.hook('prechangegroup', throw=True, **tr.hookargs)
679 679
680 680 trp = weakref.proxy(tr)
681 681 # pull off the changeset group
682 682 repo.ui.status(_("adding changesets\n"))
683 683 clstart = len(cl)
684 684 class prog(object):
685 685 step = _('changesets')
686 686 count = 1
687 687 ui = repo.ui
688 688 total = None
689 689 def __call__(repo):
690 690 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
691 691 total=repo.total)
692 692 repo.count += 1
693 693 pr = prog()
694 694 source.callback = pr
695 695
696 696 source.changelogheader()
697 697 srccontent = cl.addgroup(source, csmap, trp)
698 698 if not (srccontent or emptyok):
699 699 raise util.Abort(_("received changelog group is empty"))
700 700 clend = len(cl)
701 701 changesets = clend - clstart
702 702 for c in xrange(clstart, clend):
703 703 efiles.update(repo[c].files())
704 704 efiles = len(efiles)
705 705 repo.ui.progress(_('changesets'), None)
706 706
707 707 # pull off the manifest group
708 708 repo.ui.status(_("adding manifests\n"))
709 709 pr.step = _('manifests')
710 710 pr.count = 1
711 711 pr.total = changesets # manifests <= changesets
712 712 # no need to check for empty manifest group here:
713 713 # if the result of the merge of 1 and 2 is the same in 3 and 4,
714 714 # no new manifest will be created and the manifest group will
715 715 # be empty during the pull
716 716 source.manifestheader()
717 717 repo.manifest.addgroup(source, revmap, trp)
718 718 repo.ui.progress(_('manifests'), None)
719 719
720 720 needfiles = {}
721 721 if repo.ui.configbool('server', 'validate', default=False):
722 722 # validate incoming csets have their manifests
723 723 for cset in xrange(clstart, clend):
724 724 mfest = repo.changelog.read(repo.changelog.node(cset))[0]
725 725 mfest = repo.manifest.readdelta(mfest)
726 726 # store file nodes we must see
727 727 for f, n in mfest.iteritems():
728 728 needfiles.setdefault(f, set()).add(n)
729 729
730 730 # process the files
731 731 repo.ui.status(_("adding file changes\n"))
732 732 pr.step = _('files')
733 733 pr.count = 1
734 734 pr.total = efiles
735 735 source.callback = None
736 736
737 737 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
738 738 needfiles)
739 739 revisions += newrevs
740 740 files += newfiles
741 741
742 742 dh = 0
743 743 if oldheads:
744 744 heads = cl.heads()
745 745 dh = len(heads) - len(oldheads)
746 746 for h in heads:
747 747 if h not in oldheads and repo[h].closesbranch():
748 748 dh -= 1
749 749 htext = ""
750 750 if dh:
751 751 htext = _(" (%+d heads)") % dh
752 752
753 753 repo.ui.status(_("added %d changesets"
754 754 " with %d changes to %d files%s\n")
755 755 % (changesets, revisions, files, htext))
756 756 repo.invalidatevolatilesets()
757 757
758 758 if changesets > 0:
759 759 p = lambda: tr.writepending() and repo.root or ""
760 760 if 'node' not in tr.hookargs:
761 761 tr.hookargs['node'] = hex(cl.node(clstart))
762 762 hookargs = dict(tr.hookargs)
763 763 else:
764 764 hookargs = dict(tr.hookargs)
765 765 hookargs['node'] = hex(cl.node(clstart))
766 766 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
767 767
768 768 added = [cl.node(r) for r in xrange(clstart, clend)]
769 769 publishing = repo.ui.configbool('phases', 'publish', True)
770 770 if srctype in ('push', 'serve'):
771 771 # Old servers can not push the boundary themselves.
772 772 # New servers won't push the boundary if changeset already
773 773 # exists locally as secret
774 774 #
775 775 # We should not use added here but the list of all change in
776 776 # the bundle
777 777 if publishing:
778 778 phases.advanceboundary(repo, tr, phases.public, srccontent)
779 779 else:
780 780 # Those changesets have been pushed from the outside, their
781 781 # phases are going to be pushed alongside. Therefor
782 782 # `targetphase` is ignored.
783 783 phases.advanceboundary(repo, tr, phases.draft, srccontent)
784 784 phases.retractboundary(repo, tr, phases.draft, added)
785 785 elif srctype != 'strip':
786 786 # publishing only alter behavior during push
787 787 #
788 788 # strip should not touch boundary at all
789 789 phases.retractboundary(repo, tr, targetphase, added)
790 790
791 791 if changesets > 0:
792 792 if srctype != 'strip':
793 793 # During strip, branchcache is invalid but coming call to
794 794 # `destroyed` will repair it.
795 795 # In other case we can safely update cache on disk.
796 796 branchmap.updatecache(repo.filtered('served'))
797 797
798 798 def runhooks():
799 799 # These hooks run when the lock releases, not when the
800 800 # transaction closes. So it's possible for the changelog
801 801 # to have changed since we last saw it.
802 802 if clstart >= len(repo):
803 803 return
804 804
805 805 # forcefully update the on-disk branch cache
806 806 repo.ui.debug("updating the branch cache\n")
807 807 repo.hook("changegroup", **hookargs)
808 808
809 809 for n in added:
810 810 args = hookargs.copy()
811 811 args['node'] = hex(n)
812 812 repo.hook("incoming", **args)
813 813
814 814 newheads = [h for h in repo.heads() if h not in oldheads]
815 815 repo.ui.log("incoming",
816 816 "%s incoming changes - new heads: %s\n",
817 817 len(added),
818 818 ', '.join([hex(c[:6]) for c in newheads]))
819 819
820 820 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
821 821 lambda: repo._afterlock(runhooks))
822 822
823 823 tr.close()
824 824
825 825 finally:
826 826 tr.release()
827 827 # never return 0 here:
828 828 if dh < 0:
829 829 return dh - 1
830 830 else:
831 831 return dh + 1
General Comments 0
You need to be logged in to leave comments. Login now