# changegroup.py — extracted from a code-review page for Mercurial changeset
# r20999:1e28ec97, "changegroup: move chunk extraction into a getchunks
# method of unbundle10" by Pierre-Yves David (diff @@ -1,746 +1,759 @@).
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _BUNDLE10_DELTA_HEADER = "20s20s20s20s"
16 16
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # short read: the peer hung up (or the bundle is truncated)
    raise util.Abort(_("stream ended unexpectedly"
                       " (got %d bytes, expected %d)")
                     % (len(data), n))
25 25
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # the stored length includes the 4-byte length field itself
        return readexactly(stream, length - 4)
    if length:
        # lengths 1..4 (and negatives) cannot frame a valid chunk
        raise util.Abort(_("invalid chunk length %d") % length)
    # zero length: the empty chunk that terminates a group
    return ""
35 35
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # on-wire length counts the 4-byte header as part of the chunk
    return struct.pack(">l", 4 + length)
39 39
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero length field is the end-of-group marker
    return struct.pack(">l", 0)
43 43
class nocompress(object):
    """Pass-through stand-in for a compressor object (uncompressed bundles)."""
    def compress(self, x):
        # identity: data goes out exactly as it came in
        return x
    def flush(self):
        # nothing is ever buffered, so there is nothing left to emit
        return ""
49 49
# Map of bundle type name -> (on-disk header string, compressor factory).
# NOTE(review): "HG10BZ" writes only "HG10" as header — the bz2 stream that
# follows starts with "BZ", completing the magic; confirm against readers.
bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG10UN": ("HG10UN", nocompress),
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
61 61
def writebundle(cg, filename, bundletype, vfs=None):
    """Write a bundle file and return its filename.

    cg is a changegroup reader (unbundle10) whose getchunks() yields the
    raw chunk stream. bundletype selects the header/compressor pair from
    bundletypes; vfs, if given, is used to open/unlink paths.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # from here on, an error must remove the partial file
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        fh.write(header)
        z = compressor()

        # getchunks() parses the changegroup framing for us; re-emitting its
        # chunks avoids blocking on sshrepo streams with no known end.
        for chunk in cg.getchunks():
            fh.write(z.compress(chunk))
        fh.write(z.flush())
        cleanup = None  # success: keep the file
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
122 107
def decompressor(fh, alg):
    """Wrap stream fh in a chunked decompressor for algorithm alg."""
    if alg == 'UN':
        # uncompressed: hand the raw stream straight back
        return fh
    if alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            for chunk in util.filechunkiter(f):
                yield zd.decompress(chunk)
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            # replay the "BZ" magic that readbundle() already consumed
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort("unknown bundle compression '%s'" % alg)
    return util.chunkbuffer(generator(fh))
140 125
class unbundle10(object):
    """Reader side of a version-10 changegroup stream."""
    deltaheader = _BUNDLE10_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)

    def __init__(self, fh, alg):
        self._stream = decompressor(fh, alg)
        self._type = alg
        self.callback = None

    def compressed(self):
        return self._type != 'UN'

    # thin file-like delegation to the (possibly decompressing) stream
    def read(self, l):
        return self._stream.read(l)

    def seek(self, pos):
        return self._stream.seek(pos)

    def tell(self):
        return self._stream.tell()

    def close(self):
        return self._stream.close()

    def chunklength(self):
        """read the next chunk's payload length; 0 marks end of group"""
        raw = readexactly(self._stream, 4)
        length = struct.unpack(">l", raw)[0]
        if length <= 4:
            if length:
                raise util.Abort(_("invalid chunk length %d") % length)
            return 0
        if self.callback:
            self.callback()
        # the wire length includes the 4-byte length field itself
        return length - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        length = self.chunklength()
        if not length:
            return {}
        return {'filename': readexactly(self._stream, length)}

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        # HG10 has no explicit delta base: it is the previous node,
        # or p1 for the first delta of a group
        deltabase = p1 if prevnode is None else prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        length = self.chunklength()
        if not length:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, length - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}

    def getchunks(self):
        """yield every chunk contained in the bundle, re-framed.

        Used when the binary stream must be forwarded to a file or another
        network API. Parsing the changegroup ourselves avoids blocking on
        sshrepo streams whose end position is unknown.
        """
        # an empty chunkgroup ends the changegroup, but only after the two
        # mandatory groups (changelog and manifest) have gone by
        groupempty = False
        groupcount = 0
        while not groupempty or groupcount <= 2:
            groupempty = True
            groupcount += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    break
                groupempty = False
                yield chunkheader(len(chunk))
                # re-emit the payload in 1MB slices
                offset = 0
                while offset < len(chunk):
                    end = offset + 2**20
                    yield chunk[offset:end]
                    offset = end
            yield closechunk()
