changegroup: allow use of different cg#packer in getchangegroupraw...
Sune Foldager
r23178:5e895ed5 default
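This change threads a changegroup version string through getchangegroupraw and getlocalchangegroupraw and looks the packer class up in the new packermap table instead of hard-coding cg1packer. Below is a minimal, self-contained sketch of that dispatch; toypacker, toyunpacker and makebundler are illustrative stand-ins, not Mercurial code.

class toypacker(object):
    """Hypothetical stand-in for cg1packer."""
    def __init__(self, repo, bundlecaps=None):
        self._repo = repo
        self._bundlecaps = bundlecaps or set()

class toyunpacker(object):
    """Hypothetical stand-in for cg1unpacker."""
    def __init__(self, fh, alg):
        self._stream, self._type = fh, alg

# version string -> (packer class, unpacker class), mirroring packermap
toypackermap = {'01': (toypacker, toyunpacker)}

def makebundler(repo, bundlecaps=None, version='01'):
    # mirrors the patched line: packermap[version][0](repo, bundlecaps)
    return toypackermap[version][0](repo, bundlecaps)

assert type(makebundler(repo=None)) is toypacker

Keying on a version string keeps call sites unchanged while letting future changegroup formats register additional (packer, unpacker) pairs in packermap.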
@@ -1,793 +1,798 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16
17 17 def readexactly(stream, n):
18 18 '''read n bytes from stream.read and abort if less was available'''
19 19 s = stream.read(n)
20 20 if len(s) < n:
21 21 raise util.Abort(_("stream ended unexpectedly"
22 22 " (got %d bytes, expected %d)")
23 23 % (len(s), n))
24 24 return s
25 25
26 26 def getchunk(stream):
27 27 """return the next chunk from stream as a string"""
28 28 d = readexactly(stream, 4)
29 29 l = struct.unpack(">l", d)[0]
30 30 if l <= 4:
31 31 if l:
32 32 raise util.Abort(_("invalid chunk length %d") % l)
33 33 return ""
34 34 return readexactly(stream, l - 4)
35 35
36 36 def chunkheader(length):
37 37 """return a changegroup chunk header (string)"""
38 38 return struct.pack(">l", length + 4)
39 39
40 40 def closechunk():
41 41 """return a changegroup chunk header (string) for a zero-length chunk"""
42 42 return struct.pack(">l", 0)
43 43
44 44 class nocompress(object):
45 45 def compress(self, x):
46 46 return x
47 47 def flush(self):
48 48 return ""
49 49
50 50 bundletypes = {
51 51 "": ("", nocompress), # only when using unbundle on ssh and old http servers
52 52 # since the unification ssh accepts a header but there
53 53 # is no capability signaling it.
54 54 "HG10UN": ("HG10UN", nocompress),
55 55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 57 }
58 58
59 59 # hgweb uses this list to communicate its preferred type
60 60 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
61 61
62 62 def writebundle(cg, filename, bundletype, vfs=None):
63 63 """Write a bundle file and return its filename.
64 64
65 65 Existing files will not be overwritten.
66 66 If no filename is specified, a temporary file is created.
67 67 bz2 compression can be turned off.
68 68 The bundle file will be deleted in case of errors.
69 69 """
70 70
71 71 fh = None
72 72 cleanup = None
73 73 try:
74 74 if filename:
75 75 if vfs:
76 76 fh = vfs.open(filename, "wb")
77 77 else:
78 78 fh = open(filename, "wb")
79 79 else:
80 80 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
81 81 fh = os.fdopen(fd, "wb")
82 82 cleanup = filename
83 83
84 84 header, compressor = bundletypes[bundletype]
85 85 fh.write(header)
86 86 z = compressor()
87 87
88 88 # parse the changegroup data, otherwise we will block
89 89 # in case of sshrepo because we don't know the end of the stream
90 90
91 91 # an empty chunkgroup is the end of the changegroup
92 92 # a changegroup has at least 2 chunkgroups (changelog and manifest).
93 93 # after that, an empty chunkgroup is the end of the changegroup
94 94 for chunk in cg.getchunks():
95 95 fh.write(z.compress(chunk))
96 96 fh.write(z.flush())
97 97 cleanup = None
98 98 return filename
99 99 finally:
100 100 if fh is not None:
101 101 fh.close()
102 102 if cleanup is not None:
103 103 if filename and vfs:
104 104 vfs.unlink(cleanup)
105 105 else:
106 106 os.unlink(cleanup)
107 107
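A hedged usage sketch for writebundle above: fakecg stands in for a real changegroup object and provides only the getchunks() interface writebundle relies on; the call itself is shown commented out because it writes a file.

class fakecg(object):
    def getchunks(self):
        yield "chunk-data"  # a real changegroup yields framed chunks

# Would write the 'HG10' header followed by the bz2-compressed chunks:
# path = writebundle(fakecg(), "example.hg", "HG10BZ")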
108 108 def decompressor(fh, alg):
109 109 if alg == 'UN':
110 110 return fh
111 111 elif alg == 'GZ':
112 112 def generator(f):
113 113 zd = zlib.decompressobj()
114 114 for chunk in util.filechunkiter(f):
115 115 yield zd.decompress(chunk)
116 116 elif alg == 'BZ':
117 117 def generator(f):
118 118 zd = bz2.BZ2Decompressor()
119 119 zd.decompress("BZ")
120 120 for chunk in util.filechunkiter(f, 4096):
121 121 yield zd.decompress(chunk)
122 122 else:
123 123 raise util.Abort("unknown bundle compression '%s'" % alg)
124 124 return util.chunkbuffer(generator(fh))
125 125
126 126 class cg1unpacker(object):
127 127 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
128 128 deltaheadersize = struct.calcsize(deltaheader)
129 129 def __init__(self, fh, alg):
130 130 self._stream = decompressor(fh, alg)
131 131 self._type = alg
132 132 self.callback = None
133 133 def compressed(self):
134 134 return self._type != 'UN'
135 135 def read(self, l):
136 136 return self._stream.read(l)
137 137 def seek(self, pos):
138 138 return self._stream.seek(pos)
139 139 def tell(self):
140 140 return self._stream.tell()
141 141 def close(self):
142 142 return self._stream.close()
143 143
144 144 def chunklength(self):
145 145 d = readexactly(self._stream, 4)
146 146 l = struct.unpack(">l", d)[0]
147 147 if l <= 4:
148 148 if l:
149 149 raise util.Abort(_("invalid chunk length %d") % l)
150 150 return 0
151 151 if self.callback:
152 152 self.callback()
153 153 return l - 4
154 154
155 155 def changelogheader(self):
156 156 """v10 does not have a changelog header chunk"""
157 157 return {}
158 158
159 159 def manifestheader(self):
160 160 """v10 does not have a manifest header chunk"""
161 161 return {}
162 162
163 163 def filelogheader(self):
164 164 """return the header of the filelogs chunk, v10 only has the filename"""
165 165 l = self.chunklength()
166 166 if not l:
167 167 return {}
168 168 fname = readexactly(self._stream, l)
169 169 return {'filename': fname}
170 170
171 171 def _deltaheader(self, headertuple, prevnode):
172 172 node, p1, p2, cs = headertuple
173 173 if prevnode is None:
174 174 deltabase = p1
175 175 else:
176 176 deltabase = prevnode
177 177 return node, p1, p2, deltabase, cs
178 178
179 179 def deltachunk(self, prevnode):
180 180 l = self.chunklength()
181 181 if not l:
182 182 return {}
183 183 headerdata = readexactly(self._stream, self.deltaheadersize)
184 184 header = struct.unpack(self.deltaheader, headerdata)
185 185 delta = readexactly(self._stream, l - self.deltaheadersize)
186 186 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
187 187 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
188 188 'deltabase': deltabase, 'delta': delta}
189 189
190 190 def getchunks(self):
191 191 """returns all the chunks contained in the bundle
192 192
193 193 Used when you need to forward the binary stream to a file or another
194 194 network API. To do so, it parses the changegroup data; otherwise it
195 195 would block on an sshrepo because it does not know the end of the stream.
196 196 """
197 197 # an empty chunkgroup is the end of the changegroup
198 198 # a changegroup has at least 2 chunkgroups (changelog and manifest).
199 199 # after that, an empty chunkgroup is the end of the changegroup
200 200 empty = False
201 201 count = 0
202 202 while not empty or count <= 2:
203 203 empty = True
204 204 count += 1
205 205 while True:
206 206 chunk = getchunk(self)
207 207 if not chunk:
208 208 break
209 209 empty = False
210 210 yield chunkheader(len(chunk))
211 211 pos = 0
212 212 while pos < len(chunk):
213 213 next = pos + 2**20
214 214 yield chunk[pos:next]
215 215 pos = next
216 216 yield closechunk()
217 217
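A worked example of the chunk framing that chunkheader, closechunk and getchunk implement: each chunk starts with a 4-byte big-endian length that counts itself, and a zero length (closechunk) terminates a chunk group. Illustrative only:

import struct

payload = "abcdef"                           # 6 payload bytes
frame = struct.pack(">l", len(payload) + 4)  # header encodes 10
frame += payload + struct.pack(">l", 0)      # payload, then closechunk

l = struct.unpack(">l", frame[:4])[0]        # 10, as getchunk reads it
assert frame[4:l] == payload                 # the remaining l - 4 bytes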
218 218 class headerlessfixup(object):
219 219 def __init__(self, fh, h):
220 220 self._h = h
221 221 self._fh = fh
222 222 def read(self, n):
223 223 if self._h:
224 224 d, self._h = self._h[:n], self._h[n:]
225 225 if len(d) < n:
226 226 d += readexactly(self._fh, n - len(d))
227 227 return d
228 228 return readexactly(self._fh, n)
229 229
230 230 class cg1packer(object):
231 231 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
232 232 def __init__(self, repo, bundlecaps=None):
233 233 """Given a source repo, construct a bundler.
234 234
235 235 bundlecaps is optional and can be used to specify the set of
236 236 capabilities which can be used to build the bundle.
237 237 """
238 238 # Set of capabilities we can use to build the bundle.
239 239 if bundlecaps is None:
240 240 bundlecaps = set()
241 241 self._bundlecaps = bundlecaps
242 242 self._changelog = repo.changelog
243 243 self._manifest = repo.manifest
244 244 reorder = repo.ui.config('bundle', 'reorder', 'auto')
245 245 if reorder == 'auto':
246 246 reorder = None
247 247 else:
248 248 reorder = util.parsebool(reorder)
249 249 self._repo = repo
250 250 self._reorder = reorder
251 251 self._progress = repo.ui.progress
252 252 def close(self):
253 253 return closechunk()
254 254
255 255 def fileheader(self, fname):
256 256 return chunkheader(len(fname)) + fname
257 257
258 258 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
259 259 """Calculate a delta group, yielding a sequence of changegroup chunks
260 260 (strings).
261 261
262 262 Given a list of changeset revs, return a set of deltas and
263 263 metadata corresponding to nodes. The first delta is
264 264 first parent(nodelist[0]) -> nodelist[0]; the receiver is
265 265 guaranteed to have this parent as it has all history before
266 266 these changesets. If the first parent is nullrev, the
267 267 changegroup starts with a full revision.
268 268
269 269 If units is not None, progress detail will be generated; units specifies
270 270 the type of revlog that is touched (changelog, manifest, etc.).
271 271 """
272 272 # if we don't have any revisions touched by these changesets, bail
273 273 if len(nodelist) == 0:
274 274 yield self.close()
275 275 return
276 276
277 277 # for generaldelta revlogs, we linearize the revs; this will both be
278 278 # much quicker and generate a much smaller bundle
279 279 if (revlog._generaldelta and reorder is not False) or reorder:
280 280 dag = dagutil.revlogdag(revlog)
281 281 revs = set(revlog.rev(n) for n in nodelist)
282 282 revs = dag.linearize(revs)
283 283 else:
284 284 revs = sorted([revlog.rev(n) for n in nodelist])
285 285
286 286 # add the parent of the first rev
287 287 p = revlog.parentrevs(revs[0])[0]
288 288 revs.insert(0, p)
289 289
290 290 # build deltas
291 291 total = len(revs) - 1
292 292 msgbundling = _('bundling')
293 293 for r in xrange(len(revs) - 1):
294 294 if units is not None:
295 295 self._progress(msgbundling, r + 1, unit=units, total=total)
296 296 prev, curr = revs[r], revs[r + 1]
297 297 linknode = lookup(revlog.node(curr))
298 298 for c in self.revchunk(revlog, curr, prev, linknode):
299 299 yield c
300 300
301 301 yield self.close()
302 302
303 303 # filter any nodes that claim to be part of the known set
304 304 def prune(self, revlog, missing, commonrevs, source):
305 305 rr, rl = revlog.rev, revlog.linkrev
306 306 return [n for n in missing if rl(rr(n)) not in commonrevs]
307 307
308 308 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
309 309 '''yield a sequence of changegroup chunks (strings)'''
310 310 repo = self._repo
311 311 cl = self._changelog
312 312 mf = self._manifest
313 313 reorder = self._reorder
314 314 progress = self._progress
315 315
316 316 # for progress output
317 317 msgbundling = _('bundling')
318 318
319 319 mfs = {} # needed manifests
320 320 fnodes = {} # needed file nodes
321 321 changedfiles = set()
322 322
323 323 # Callback for the changelog, used to collect changed files and manifest
324 324 # nodes.
325 325 # Returns the linkrev node (identity in the changelog case).
326 326 def lookupcl(x):
327 327 c = cl.read(x)
328 328 changedfiles.update(c[3])
329 329 # record the first changeset introducing this manifest version
330 330 mfs.setdefault(c[0], x)
331 331 return x
332 332
333 333 # Callback for the manifest, used to collect linkrevs for filelog
334 334 # revisions.
335 335 # Returns the linkrev node (collected in lookupcl).
336 336 def lookupmf(x):
337 337 clnode = mfs[x]
338 338 if not fastpathlinkrev:
339 339 mdata = mf.readfast(x)
340 340 for f, n in mdata.iteritems():
341 341 if f in changedfiles:
342 342 # record the first changeset introducing this filelog
343 343 # version
344 344 fnodes[f].setdefault(n, clnode)
345 345 return clnode
346 346
347 347 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
348 348 reorder=reorder):
349 349 yield chunk
350 350 progress(msgbundling, None)
351 351
352 352 for f in changedfiles:
353 353 fnodes[f] = {}
354 354 mfnodes = self.prune(mf, mfs, commonrevs, source)
355 355 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
356 356 reorder=reorder):
357 357 yield chunk
358 358 progress(msgbundling, None)
359 359
360 360 mfs.clear()
361 361 needed = set(cl.rev(x) for x in clnodes)
362 362
363 363 def linknodes(filerevlog, fname):
364 364 if fastpathlinkrev:
365 365 llr = filerevlog.linkrev
366 366 def genfilenodes():
367 367 for r in filerevlog:
368 368 linkrev = llr(r)
369 369 if linkrev in needed:
370 370 yield filerevlog.node(r), cl.node(linkrev)
371 371 fnodes[fname] = dict(genfilenodes())
372 372 return fnodes.get(fname, {})
373 373
374 374 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
375 375 source):
376 376 yield chunk
377 377
378 378 yield self.close()
379 379 progress(msgbundling, None)
380 380
381 381 if clnodes:
382 382 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
383 383
384 384 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
385 385 repo = self._repo
386 386 progress = self._progress
387 387 reorder = self._reorder
388 388 msgbundling = _('bundling')
389 389
390 390 total = len(changedfiles)
391 391 # for progress output
392 392 msgfiles = _('files')
393 393 for i, fname in enumerate(sorted(changedfiles)):
394 394 filerevlog = repo.file(fname)
395 395 if not filerevlog:
396 396 raise util.Abort(_("empty or missing revlog for %s") % fname)
397 397
398 398 linkrevnodes = linknodes(filerevlog, fname)
399 399 # Lookup function for filenodes; we collected the linkrev nodes above in the
400 400 # fastpath case and with lookupmf in the slowpath case.
401 401 def lookupfilelog(x):
402 402 return linkrevnodes[x]
403 403
404 404 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
405 405 if filenodes:
406 406 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
407 407 total=total)
408 408 yield self.fileheader(fname)
409 409 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
410 410 reorder=reorder):
411 411 yield chunk
412 412
413 413 def revchunk(self, revlog, rev, prev, linknode):
414 414 node = revlog.node(rev)
415 415 p1, p2 = revlog.parentrevs(rev)
416 416 base = prev
417 417
418 418 prefix = ''
419 419 if base == nullrev:
420 420 delta = revlog.revision(node)
421 421 prefix = mdiff.trivialdiffheader(len(delta))
422 422 else:
423 423 delta = revlog.revdiff(base, rev)
424 424 p1n, p2n = revlog.parents(node)
425 425 basenode = revlog.node(base)
426 426 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
427 427 meta += prefix
428 428 l = len(meta) + len(delta)
429 429 yield chunkheader(l)
430 430 yield meta
431 431 yield delta
432 432 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
433 433 # do nothing with basenode; it is implicitly the previous one in HG10
434 434 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
435 435
436 436 packermap = {'01': (cg1packer, cg1unpacker)}
437 437
438 438 def _changegroupinfo(repo, nodes, source):
439 439 if repo.ui.verbose or source == 'bundle':
440 440 repo.ui.status(_("%d changesets found\n") % len(nodes))
441 441 if repo.ui.debugflag:
442 442 repo.ui.debug("list of changesets:\n")
443 443 for node in nodes:
444 444 repo.ui.debug("%s\n" % hex(node))
445 445
446 446 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
447 447 repo = repo.unfiltered()
448 448 commonrevs = outgoing.common
449 449 csets = outgoing.missing
450 450 heads = outgoing.missingheads
451 451 # We go through the fast path if we get told to, or if all (unfiltered)
452 452 # heads have been requested (since we then know that all linkrevs will
453 453 # be pulled by the client).
454 454 heads.sort()
455 455 fastpathlinkrev = fastpath or (
456 456 repo.filtername is None and heads == sorted(repo.heads()))
457 457
458 458 repo.hook('preoutgoing', throw=True, source=source)
459 459 _changegroupinfo(repo, csets, source)
460 460 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
461 461
462 462 def getsubset(repo, outgoing, bundler, source, fastpath=False):
463 463 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
464 464 return cg1unpacker(util.chunkbuffer(gengroup), 'UN')
465 465
466 466 def changegroupsubset(repo, roots, heads, source):
467 467 """Compute a changegroup consisting of all the nodes that are
468 468 descendants of any of the roots and ancestors of any of the heads.
469 469 Return a chunkbuffer object whose read() method will return
470 470 successive changegroup chunks.
471 471
472 472 It is fairly complex as determining which filenodes and which
473 473 manifest nodes need to be included for the changeset to be complete
474 474 is non-trivial.
475 475
476 476 Another wrinkle is doing the reverse, figuring out which changeset in
477 477 the changegroup a particular filenode or manifestnode belongs to.
478 478 """
479 479 cl = repo.changelog
480 480 if not roots:
481 481 roots = [nullid]
482 482 # TODO: remove call to nodesbetween.
483 483 csets, roots, heads = cl.nodesbetween(roots, heads)
484 484 discbases = []
485 485 for n in roots:
486 486 discbases.extend([p for p in cl.parents(n) if p != nullid])
487 487 outgoing = discovery.outgoing(cl, discbases, heads)
488 488 bundler = cg1packer(repo)
489 489 return getsubset(repo, outgoing, bundler, source)
490 490
491 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None):
491 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
492 version='01'):
492 493 """Like getbundle, but taking a discovery.outgoing as an argument.
493 494
494 495 This is only implemented for local repos and reuses potentially
495 496 precomputed sets in outgoing. Returns a raw changegroup generator."""
496 497 if not outgoing.missing:
497 498 return None
498 bundler = cg1packer(repo, bundlecaps)
499 bundler = packermap[version][0](repo, bundlecaps)
499 500 return getsubsetraw(repo, outgoing, bundler, source)
500 501
501 502 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
502 503 """Like getbundle, but taking a discovery.outgoing as an argument.
503 504
504 505 This is only implemented for local repos and reuses potentially
505 506 precomputed sets in outgoing."""
506 507 if not outgoing.missing:
507 508 return None
508 509 bundler = cg1packer(repo, bundlecaps)
509 510 return getsubset(repo, outgoing, bundler, source)
510 511
511 512 def _computeoutgoing(repo, heads, common):
512 513 """Computes which revs are outgoing given a set of common
513 514 and a set of heads.
514 515
515 516 This is a separate function so extensions can have access to
516 517 the logic.
517 518
518 519 Returns a discovery.outgoing object.
519 520 """
520 521 cl = repo.changelog
521 522 if common:
522 523 hasnode = cl.hasnode
523 524 common = [n for n in common if hasnode(n)]
524 525 else:
525 526 common = [nullid]
526 527 if not heads:
527 528 heads = cl.heads()
528 529 return discovery.outgoing(cl, common, heads)
529 530
530 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None):
531 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
532 version='01'):
531 533 """Like changegroupsubset, but returns the set difference between the
532 534 ancestors of heads and the ancestors common.
533 535
534 536 If heads is None, use the local heads. If common is None, use [nullid].
535 537
538 version selects the changegroup format; it defaults to '01'.
539
536 540 The nodes in common might not all be known locally due to the way the
537 541 current discovery protocol works. Returns a raw changegroup generator.
538 542 """
539 543 outgoing = _computeoutgoing(repo, heads, common)
540 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps)
544 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
545 version=version)
541 546
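A hypothetical caller sketch for the new version argument; repo is assumed to be an open local repository object, and only '01' is registered in packermap at this revision, so any other version string raises KeyError.

# chunks = getchangegroupraw(repo, 'pull', version='01')
# if chunks is not None:
#     for chunk in chunks:
#         pass  # forward the raw changegroup bytes to a client or file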
542 547 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
543 548 """Like changegroupsubset, but returns the set difference between the
544 549 ancestors of heads and the ancestors common.
545 550
546 551 If heads is None, use the local heads. If common is None, use [nullid].
547 552
548 553 The nodes in common might not all be known locally due to the way the
549 554 current discovery protocol works.
550 555 """
551 556 outgoing = _computeoutgoing(repo, heads, common)
552 557 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
553 558
554 559 def changegroup(repo, basenodes, source):
555 560 # to avoid a race we use changegroupsubset() (issue1320)
556 561 return changegroupsubset(repo, basenodes, repo.heads(), source)
557 562
558 563 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
559 564 revisions = 0
560 565 files = 0
561 566 while True:
562 567 chunkdata = source.filelogheader()
563 568 if not chunkdata:
564 569 break
565 570 f = chunkdata["filename"]
566 571 repo.ui.debug("adding %s revisions\n" % f)
567 572 pr()
568 573 fl = repo.file(f)
569 574 o = len(fl)
570 575 if not fl.addgroup(source, revmap, trp):
571 576 raise util.Abort(_("received file revlog group is empty"))
572 577 revisions += len(fl) - o
573 578 files += 1
574 579 if f in needfiles:
575 580 needs = needfiles[f]
576 581 for new in xrange(o, len(fl)):
577 582 n = fl.node(new)
578 583 if n in needs:
579 584 needs.remove(n)
580 585 else:
581 586 raise util.Abort(
582 587 _("received spurious file revlog entry"))
583 588 if not needs:
584 589 del needfiles[f]
585 590 repo.ui.progress(_('files'), None)
586 591
587 592 for f, needs in needfiles.iteritems():
588 593 fl = repo.file(f)
589 594 for n in needs:
590 595 try:
591 596 fl.rev(n)
592 597 except error.LookupError:
593 598 raise util.Abort(
594 599 _('missing file data for %s:%s - run hg verify') %
595 600 (f, hex(n)))
596 601
597 602 return revisions, files
598 603
599 604 def addchangegroup(repo, source, srctype, url, emptyok=False,
600 605 targetphase=phases.draft):
601 606 """Add the changegroup returned by source.read() to this repo.
602 607 srctype is a string like 'push', 'pull', or 'unbundle'. url is
603 608 the URL of the repo where this changegroup is coming from.
604 609
605 610 Return an integer summarizing the change to this repo:
606 611 - nothing changed or no source: 0
607 612 - more heads than before: 1+added heads (2..n)
608 613 - fewer heads than before: -1-removed heads (-2..-n)
609 614 - number of heads stays the same: 1
610 615 """
611 616 repo = repo.unfiltered()
612 617 def csmap(x):
613 618 repo.ui.debug("add changeset %s\n" % short(x))
614 619 return len(cl)
615 620
616 621 def revmap(x):
617 622 return cl.rev(x)
618 623
619 624 if not source:
620 625 return 0
621 626
622 627 changesets = files = revisions = 0
623 628 efiles = set()
624 629
625 630 # write changelog data to temp files so concurrent readers will not see
626 631 # an inconsistent view
627 632 cl = repo.changelog
628 633 cl.delayupdate()
629 634 oldheads = cl.heads()
630 635
631 636 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
632 637 # The transaction could have been created before and already carries source
633 638 # information. In this case we use the top level data. We overwrite the
634 639 # argument because we need to use the top level values (if they exist) in
635 640 # this function.
636 641 srctype = tr.hookargs.setdefault('source', srctype)
637 642 url = tr.hookargs.setdefault('url', url)
638 643 try:
639 644 repo.hook('prechangegroup', throw=True, **tr.hookargs)
640 645
641 646 trp = weakref.proxy(tr)
642 647 # pull off the changeset group
643 648 repo.ui.status(_("adding changesets\n"))
644 649 clstart = len(cl)
645 650 class prog(object):
646 651 step = _('changesets')
647 652 count = 1
648 653 ui = repo.ui
649 654 total = None
650 655 def __call__(repo):
651 656 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
652 657 total=repo.total)
653 658 repo.count += 1
654 659 pr = prog()
655 660 source.callback = pr
656 661
657 662 source.changelogheader()
658 663 srccontent = cl.addgroup(source, csmap, trp)
659 664 if not (srccontent or emptyok):
660 665 raise util.Abort(_("received changelog group is empty"))
661 666 clend = len(cl)
662 667 changesets = clend - clstart
663 668 for c in xrange(clstart, clend):
664 669 efiles.update(repo[c].files())
665 670 efiles = len(efiles)
666 671 repo.ui.progress(_('changesets'), None)
667 672
668 673 # pull off the manifest group
669 674 repo.ui.status(_("adding manifests\n"))
670 675 pr.step = _('manifests')
671 676 pr.count = 1
672 677 pr.total = changesets # manifests <= changesets
673 678 # no need to check for empty manifest group here:
674 679 # if the result of the merge of 1 and 2 is the same in 3 and 4,
675 680 # no new manifest will be created and the manifest group will
676 681 # be empty during the pull
677 682 source.manifestheader()
678 683 repo.manifest.addgroup(source, revmap, trp)
679 684 repo.ui.progress(_('manifests'), None)
680 685
681 686 needfiles = {}
682 687 if repo.ui.configbool('server', 'validate', default=False):
683 688 # validate incoming csets have their manifests
684 689 for cset in xrange(clstart, clend):
685 690 mfest = repo.changelog.read(repo.changelog.node(cset))[0]
686 691 mfest = repo.manifest.readdelta(mfest)
687 692 # store file nodes we must see
688 693 for f, n in mfest.iteritems():
689 694 needfiles.setdefault(f, set()).add(n)
690 695
691 696 # process the files
692 697 repo.ui.status(_("adding file changes\n"))
693 698 pr.step = _('files')
694 699 pr.count = 1
695 700 pr.total = efiles
696 701 source.callback = None
697 702
698 703 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
699 704 needfiles)
700 705 revisions += newrevs
701 706 files += newfiles
702 707
703 708 dh = 0
704 709 if oldheads:
705 710 heads = cl.heads()
706 711 dh = len(heads) - len(oldheads)
707 712 for h in heads:
708 713 if h not in oldheads and repo[h].closesbranch():
709 714 dh -= 1
710 715 htext = ""
711 716 if dh:
712 717 htext = _(" (%+d heads)") % dh
713 718
714 719 repo.ui.status(_("added %d changesets"
715 720 " with %d changes to %d files%s\n")
716 721 % (changesets, revisions, files, htext))
717 722 repo.invalidatevolatilesets()
718 723
719 724 if changesets > 0:
720 725 p = lambda: cl.writepending() and repo.root or ""
721 726 if 'node' not in tr.hookargs:
722 727 tr.hookargs['node'] = hex(cl.node(clstart))
723 728 hookargs = dict(tr.hookargs)
724 729 else:
725 730 hookargs = dict(tr.hookargs)
726 731 hookargs['node'] = hex(cl.node(clstart))
727 732 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
728 733
729 734 added = [cl.node(r) for r in xrange(clstart, clend)]
730 735 publishing = repo.ui.configbool('phases', 'publish', True)
731 736 if srctype in ('push', 'serve'):
732 737 # Old servers cannot push the boundary themselves.
733 738 # New servers won't push the boundary if the changeset already
734 739 # exists locally as secret
735 740 #
736 741 # We should not use added here but the list of all changes in
737 742 # the bundle
738 743 if publishing:
739 744 phases.advanceboundary(repo, tr, phases.public, srccontent)
740 745 else:
741 746 # Those changesets have been pushed from the outside, their
742 747 # phases are going to be pushed alongside. Therefore
743 748 # `targetphase` is ignored.
744 749 phases.advanceboundary(repo, tr, phases.draft, srccontent)
745 750 phases.retractboundary(repo, tr, phases.draft, added)
746 751 elif srctype != 'strip':
747 752 # publishing only alters behavior during push
748 753 #
749 754 # strip should not touch boundary at all
750 755 phases.retractboundary(repo, tr, targetphase, added)
751 756
752 757 # make changelog see real files again
753 758 cl.finalize(trp)
754 759
755 760 tr.close()
756 761
757 762 if changesets > 0:
758 763 if srctype != 'strip':
759 764 # During strip, branchcache is invalid but the coming call to
760 765 # `destroyed` will repair it.
761 766 # Otherwise we can safely update the cache on disk.
762 767 branchmap.updatecache(repo.filtered('served'))
763 768
764 769 def runhooks():
765 770 # These hooks run when the lock releases, not when the
766 771 # transaction closes. So it's possible for the changelog
767 772 # to have changed since we last saw it.
768 773 if clstart >= len(repo):
769 774 return
770 775
771 776 # forcefully update the on-disk branch cache
772 777 repo.ui.debug("updating the branch cache\n")
773 778 repo.hook("changegroup", **hookargs)
774 779
775 780 for n in added:
776 781 args = hookargs.copy()
777 782 args['node'] = hex(n)
778 783 repo.hook("incoming", **args)
779 784
780 785 newheads = [h for h in repo.heads() if h not in oldheads]
781 786 repo.ui.log("incoming",
782 787 "%s incoming changes - new heads: %s\n",
783 788 len(added),
784 789 ', '.join([hex(c[:6]) for c in newheads]))
785 790 repo._afterlock(runhooks)
786 791
787 792 finally:
788 793 tr.release()
789 794 # never return 0 here:
790 795 if dh < 0:
791 796 return dh - 1
792 797 else:
793 798 return dh + 1