changegroup: don't define lookupmf() until it is needed...
Gregory Szorc
r23224:f4ab47cc stable
@@ -1,766 +1,767
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16
17 17 def readexactly(stream, n):
18 18 '''read n bytes from stream.read and abort if less was available'''
19 19 s = stream.read(n)
20 20 if len(s) < n:
21 21 raise util.Abort(_("stream ended unexpectedly"
22 22 " (got %d bytes, expected %d)")
23 23 % (len(s), n))
24 24 return s
25 25
26 26 def getchunk(stream):
27 27 """return the next chunk from stream as a string"""
28 28 d = readexactly(stream, 4)
29 29 l = struct.unpack(">l", d)[0]
30 30 if l <= 4:
31 31 if l:
32 32 raise util.Abort(_("invalid chunk length %d") % l)
33 33 return ""
34 34 return readexactly(stream, l - 4)
35 35
36 36 def chunkheader(length):
37 37 """return a changegroup chunk header (string)"""
38 38 return struct.pack(">l", length + 4)
39 39
40 40 def closechunk():
41 41 """return a changegroup chunk header (string) for a zero-length chunk"""
42 42 return struct.pack(">l", 0)
43 43
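Reviewer note: the four helpers above implement the self-delimiting chunk framing used throughout this file: a big-endian four-byte length that counts itself, then the payload; a zero length terminates a chunk group. A minimal round-trip sketch (standalone, assuming this module is importable as `mercurial.changegroup`):

    from cStringIO import StringIO
    from mercurial import changegroup

    buf = StringIO()
    for payload in ['abc', 'defgh']:
        # each chunk is framed as a ">l" length (len(payload) + 4) plus the payload
        buf.write(changegroup.chunkheader(len(payload)))
        buf.write(payload)
    buf.write(changegroup.closechunk())  # zero-length terminator

    buf.seek(0)
    chunks = []
    while True:
        c = changegroup.getchunk(buf)
        if not c:                        # "" signals the end of the chunk group
            break
        chunks.append(c)
    assert chunks == ['abc', 'defgh']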
44 44 class nocompress(object):
45 45 def compress(self, x):
46 46 return x
47 47 def flush(self):
48 48 return ""
49 49
50 50 bundletypes = {
51 51 "": ("", nocompress), # only when using unbundle on ssh and old http servers
52 52 # since the unification, ssh accepts a header but there
53 53 # is no capability signaling it.
54 54 "HG10UN": ("HG10UN", nocompress),
55 55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 57 }
58 58
59 59 # hgweb uses this list to communicate its preferred type
60 60 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
61 61
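Reviewer note: each entry maps a user-visible bundle type to the header actually written plus a compressor factory. Note that "HG10BZ" writes only "HG10", because the bz2 stream itself starts with "BZ" and completes the magic on the wire. Reduced to its essence, writebundle() below does (with `payload` as a stand-in for the raw changegroup bytes):

    payload = '...raw changegroup bytes...'  # stand-in
    header, compressor = bundletypes['HG10BZ']
    z = compressor()
    wire = header + z.compress(payload) + z.flush()  # wire starts with "HG10BZ"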
62 62 def writebundle(cg, filename, bundletype, vfs=None):
63 63 """Write a bundle file and return its filename.
64 64
65 65 Existing files will not be overwritten.
66 66 If no filename is specified, a temporary file is created.
67 67 bz2 compression can be turned off.
68 68 The bundle file will be deleted in case of errors.
69 69 """
70 70
71 71 fh = None
72 72 cleanup = None
73 73 try:
74 74 if filename:
75 75 if vfs:
76 76 fh = vfs.open(filename, "wb")
77 77 else:
78 78 fh = open(filename, "wb")
79 79 else:
80 80 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
81 81 fh = os.fdopen(fd, "wb")
82 82 cleanup = filename
83 83
84 84 header, compressor = bundletypes[bundletype]
85 85 fh.write(header)
86 86 z = compressor()
87 87
88 88 # parse the changegroup data; otherwise we would block
89 89 # on an sshrepo, because we don't know the end of the stream
90 90
91 91 # an empty chunkgroup is the end of the changegroup
92 92 # a changegroup has at least 2 chunkgroups (changelog and manifest).
93 93 # after that, an empty chunkgroup is the end of the changegroup
94 94 for chunk in cg.getchunks():
95 95 fh.write(z.compress(chunk))
96 96 fh.write(z.flush())
97 97 cleanup = None
98 98 return filename
99 99 finally:
100 100 if fh is not None:
101 101 fh.close()
102 102 if cleanup is not None:
103 103 if filename and vfs:
104 104 vfs.unlink(cleanup)
105 105 else:
106 106 os.unlink(cleanup)
107 107
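Reviewer note: a typical caller, sketched with hypothetical names (`repo` is an open localrepo; getchangegroup() is defined later in this file):

    from mercurial import changegroup

    # bundle everything the local repo has that an empty common set lacks
    cg = changegroup.getchangegroup(repo, 'bundle')
    if cg is not None:
        fname = changegroup.writebundle(cg, 'backup.hg', 'HG10BZ')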
108 108 def decompressor(fh, alg):
109 109 if alg == 'UN':
110 110 return fh
111 111 elif alg == 'GZ':
112 112 def generator(f):
113 113 zd = zlib.decompressobj()
114 114 for chunk in util.filechunkiter(f):
115 115 yield zd.decompress(chunk)
116 116 elif alg == 'BZ':
117 117 def generator(f):
118 118 zd = bz2.BZ2Decompressor()
119 119 zd.decompress("BZ")
120 120 for chunk in util.filechunkiter(f, 4096):
121 121 yield zd.decompress(chunk)
122 122 else:
123 123 raise util.Abort("unknown bundle compression '%s'" % alg)
124 124 return util.chunkbuffer(generator(fh))
125 125
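Reviewer note: the 'BZ' branch re-feeds the two magic bytes because the reader has already consumed the full six-byte "HG10BZ" header, yet the bz2 stream needs its leading "BZ" back (see bundletypes above). The trick, demonstrated in isolation:

    import bz2

    z = bz2.BZ2Compressor()
    data = z.compress('payload') + z.flush()
    assert data[:2] == 'BZ'    # bz2 output supplies the "BZ" of "HG10BZ"

    zd = bz2.BZ2Decompressor()
    zd.decompress('BZ')        # restore the bytes the header sniffing consumed
    assert zd.decompress(data[2:]) == 'payload'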
126 126 class cg1unpacker(object):
127 127 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
128 128 deltaheadersize = struct.calcsize(deltaheader)
129 129 def __init__(self, fh, alg):
130 130 self._stream = decompressor(fh, alg)
131 131 self._type = alg
132 132 self.callback = None
133 133 def compressed(self):
134 134 return self._type != 'UN'
135 135 def read(self, l):
136 136 return self._stream.read(l)
137 137 def seek(self, pos):
138 138 return self._stream.seek(pos)
139 139 def tell(self):
140 140 return self._stream.tell()
141 141 def close(self):
142 142 return self._stream.close()
143 143
144 144 def chunklength(self):
145 145 d = readexactly(self._stream, 4)
146 146 l = struct.unpack(">l", d)[0]
147 147 if l <= 4:
148 148 if l:
149 149 raise util.Abort(_("invalid chunk length %d") % l)
150 150 return 0
151 151 if self.callback:
152 152 self.callback()
153 153 return l - 4
154 154
155 155 def changelogheader(self):
156 156 """v10 does not have a changelog header chunk"""
157 157 return {}
158 158
159 159 def manifestheader(self):
160 160 """v10 does not have a manifest header chunk"""
161 161 return {}
162 162
163 163 def filelogheader(self):
164 164 """return the header of the filelogs chunk, v10 only has the filename"""
165 165 l = self.chunklength()
166 166 if not l:
167 167 return {}
168 168 fname = readexactly(self._stream, l)
169 169 return {'filename': fname}
170 170
171 171 def _deltaheader(self, headertuple, prevnode):
172 172 node, p1, p2, cs = headertuple
173 173 if prevnode is None:
174 174 deltabase = p1
175 175 else:
176 176 deltabase = prevnode
177 177 return node, p1, p2, deltabase, cs
178 178
179 179 def deltachunk(self, prevnode):
180 180 l = self.chunklength()
181 181 if not l:
182 182 return {}
183 183 headerdata = readexactly(self._stream, self.deltaheadersize)
184 184 header = struct.unpack(self.deltaheader, headerdata)
185 185 delta = readexactly(self._stream, l - self.deltaheadersize)
186 186 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
187 187 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
188 188 'deltabase': deltabase, 'delta': delta}
189 189
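Reviewer note: together with _deltaheader() above, this encodes the implicit delta-base chain of HG10: the first delta of a group is based on p1, every later one on the node that preceded it. For a hypothetical unpacker `u` positioned at a chunk group:

    first = u.deltachunk(None)            # first['deltabase'] == first['p1']
    second = u.deltachunk(first['node'])  # second['deltabase'] == first['node']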
190 190 def getchunks(self):
191 191 """returns all the chunks contains in the bundle
192 192
193 193 Used when you need to forward the binary stream to a file or another
194 194 network API. To do so, it parse the changegroup data, otherwise it will
195 195 block in case of sshrepo because it don't know the end of the stream.
196 196 """
197 197 # an empty chunkgroup is the end of the changegroup
198 198 # a changegroup has at least 2 chunkgroups (changelog and manifest).
199 199 # after that, an empty chunkgroup is the end of the changegroup
200 200 empty = False
201 201 count = 0
202 202 while not empty or count <= 2:
203 203 empty = True
204 204 count += 1
205 205 while True:
206 206 chunk = getchunk(self)
207 207 if not chunk:
208 208 break
209 209 empty = False
210 210 yield chunkheader(len(chunk))
211 211 pos = 0
212 212 while pos < len(chunk):
213 213 next = pos + 2**20
214 214 yield chunk[pos:next]
215 215 pos = next
216 216 yield closechunk()
217 217
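Reviewer note: getchunks() makes it possible to forward a changegroup byte-for-byte without decoding the deltas. A sketch (hypothetical `fh`, positioned just past a six-byte "HG10UN" header):

    cg = cg1unpacker(fh, 'UN')
    out = open('copy.hg', 'wb')
    try:
        out.write('HG10UN')          # getchunks() does not re-emit the header
        for chunk in cg.getchunks():
            out.write(chunk)
    finally:
        out.close()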
218 218 class headerlessfixup(object):
219 219 def __init__(self, fh, h):
220 220 self._h = h
221 221 self._fh = fh
222 222 def read(self, n):
223 223 if self._h:
224 224 d, self._h = self._h[:n], self._h[n:]
225 225 if len(d) < n:
226 226 d += readexactly(self._fh, n - len(d))
227 227 return d
228 228 return readexactly(self._fh, n)
229 229
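Reviewer note: headerlessfixup covers streams with no header at all (raw changegroups from old ssh servers): the bytes consumed while sniffing are pushed back in front of the stream. Roughly how exchange.readbundle uses it (simplified sketch):

    magic = readexactly(fh, 4)
    if not magic.startswith('HG'):
        # not a bundle header: assume a raw, uncompressed changegroup
        fh = headerlessfixup(fh, magic)
        cg = cg1unpacker(fh, 'UN')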
230 230 class cg1packer(object):
231 231 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
232 232 def __init__(self, repo, bundlecaps=None):
233 233 """Given a source repo, construct a bundler.
234 234
235 235 bundlecaps is optional and can be used to specify the set of
236 236 capabilities which can be used to build the bundle.
237 237 """
238 238 # Set of capabilities we can use to build the bundle.
239 239 if bundlecaps is None:
240 240 bundlecaps = set()
241 241 self._bundlecaps = bundlecaps
242 242 self._changelog = repo.changelog
243 243 self._manifest = repo.manifest
244 244 reorder = repo.ui.config('bundle', 'reorder', 'auto')
245 245 if reorder == 'auto':
246 246 reorder = None
247 247 else:
248 248 reorder = util.parsebool(reorder)
249 249 self._repo = repo
250 250 self._reorder = reorder
251 251 self._progress = repo.ui.progress
252 252 def close(self):
253 253 return closechunk()
254 254
255 255 def fileheader(self, fname):
256 256 return chunkheader(len(fname)) + fname
257 257
258 258 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
259 259 """Calculate a delta group, yielding a sequence of changegroup chunks
260 260 (strings).
261 261
262 262 Given a list of changeset revs, return a set of deltas and
263 263 metadata corresponding to nodes. The first delta is
264 264 first parent(nodelist[0]) -> nodelist[0]; the receiver is
265 265 guaranteed to have this parent, as it has all history before
266 266 these changesets. If the first parent is nullrev, the
267 267 changegroup starts with a full revision.
268 268
269 269 If units is not None, progress detail is generated; units specifies
270 270 the type of revlog that is touched (changelog, manifest, etc.).
271 271 """
272 272 # if we don't have any revisions touched by these changesets, bail
273 273 if len(nodelist) == 0:
274 274 yield self.close()
275 275 return
276 276
277 277 # for generaldelta revlogs, we linearize the revs; this will both be
278 278 # much quicker and generate a much smaller bundle
279 279 if (revlog._generaldelta and reorder is not False) or reorder:
280 280 dag = dagutil.revlogdag(revlog)
281 281 revs = set(revlog.rev(n) for n in nodelist)
282 282 revs = dag.linearize(revs)
283 283 else:
284 284 revs = sorted([revlog.rev(n) for n in nodelist])
285 285
286 286 # add the parent of the first rev
287 287 p = revlog.parentrevs(revs[0])[0]
288 288 revs.insert(0, p)
289 289
290 290 # build deltas
291 291 total = len(revs) - 1
292 292 msgbundling = _('bundling')
293 293 for r in xrange(len(revs) - 1):
294 294 if units is not None:
295 295 self._progress(msgbundling, r + 1, unit=units, total=total)
296 296 prev, curr = revs[r], revs[r + 1]
297 297 linknode = lookup(revlog.node(curr))
298 298 for c in self.revchunk(revlog, curr, prev, linknode):
299 299 yield c
300 300
301 301 yield self.close()
302 302
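Reviewer note: per the docstring, each rev contributes three strings via revchunk() (a length frame, a delta header, the delta body), and the group always ends with a zero-length chunk. Driving it directly (hypothetical; `packer = cg1packer(repo)`):

    def lookup(node):
        return node                 # changelog case: the linknode is the node itself

    cl = repo.changelog
    chunks = list(packer.group(cl.heads(), cl, lookup))
    assert chunks[-1] == closechunk()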
303 303 # filter any nodes that claim to be part of the known set
304 304 def prune(self, revlog, missing, commonrevs, source):
305 305 rr, rl = revlog.rev, revlog.linkrev
306 306 return [n for n in missing if rl(rr(n)) not in commonrevs]
307 307
308 308 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
309 309 '''yield a sequence of changegroup chunks (strings)'''
310 310 repo = self._repo
311 311 cl = self._changelog
312 312 mf = self._manifest
313 313 reorder = self._reorder
314 314 progress = self._progress
315 315
316 316 # for progress output
317 317 msgbundling = _('bundling')
318 318
319 319 mfs = {} # needed manifests
320 320 fnodes = {} # needed file nodes
321 321 changedfiles = set()
322 322
323 323 # Callback for the changelog, used to collect changed files and manifest
324 324 # nodes.
325 325 # Returns the linkrev node (identity in the changelog case).
326 326 def lookupcl(x):
327 327 c = cl.read(x)
328 328 changedfiles.update(c[3])
329 329 # record the first changeset introducing this manifest version
330 330 mfs.setdefault(c[0], x)
331 331 return x
332 332
333 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
334 reorder=reorder):
335 yield chunk
336 progress(msgbundling, None)
337
338 for f in changedfiles:
339 fnodes[f] = {}
340
333 341 # Callback for the manifest, used to collect linkrevs for filelog
334 342 # revisions.
335 343 # Returns the linkrev node (collected in lookupcl).
336 344 def lookupmf(x):
337 345 clnode = mfs[x]
338 346 if not fastpathlinkrev:
339 347 mdata = mf.readfast(x)
340 348 for f, n in mdata.iteritems():
341 349 if f in changedfiles:
342 350 # record the first changeset introducing this filelog
343 351 # version
344 352 fnodes[f].setdefault(n, clnode)
345 353 return clnode
346 354
347 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
348 reorder=reorder):
349 yield chunk
350 progress(msgbundling, None)
351
352 for f in changedfiles:
353 fnodes[f] = {}
354 355 mfnodes = self.prune(mf, mfs, commonrevs, source)
355 356 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
356 357 reorder=reorder):
357 358 yield chunk
358 359 progress(msgbundling, None)
359 360
360 361 mfs.clear()
361 362 needed = set(cl.rev(x) for x in clnodes)
362 363
363 364 def linknodes(filerevlog, fname):
364 365 if fastpathlinkrev:
365 366 llr = filerevlog.linkrev
366 367 def genfilenodes():
367 368 for r in filerevlog:
368 369 linkrev = llr(r)
369 370 if linkrev in needed:
370 371 yield filerevlog.node(r), cl.node(linkrev)
371 372 fnodes[fname] = dict(genfilenodes())
372 373 return fnodes.get(fname, {})
373 374
374 375 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
375 376 source):
376 377 yield chunk
377 378
378 379 yield self.close()
379 380 progress(msgbundling, None)
380 381
381 382 if clnodes:
382 383 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
383 384
384 385 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
385 386 repo = self._repo
386 387 progress = self._progress
387 388 reorder = self._reorder
388 389 msgbundling = _('bundling')
389 390
390 391 total = len(changedfiles)
391 392 # for progress output
392 393 msgfiles = _('files')
393 394 for i, fname in enumerate(sorted(changedfiles)):
394 395 filerevlog = repo.file(fname)
395 396 if not filerevlog:
396 397 raise util.Abort(_("empty or missing revlog for %s") % fname)
397 398
398 399 linkrevnodes = linknodes(filerevlog, fname)
399 400 # Lookup table for filenodes; we collected the linkrev nodes above in
400 401 # the fastpath case and with lookupmf in the slowpath case.
401 402 def lookupfilelog(x):
402 403 return linkrevnodes[x]
403 404
404 405 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
405 406 if filenodes:
406 407 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
407 408 total=total)
408 409 yield self.fileheader(fname)
409 410 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
410 411 reorder=reorder):
411 412 yield chunk
412 413
413 414 def revchunk(self, revlog, rev, prev, linknode):
414 415 node = revlog.node(rev)
415 416 p1, p2 = revlog.parentrevs(rev)
416 417 base = prev
417 418
418 419 prefix = ''
419 420 if base == nullrev:
420 421 delta = revlog.revision(node)
421 422 prefix = mdiff.trivialdiffheader(len(delta))
422 423 else:
423 424 delta = revlog.revdiff(base, rev)
424 425 p1n, p2n = revlog.parents(node)
425 426 basenode = revlog.node(base)
426 427 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
427 428 meta += prefix
428 429 l = len(meta) + len(delta)
429 430 yield chunkheader(l)
430 431 yield meta
431 432 yield delta
432 433 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
433 434 # do nothing with basenode; it is implicitly the previous one in HG10
434 435 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
435 436
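Reviewer note: the HG10 delta header is nothing but four 20-byte binary nodes; the delta base never appears on the wire because it is implied by position. With placeholder node values:

    import struct

    node = p1n = p2n = linknode = '\0' * 20
    hdr = struct.pack(_CHANGEGROUPV1_DELTA_HEADER, node, p1n, p2n, linknode)
    assert len(hdr) == struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER) == 80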
436 437 def _changegroupinfo(repo, nodes, source):
437 438 if repo.ui.verbose or source == 'bundle':
438 439 repo.ui.status(_("%d changesets found\n") % len(nodes))
439 440 if repo.ui.debugflag:
440 441 repo.ui.debug("list of changesets:\n")
441 442 for node in nodes:
442 443 repo.ui.debug("%s\n" % hex(node))
443 444
444 445 def getsubset(repo, outgoing, bundler, source, fastpath=False):
445 446 repo = repo.unfiltered()
446 447 commonrevs = outgoing.common
447 448 csets = outgoing.missing
448 449 heads = outgoing.missingheads
449 450 # We go through the fast path if we get told to, or if all (unfiltered)
450 451 # heads have been requested (since we then know that all linkrevs will
451 452 # be pulled by the client).
452 453 heads.sort()
453 454 fastpathlinkrev = fastpath or (
454 455 repo.filtername is None and heads == sorted(repo.heads()))
455 456
456 457 repo.hook('preoutgoing', throw=True, source=source)
457 458 _changegroupinfo(repo, csets, source)
458 459 gengroup = bundler.generate(commonrevs, csets, fastpathlinkrev, source)
459 460 return cg1unpacker(util.chunkbuffer(gengroup), 'UN')
460 461
461 462 def changegroupsubset(repo, roots, heads, source):
462 463 """Compute a changegroup consisting of all the nodes that are
463 464 descendants of any of the roots and ancestors of any of the heads.
464 465 Return a chunkbuffer object whose read() method will return
465 466 successive changegroup chunks.
466 467
467 468 It is fairly complex as determining which filenodes and which
468 469 manifest nodes need to be included for the changeset to be complete
469 470 is non-trivial.
470 471
471 472 Another wrinkle is doing the reverse, figuring out which changeset in
472 473 the changegroup a particular filenode or manifestnode belongs to.
473 474 """
474 475 cl = repo.changelog
475 476 if not roots:
476 477 roots = [nullid]
477 478 # TODO: remove call to nodesbetween.
478 479 csets, roots, heads = cl.nodesbetween(roots, heads)
479 480 discbases = []
480 481 for n in roots:
481 482 discbases.extend([p for p in cl.parents(n) if p != nullid])
482 483 outgoing = discovery.outgoing(cl, discbases, heads)
483 484 bundler = cg1packer(repo)
484 485 return getsubset(repo, outgoing, bundler, source)
485 486
486 487 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
487 488 """Like getbundle, but taking a discovery.outgoing as an argument.
488 489
489 490 This is only implemented for local repos and reuses potentially
490 491 precomputed sets in outgoing."""
491 492 if not outgoing.missing:
492 493 return None
493 494 bundler = cg1packer(repo, bundlecaps)
494 495 return getsubset(repo, outgoing, bundler, source)
495 496
496 497 def _computeoutgoing(repo, heads, common):
497 498 """Computes which revs are outgoing given a set of common
498 499 and a set of heads.
499 500
500 501 This is a separate function so extensions can have access to
501 502 the logic.
502 503
503 504 Returns a discovery.outgoing object.
504 505 """
505 506 cl = repo.changelog
506 507 if common:
507 508 hasnode = cl.hasnode
508 509 common = [n for n in common if hasnode(n)]
509 510 else:
510 511 common = [nullid]
511 512 if not heads:
512 513 heads = cl.heads()
513 514 return discovery.outgoing(cl, common, heads)
514 515
515 516 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
516 517 """Like changegroupsubset, but returns the set difference between the
517 518 ancestors of heads and the ancestors common.
518 519
519 520 If heads is None, use the local heads. If common is None, use [nullid].
520 521
521 522 The nodes in common might not all be known locally due to the way the
522 523 current discovery protocol works.
523 524 """
524 525 outgoing = _computeoutgoing(repo, heads, common)
525 526 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
526 527
527 528 def changegroup(repo, basenodes, source):
528 529 # to avoid a race we use changegroupsubset() (issue1320)
529 530 return changegroupsubset(repo, basenodes, repo.heads(), source)
530 531
531 532 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
532 533 revisions = 0
533 534 files = 0
534 535 while True:
535 536 chunkdata = source.filelogheader()
536 537 if not chunkdata:
537 538 break
538 539 f = chunkdata["filename"]
539 540 repo.ui.debug("adding %s revisions\n" % f)
540 541 pr()
541 542 fl = repo.file(f)
542 543 o = len(fl)
543 544 if not fl.addgroup(source, revmap, trp):
544 545 raise util.Abort(_("received file revlog group is empty"))
545 546 revisions += len(fl) - o
546 547 files += 1
547 548 if f in needfiles:
548 549 needs = needfiles[f]
549 550 for new in xrange(o, len(fl)):
550 551 n = fl.node(new)
551 552 if n in needs:
552 553 needs.remove(n)
553 554 else:
554 555 raise util.Abort(
555 556 _("received spurious file revlog entry"))
556 557 if not needs:
557 558 del needfiles[f]
558 559 repo.ui.progress(_('files'), None)
559 560
560 561 for f, needs in needfiles.iteritems():
561 562 fl = repo.file(f)
562 563 for n in needs:
563 564 try:
564 565 fl.rev(n)
565 566 except error.LookupError:
566 567 raise util.Abort(
567 568 _('missing file data for %s:%s - run hg verify') %
568 569 (f, hex(n)))
569 570
570 571 return revisions, files
571 572
572 573 def addchangegroup(repo, source, srctype, url, emptyok=False,
573 574 targetphase=phases.draft):
574 575 """Add the changegroup returned by source.read() to this repo.
575 576 srctype is a string like 'push', 'pull', or 'unbundle'. url is
576 577 the URL of the repo where this changegroup is coming from.
577 578
578 579 Return an integer summarizing the change to this repo:
579 580 - nothing changed or no source: 0
580 581 - more heads than before: 1+added heads (2..n)
581 582 - fewer heads than before: -1-removed heads (-2..-n)
582 583 - number of heads stays the same: 1
583 584 """
584 585 repo = repo.unfiltered()
585 586 def csmap(x):
586 587 repo.ui.debug("add changeset %s\n" % short(x))
587 588 return len(cl)
588 589
589 590 def revmap(x):
590 591 return cl.rev(x)
591 592
592 593 if not source:
593 594 return 0
594 595
595 596 changesets = files = revisions = 0
596 597 efiles = set()
597 598
598 599 # write changelog data to temp files so concurrent readers will not see
599 600 # an inconsistent view
600 601 cl = repo.changelog
601 602 cl.delayupdate()
602 603 oldheads = cl.heads()
603 604
604 605 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
605 606 # The transaction could have been created before and may already carry
606 607 # source information. In that case we use the top-level data. We
607 608 # overwrite the arguments because we need to use the top-level values
608 609 # (if they exist) in this function.
609 610 srctype = tr.hookargs.setdefault('source', srctype)
610 611 url = tr.hookargs.setdefault('url', url)
611 612 try:
612 613 repo.hook('prechangegroup', throw=True, **tr.hookargs)
613 614
614 615 trp = weakref.proxy(tr)
615 616 # pull off the changeset group
616 617 repo.ui.status(_("adding changesets\n"))
617 618 clstart = len(cl)
618 619 class prog(object):
619 620 step = _('changesets')
620 621 count = 1
621 622 ui = repo.ui
622 623 total = None
623 624 def __call__(repo):
624 625 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
625 626 total=repo.total)
626 627 repo.count += 1
627 628 pr = prog()
628 629 source.callback = pr
629 630
630 631 source.changelogheader()
631 632 srccontent = cl.addgroup(source, csmap, trp)
632 633 if not (srccontent or emptyok):
633 634 raise util.Abort(_("received changelog group is empty"))
634 635 clend = len(cl)
635 636 changesets = clend - clstart
636 637 for c in xrange(clstart, clend):
637 638 efiles.update(repo[c].files())
638 639 efiles = len(efiles)
639 640 repo.ui.progress(_('changesets'), None)
640 641
641 642 # pull off the manifest group
642 643 repo.ui.status(_("adding manifests\n"))
643 644 pr.step = _('manifests')
644 645 pr.count = 1
645 646 pr.total = changesets # manifests <= changesets
646 647 # no need to check for empty manifest group here:
647 648 # if the result of the merge of 1 and 2 is the same in 3 and 4,
648 649 # no new manifest will be created and the manifest group will
649 650 # be empty during the pull
650 651 source.manifestheader()
651 652 repo.manifest.addgroup(source, revmap, trp)
652 653 repo.ui.progress(_('manifests'), None)
653 654
654 655 needfiles = {}
655 656 if repo.ui.configbool('server', 'validate', default=False):
656 657 # validate incoming csets have their manifests
657 658 for cset in xrange(clstart, clend):
658 659 mfest = repo.changelog.read(repo.changelog.node(cset))[0]
659 660 mfest = repo.manifest.readdelta(mfest)
660 661 # store file nodes we must see
661 662 for f, n in mfest.iteritems():
662 663 needfiles.setdefault(f, set()).add(n)
663 664
664 665 # process the files
665 666 repo.ui.status(_("adding file changes\n"))
666 667 pr.step = _('files')
667 668 pr.count = 1
668 669 pr.total = efiles
669 670 source.callback = None
670 671
671 672 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
672 673 needfiles)
673 674 revisions += newrevs
674 675 files += newfiles
675 676
676 677 dh = 0
677 678 if oldheads:
678 679 heads = cl.heads()
679 680 dh = len(heads) - len(oldheads)
680 681 for h in heads:
681 682 if h not in oldheads and repo[h].closesbranch():
682 683 dh -= 1
683 684 htext = ""
684 685 if dh:
685 686 htext = _(" (%+d heads)") % dh
686 687
687 688 repo.ui.status(_("added %d changesets"
688 689 " with %d changes to %d files%s\n")
689 690 % (changesets, revisions, files, htext))
690 691 repo.invalidatevolatilesets()
691 692
692 693 if changesets > 0:
693 694 p = lambda: cl.writepending() and repo.root or ""
694 695 if 'node' not in tr.hookargs:
695 696 tr.hookargs['node'] = hex(cl.node(clstart))
696 697 hookargs = dict(tr.hookargs)
697 698 else:
698 699 hookargs = dict(tr.hookargs)
699 700 hookargs['node'] = hex(cl.node(clstart))
700 701 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
701 702
702 703 added = [cl.node(r) for r in xrange(clstart, clend)]
703 704 publishing = repo.ui.configbool('phases', 'publish', True)
704 705 if srctype in ('push', 'serve'):
705 706 # Old servers cannot push the boundary themselves.
706 707 # New servers won't push the boundary if the changeset already
707 708 # exists locally as secret.
708 709 #
709 710 # We should not use `added` here but the list of all changes in
710 711 # the bundle.
711 712 if publishing:
712 713 phases.advanceboundary(repo, tr, phases.public, srccontent)
713 714 else:
714 715 # Those changesets have been pushed from the outside; their
715 716 # phases are going to be pushed alongside. Therefore
716 717 # `targetphase` is ignored.
717 718 phases.advanceboundary(repo, tr, phases.draft, srccontent)
718 719 phases.retractboundary(repo, tr, phases.draft, added)
719 720 elif srctype != 'strip':
720 721 # publishing only alters behavior during push
721 722 #
722 723 # strip should not touch boundary at all
723 724 phases.retractboundary(repo, tr, targetphase, added)
724 725
725 726 # make changelog see real files again
726 727 cl.finalize(trp)
727 728
728 729 tr.close()
729 730
730 731 if changesets > 0:
731 732 if srctype != 'strip':
732 733 # During strip, the branchcache is invalid, but the coming call to
733 734 # `destroyed` will repair it.
734 735 # In other cases we can safely update the cache on disk.
735 736 branchmap.updatecache(repo.filtered('served'))
736 737
737 738 def runhooks():
738 739 # These hooks run when the lock releases, not when the
739 740 # transaction closes. So it's possible for the changelog
740 741 # to have changed since we last saw it.
741 742 if clstart >= len(repo):
742 743 return
743 744
744 745 # forcefully update the on-disk branch cache
745 746 repo.ui.debug("updating the branch cache\n")
746 747 repo.hook("changegroup", **hookargs)
747 748
748 749 for n in added:
749 750 args = hookargs.copy()
750 751 args['node'] = hex(n)
751 752 repo.hook("incoming", **args)
752 753
753 754 newheads = [h for h in repo.heads() if h not in oldheads]
754 755 repo.ui.log("incoming",
755 756 "%s incoming changes - new heads: %s\n",
756 757 len(added),
757 758 ', '.join([hex(c[:6]) for c in newheads]))
758 759 repo._afterlock(runhooks)
759 760
760 761 finally:
761 762 tr.release()
762 763 # never return 0 here:
763 764 if dh < 0:
764 765 return dh - 1
765 766 else:
766 767 return dh + 1
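Reviewer note: interpreting the return value described in the docstring at the top of addchangegroup() (hypothetical caller):

    ret = addchangegroup(repo, source, 'pull', url)
    if ret == 0:
        pass                      # nothing changed, or no source
    elif ret == 1:
        pass                      # head count unchanged; changesets were added
    elif ret > 1:
        added_heads = ret - 1     # the repo gained heads
    else:
        removed_heads = -ret - 1  # the repo lost heads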