217
class headerlessfixup(object):
    """File-like wrapper replaying an already-consumed header before
    reading from the underlying stream."""
    def __init__(self, fh, h):
        self._h = h      # leftover header bytes to serve first
        self._fh = fh    # real stream for everything after
    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        served, self._h = self._h[:n], self._h[n:]
        if len(served) < n:
            # header exhausted mid-request: top up from the stream
            served += readexactly(self._fh, n - len(served))
        return served
216 229
def readbundle(fh, fname, vfs=None):
    """Sniff the 6-byte bundle header on fh and return an unbundle10."""
    hdr = readexactly(fh, 6)

    if not fname:
        fname = "stream"
    if hdr.startswith('\0') and not hdr.startswith('HG'):
        # headerless stream: push the consumed bytes back and assume HG10UN
        fh = headerlessfixup(fh, hdr)
        hdr = "HG10UN"
    elif vfs:
        fname = vfs.join(fname)

    magic = hdr[0:2]
    version = hdr[2:4]
    alg = hdr[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)
235 248
class bundle10(object):
    """Producer side of a version-10 changegroup: turns repo revisions
    into the on-wire chunk stream consumed by unbundle10."""
    deltaheader = _BUNDLE10_DELTA_HEADER
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        self._changelog = repo.changelog
        self._manifest = repo.manifest
        # 'bundle.reorder' config: auto (None) / true / false
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
    def close(self):
        # end-of-group marker chunk
        return closechunk()

    def fileheader(self, fname):
        # filelog group header: just the filename, chunk-framed
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None, reorder=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and reorder is not False) or reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev (the delta base the receiver has)
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas: each rev against its predecessor in the list
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs, source):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)

        Emits, in order: the changelog group, the manifest group, then one
        group per changed file — mirroring what unbundle10 expects.
        '''
        repo = self._repo
        cl = self._changelog
        mf = self._manifest
        reorder = self._reorder
        progress = self._progress

        # for progress output
        msgbundling = _('bundling')

        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            changedfiles.update(c[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(c[0], x)
            return x

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmf(x):
            clnode = mfs[x]
            if not fastpathlinkrev:
                mdata = mf.readfast(x)
                for f, n in mdata.iteritems():
                    if f in changedfiles:
                        # record the first changeset introducing this filelog
                        # version
                        fnodes[f].setdefault(n, clnode)
            return clnode

        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
                                reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        for f in changedfiles:
            fnodes[f] = {}
        mfnodes = self.prune(mf, mfs, commonrevs, source)
        for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
                                reorder=reorder):
            yield chunk
        progress(msgbundling, None)

        # manifests are done; free the map and precompute needed changelog revs
        mfs.clear()
        needed = set(cl.rev(x) for x in clnodes)

        # resolve the filenode -> linkrev-node map for one file, either by
        # scanning the filelog (fastpath) or from what lookupmf collected
        def linknodes(filerevlog, fname):
            if fastpathlinkrev:
                llr = filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev in needed:
                            yield filerevlog.node(r), cl.node(linkrev)
                fnodes[fname] = dict(genfilenodes())
            return fnodes.get(fname, {})

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()
        progress(msgbundling, None)

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """yield one chunk group per changed file (header + deltas)."""
        repo = self._repo
        progress = self._progress
        reorder = self._reorder
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise util.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                yield self.fileheader(fname)
                for chunk in self.group(filenodes, filerevlog, lookupfilelog,
                                        reorder=reorder):
                    yield chunk

    def revchunk(self, revlog, rev, prev, linknode):
        """yield the framed chunk for one revision: header, meta, delta."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = prev

        prefix = ''
        if base == nullrev:
            # no base: ship the full text, framed as a trivial diff
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
441 454
442 455 def _changegroupinfo(repo, nodes, source):
443 456 if repo.ui.verbose or source == 'bundle':
444 457 repo.ui.status(_("%d changesets found\n") % len(nodes))
445 458 if repo.ui.debugflag:
446 459 repo.ui.debug("list of changesets:\n")
447 460 for node in nodes:
448 461 repo.ui.debug("%s\n" % hex(node))
449 462
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Run bundler over *outgoing* and wrap the result as an unbundle10."""
    repo = repo.unfiltered()
    common = outgoing.common
    missing = outgoing.missing
    heads = outgoing.missingheads
    heads.sort()
    # We go through the fast path if we get told to, or if all (unfiltered
    # heads have been requested (since we then know there all linkrevs will
    # be pulled by the client).
    usefastpath = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, missing, source)
    chunks = bundler.generate(common, missing, usefastpath, source)
    return unbundle10(util.chunkbuffer(chunks), 'UN')
466 479
def changegroupsubset(repo, roots, heads, source):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        roots = [nullid]
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    # discovery bases: every non-null parent of a root
    discbases = [p for n in roots for p in cl.parents(n) if p != nullid]
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = bundle10(repo)
    return getsubset(repo, outgoing, bundler, source)
491 504
def getlocalbundle(repo, source, outgoing, bundlecaps=None):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    if not outgoing.missing:
        # nothing to send
        return None
    return getsubset(repo, outgoing, bundle10(repo, bundlecaps), source)
501 514
def getbundle(repo, source, heads=None, common=None, bundlecaps=None):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    cl = repo.changelog
    if not common:
        common = [nullid]
    else:
        # drop common nodes we do not actually know about
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    heads = heads or cl.heads()
    outgoing = discovery.outgoing(cl, common, heads)
    return getlocalbundle(repo, source, outgoing, bundlecaps=bundlecaps)
521 534
def changegroup(repo, basenodes, source):
    """Build a changegroup from basenodes up to the current heads."""
    # delegate to changegroupsubset() to avoid a race (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)
525 538
def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the filelog groups from *source* to the repo.

    source: unbundle10 positioned at the first filelog group
    revmap: node -> changelog rev callback used by addgroup
    trp: transaction proxy
    pr: progress callback (invoked once per file)
    needfiles: dict file -> set(nodes) that MUST arrive; entries are
    checked off as they are added and validated afterwards.

    Returns (revisions added, files touched).
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            # empty header chunk: no more filelog groups
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)
        if not fl.addgroup(source, revmap, trp):
            raise util.Abort(_("received file revlog group is empty"))
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            needs = needfiles[f]
            # every newly added node must be one we were expecting
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise util.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # anything left in needfiles never arrived — abort unless it already
    # exists in the local filelog
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise util.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
566 579
def addchangegroup(repo, source, srctype, url, emptyok=False):
    """Add the changegroup returned by source.read() to this repo.
    srctype is a string like 'push', 'pull', or 'unbundle'. url is
    the URL of the repo where this changegroup is coming from.

    Return an integer summarizing the change to this repo:
    - nothing changed or no source: 0
    - more heads than before: 1+added heads (2..n)
    - fewer heads than before: -1-removed heads (-2..-n)
    - number of heads stays the same: 1
    """
    repo = repo.unfiltered()
    def csmap(x):
        repo.ui.debug("add changeset %s\n" % short(x))
        return len(cl)

    def revmap(x):
        return cl.rev(x)

    if not source:
        return 0

    repo.hook('prechangegroup', throw=True, source=srctype, url=url)

    changesets = files = revisions = 0
    efiles = set()

    # write changelog data to temp files so concurrent readers will not see
    # inconsistent view
    cl = repo.changelog
    cl.delayupdate()
    oldheads = cl.heads()

    tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
    try:
        trp = weakref.proxy(tr)
        # pull off the changeset group
        repo.ui.status(_("adding changesets\n"))
        clstart = len(cl)
        # per-chunk progress reporter handed to the unbundler as a callback;
        # NOTE: __call__'s first argument is the instance, (confusingly)
        # named 'repo' here.
        class prog(object):
            step = _('changesets')
            count = 1
            ui = repo.ui
            total = None
            def __call__(repo):
                repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
                                 total=repo.total)
                repo.count += 1
        pr = prog()
        source.callback = pr

        source.changelogheader()
        srccontent = cl.addgroup(source, csmap, trp)
        if not (srccontent or emptyok):
            raise util.Abort(_("received changelog group is empty"))
        clend = len(cl)
        changesets = clend - clstart
        # count distinct files touched, for the files progress total below
        for c in xrange(clstart, clend):
            efiles.update(repo[c].files())
        efiles = len(efiles)
        repo.ui.progress(_('changesets'), None)

        # pull off the manifest group
        repo.ui.status(_("adding manifests\n"))
        pr.step = _('manifests')
        pr.count = 1
        pr.total = changesets # manifests <= changesets
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        source.manifestheader()
        repo.manifest.addgroup(source, revmap, trp)
        repo.ui.progress(_('manifests'), None)

        needfiles = {}
        if repo.ui.configbool('server', 'validate', default=False):
            # validate incoming csets have their manifests
            for cset in xrange(clstart, clend):
                mfest = repo.changelog.read(repo.changelog.node(cset))[0]
                mfest = repo.manifest.readdelta(mfest)
                # store file nodes we must see
                for f, n in mfest.iteritems():
                    needfiles.setdefault(f, set()).add(n)

        # process the files
        repo.ui.status(_("adding file changes\n"))
        pr.step = _('files')
        pr.count = 1
        pr.total = efiles
        source.callback = None

        newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
                                                needfiles)
        revisions += newrevs
        files += newfiles

        # compute the head-count delta, discounting closed branch heads
        dh = 0
        if oldheads:
            heads = cl.heads()
            dh = len(heads) - len(oldheads)
            for h in heads:
                if h not in oldheads and repo[h].closesbranch():
                    dh -= 1
        htext = ""
        if dh:
            htext = _(" (%+d heads)") % dh

        repo.ui.status(_("added %d changesets"
                         " with %d changes to %d files%s\n")
                       % (changesets, revisions, files, htext))
        repo.invalidatevolatilesets()

        if changesets > 0:
            p = lambda: cl.writepending() and repo.root or ""
            repo.hook('pretxnchangegroup', throw=True,
                      node=hex(cl.node(clstart)), source=srctype,
                      url=url, pending=p)

        added = [cl.node(r) for r in xrange(clstart, clend)]
        publishing = repo.ui.configbool('phases', 'publish', True)
        if srctype in ('push', 'serve'):
            # Old servers can not push the boundary themselves.
            # New servers won't push the boundary if changeset already
            # exists locally as secret
            #
            # We should not use added here but the list of all change in
            # the bundle
            if publishing:
                phases.advanceboundary(repo, phases.public, srccontent)
            else:
                phases.advanceboundary(repo, phases.draft, srccontent)
                phases.retractboundary(repo, phases.draft, added)
        elif srctype != 'strip':
            # publishing only alter behavior during push
            #
            # strip should not touch boundary at all
            phases.retractboundary(repo, phases.draft, added)

        # make changelog see real files again
        cl.finalize(trp)

        tr.close()

        if changesets > 0:
            if srctype != 'strip':
                # During strip, branchcache is invalid but coming call to
                # `destroyed` will repair it.
                # In other case we can safely update cache on disk.
                branchmap.updatecache(repo.filtered('served'))
            def runhooks():
                # These hooks run when the lock releases, not when the
                # transaction closes. So it's possible for the changelog
                # to have changed since we last saw it.
                if clstart >= len(repo):
                    return

                # forcefully update the on-disk branch cache
                repo.ui.debug("updating the branch cache\n")
                repo.hook("changegroup", node=hex(cl.node(clstart)),
                          source=srctype, url=url)

                for n in added:
                    repo.hook("incoming", node=hex(n), source=srctype,
                              url=url)

                newheads = [h for h in repo.heads() if h not in oldheads]
                repo.ui.log("incoming",
                            "%s incoming changes - new heads: %s\n",
                            len(added),
                            ', '.join([hex(c[:6]) for c in newheads]))
            repo._afterlock(runhooks)

    finally:
        tr.release()
    # never return 0 here:
    if dh < 0:
        return dh - 1
    else:
        return dh + 1
# (review-page footer — "General Comments" / login prompt — removed)