changegroup: drop _changelog and _manifest properties...
Martin von Zweigbergk
r24978:f52560c6 default
@@ -1,909 +1,907 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
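A quick cross-check of the two layouts above: v1 packs four 20-byte nodes (node, p1, p2, linknode), while v2 adds an explicit delta base field.

    import struct
    assert struct.calcsize("20s20s20s20s") == 80      # v1: node, p1, p2, linknode
    assert struct.calcsize("20s20s20s20s20s") == 100  # v2: + explicit deltabase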
18 18 def readexactly(stream, n):
19 19 '''read n bytes from stream.read and abort if less was available'''
20 20 s = stream.read(n)
21 21 if len(s) < n:
22 22 raise util.Abort(_("stream ended unexpectedly"
23 23 " (got %d bytes, expected %d)")
24 24 % (len(s), n))
25 25 return s
26 26
27 27 def getchunk(stream):
28 28 """return the next chunk from stream as a string"""
29 29 d = readexactly(stream, 4)
30 30 l = struct.unpack(">l", d)[0]
31 31 if l <= 4:
32 32 if l:
33 33 raise util.Abort(_("invalid chunk length %d") % l)
34 34 return ""
35 35 return readexactly(stream, l - 4)
36 36
37 37 def chunkheader(length):
38 38 """return a changegroup chunk header (string)"""
39 39 return struct.pack(">l", length + 4)
40 40
41 41 def closechunk():
42 42 """return a changegroup chunk header (string) for a zero-length chunk"""
43 43 return struct.pack(">l", 0)
44 44
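A minimal round trip through this framing (illustration only; Python 2 to match the module). The 4-byte big-endian length prefix counts itself, so the bare zero from closechunk() marks an empty chunk:

    import struct
    from StringIO import StringIO

    payload = 'hello'
    stream = StringIO(struct.pack(">l", len(payload) + 4) + payload)
    l = struct.unpack(">l", stream.read(4))[0]  # what getchunk() does
    assert stream.read(l - 4) == payload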
45 45 def combineresults(results):
46 46 """logic to combine 0 or more addchangegroup results into one"""
47 47 changedheads = 0
48 48 result = 1
49 49 for ret in results:
50 50 # If any changegroup result is 0, return 0
51 51 if ret == 0:
52 52 result = 0
53 53 break
54 54 if ret < -1:
55 55 changedheads += ret + 1
56 56 elif ret > 1:
57 57 changedheads += ret - 1
58 58 if changedheads > 0:
59 59 result = 1 + changedheads
60 60 elif changedheads < 0:
61 61 result = -1 + changedheads
62 62 return result
63 63
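Worked examples of the combination rules, matching the return convention documented on addchangegroup further down (1 + added heads, -1 - removed heads). A sketch assuming combineresults is in scope:

    assert combineresults([]) == 1         # nothing to combine: unchanged
    assert combineresults([2, 3]) == 4     # +1 and +2 heads -> 1 + 3
    assert combineresults([-2, -3]) == -4  # -1 and -2 heads -> -1 - 3
    assert combineresults([0, 5]) == 0     # any failure wins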
64 64 class nocompress(object):
65 65 def compress(self, x):
66 66 return x
67 67 def flush(self):
68 68 return ""
69 69
70 70 bundletypes = {
71 71 "": ("", nocompress), # only when using unbundle on ssh and old http servers
72 72 # since the unification ssh accepts a header but there
73 73 # is no capability signaling it.
74 74 "HG20": (), # special-cased below
75 75 "HG10UN": ("HG10UN", nocompress),
76 76 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
77 77 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
78 78 }
79 79
80 80 # hgweb uses this list to communicate its preferred type
81 81 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
82 82
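A sketch of how that priority list might be applied on the server side; pickbundletype and clienttypes are illustrative names, not hgweb's actual API:

    def pickbundletype(clienttypes):
        # first mutually supported type wins; fall back to uncompressed
        for bt in bundlepriority:
            if bt in clienttypes:
                return bt
        return 'HG10UN'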
83 83 def writebundle(ui, cg, filename, bundletype, vfs=None):
84 84 """Write a bundle file and return its filename.
85 85
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 bz2 compression can be turned off.
89 89 The bundle file will be deleted in case of errors.
90 90 """
91 91
92 92 fh = None
93 93 cleanup = None
94 94 try:
95 95 if filename:
96 96 if vfs:
97 97 fh = vfs.open(filename, "wb")
98 98 else:
99 99 fh = open(filename, "wb")
100 100 else:
101 101 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
102 102 fh = os.fdopen(fd, "wb")
103 103 cleanup = filename
104 104
105 105 if bundletype == "HG20":
106 106 import bundle2
107 107 bundle = bundle2.bundle20(ui)
108 108 part = bundle.newpart('changegroup', data=cg.getchunks())
109 109 part.addparam('version', cg.version)
110 110 z = nocompress()
111 111 chunkiter = bundle.getchunks()
112 112 else:
113 113 if cg.version != '01':
 114 114 raise util.Abort(_('old bundle types only support v1 '
115 115 'changegroups'))
116 116 header, compressor = bundletypes[bundletype]
117 117 fh.write(header)
118 118 z = compressor()
119 119 chunkiter = cg.getchunks()
120 120
 121 121 # parse the changegroup data; otherwise we would block
 122 122 # in the sshrepo case because we don't know the end of the stream
123 123
124 124 # an empty chunkgroup is the end of the changegroup
125 125 # a changegroup has at least 2 chunkgroups (changelog and manifest).
126 126 # after that, an empty chunkgroup is the end of the changegroup
127 127 for chunk in chunkiter:
128 128 fh.write(z.compress(chunk))
129 129 fh.write(z.flush())
130 130 cleanup = None
131 131 return filename
132 132 finally:
133 133 if fh is not None:
134 134 fh.close()
135 135 if cleanup is not None:
136 136 if filename and vfs:
137 137 vfs.unlink(cleanup)
138 138 else:
139 139 os.unlink(cleanup)
140 140
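Note that the header writebundle writes for HG10BZ (looked up in bundletypes above) is two bytes shorter than the type name; bz2's own stream magic supplies the missing 'BZ', which the decompressor code below compensates for. A small check, assuming module scope:

    header, compressor = bundletypes['HG10BZ']
    assert header == 'HG10'  # the trailing 'BZ' comes from bz2's stream magic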
141 141 def decompressor(fh, alg):
142 142 if alg == 'UN':
143 143 return fh
144 144 elif alg == 'GZ':
145 145 def generator(f):
146 146 zd = zlib.decompressobj()
147 147 for chunk in util.filechunkiter(f):
148 148 yield zd.decompress(chunk)
149 149 elif alg == 'BZ':
150 150 def generator(f):
151 151 zd = bz2.BZ2Decompressor()
152 152 zd.decompress("BZ")
153 153 for chunk in util.filechunkiter(f, 4096):
154 154 yield zd.decompress(chunk)
155 155 else:
156 156 raise util.Abort("unknown bundle compression '%s'" % alg)
157 157 return util.chunkbuffer(generator(fh))
158 158
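Why the 'BZ' branch feeds two bytes back in: an HG10BZ file is 'HG10' followed by raw bz2 data ('BZh...'), so the 6-byte header read ('HG10BZ') swallows bz2's leading magic. A self-contained sketch (Python 2, standard library only):

    import bz2
    from StringIO import StringIO

    blob = 'HG10' + bz2.compress('payload')
    fh = StringIO(blob)
    assert fh.read(6) == 'HG10BZ'  # the header read eats bz2's 'BZ' magic
    zd = bz2.BZ2Decompressor()
    assert zd.decompress('BZ') + zd.decompress(fh.read()) == 'payload'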
159 159 class cg1unpacker(object):
160 160 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
161 161 deltaheadersize = struct.calcsize(deltaheader)
162 162 version = '01'
163 163 def __init__(self, fh, alg):
164 164 self._stream = decompressor(fh, alg)
165 165 self._type = alg
166 166 self.callback = None
167 167 def compressed(self):
168 168 return self._type != 'UN'
169 169 def read(self, l):
170 170 return self._stream.read(l)
171 171 def seek(self, pos):
172 172 return self._stream.seek(pos)
173 173 def tell(self):
174 174 return self._stream.tell()
175 175 def close(self):
176 176 return self._stream.close()
177 177
178 178 def chunklength(self):
179 179 d = readexactly(self._stream, 4)
180 180 l = struct.unpack(">l", d)[0]
181 181 if l <= 4:
182 182 if l:
183 183 raise util.Abort(_("invalid chunk length %d") % l)
184 184 return 0
185 185 if self.callback:
186 186 self.callback()
187 187 return l - 4
188 188
189 189 def changelogheader(self):
190 190 """v10 does not have a changelog header chunk"""
191 191 return {}
192 192
193 193 def manifestheader(self):
194 194 """v10 does not have a manifest header chunk"""
195 195 return {}
196 196
197 197 def filelogheader(self):
 198 198 """return the header of the filelogs chunk; v10 only has the filename"""
199 199 l = self.chunklength()
200 200 if not l:
201 201 return {}
202 202 fname = readexactly(self._stream, l)
203 203 return {'filename': fname}
204 204
205 205 def _deltaheader(self, headertuple, prevnode):
206 206 node, p1, p2, cs = headertuple
207 207 if prevnode is None:
208 208 deltabase = p1
209 209 else:
210 210 deltabase = prevnode
211 211 return node, p1, p2, deltabase, cs
212 212
213 213 def deltachunk(self, prevnode):
214 214 l = self.chunklength()
215 215 if not l:
216 216 return {}
217 217 headerdata = readexactly(self._stream, self.deltaheadersize)
218 218 header = struct.unpack(self.deltaheader, headerdata)
219 219 delta = readexactly(self._stream, l - self.deltaheadersize)
220 220 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
221 221 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
222 222 'deltabase': deltabase, 'delta': delta}
223 223
224 224 def getchunks(self):
 225 225 """returns all the chunks contained in the bundle
226 226
227 227 Used when you need to forward the binary stream to a file or another
 228 228 network API. To do so, it parses the changegroup data; otherwise it would
 229 229 block in the sshrepo case because it doesn't know the end of the stream.
230 230 """
231 231 # an empty chunkgroup is the end of the changegroup
232 232 # a changegroup has at least 2 chunkgroups (changelog and manifest).
233 233 # after that, an empty chunkgroup is the end of the changegroup
234 234 empty = False
235 235 count = 0
236 236 while not empty or count <= 2:
237 237 empty = True
238 238 count += 1
239 239 while True:
240 240 chunk = getchunk(self)
241 241 if not chunk:
242 242 break
243 243 empty = False
244 244 yield chunkheader(len(chunk))
245 245 pos = 0
246 246 while pos < len(chunk):
247 247 next = pos + 2**20
248 248 yield chunk[pos:next]
249 249 pos = next
250 250 yield closechunk()
251 251
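In bytes, the framing this method preserves: each chunkgroup is a run of chunks closed by a zero-length chunk, and after the two mandatory groups an empty group ends the changegroup. A structurally minimal v1 stream is therefore three bare terminators (sketch only; a real changelog group is never empty):

    import struct
    end = struct.pack(">l", 0)  # closechunk()
    minimal = end * 3           # empty changelog, empty manifest, end marker
    assert len(minimal) == 12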
252 252 class cg2unpacker(cg1unpacker):
253 253 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
254 254 deltaheadersize = struct.calcsize(deltaheader)
255 255 version = '02'
256 256
257 257 def _deltaheader(self, headertuple, prevnode):
258 258 node, p1, p2, deltabase, cs = headertuple
259 259 return node, p1, p2, deltabase, cs
260 260
261 261 class headerlessfixup(object):
262 262 def __init__(self, fh, h):
263 263 self._h = h
264 264 self._fh = fh
265 265 def read(self, n):
266 266 if self._h:
267 267 d, self._h = self._h[:n], self._h[n:]
268 268 if len(d) < n:
269 269 d += readexactly(self._fh, n - len(d))
270 270 return d
271 271 return readexactly(self._fh, n)
272 272
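Usage sketch for headerlessfixup: hand back header bytes that were already peeked off a non-seekable stream so downstream code can re-read them (Python 2; assumes the class is in scope):

    from StringIO import StringIO

    fh = StringIO('HG10UN' + 'chunkdata')
    magic = fh.read(6)                # consumed while sniffing the bundle type
    fixed = headerlessfixup(fh, magic)
    assert fixed.read(6) == 'HG10UN'  # the header is served back first
    assert fixed.read(9) == 'chunkdata'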
273 273 class cg1packer(object):
274 274 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
275 275 version = '01'
276 276 def __init__(self, repo, bundlecaps=None):
277 277 """Given a source repo, construct a bundler.
278 278
279 279 bundlecaps is optional and can be used to specify the set of
280 280 capabilities which can be used to build the bundle.
281 281 """
282 282 # Set of capabilities we can use to build the bundle.
283 283 if bundlecaps is None:
284 284 bundlecaps = set()
285 285 self._bundlecaps = bundlecaps
286 self._changelog = repo.changelog
287 self._manifest = repo.manifest
288 286 reorder = repo.ui.config('bundle', 'reorder', 'auto')
289 287 if reorder == 'auto':
290 288 reorder = None
291 289 else:
292 290 reorder = util.parsebool(reorder)
293 291 self._repo = repo
294 292 self._reorder = reorder
295 293 self._progress = repo.ui.progress
296 294 if self._repo.ui.verbose and not self._repo.ui.debugflag:
297 295 self._verbosenote = self._repo.ui.note
298 296 else:
299 297 self._verbosenote = lambda s: None
300 298
301 299 def close(self):
302 300 return closechunk()
303 301
304 302 def fileheader(self, fname):
305 303 return chunkheader(len(fname)) + fname
306 304
307 305 def group(self, nodelist, revlog, lookup, units=None):
308 306 """Calculate a delta group, yielding a sequence of changegroup chunks
309 307 (strings).
310 308
311 309 Given a list of changeset revs, return a set of deltas and
312 310 metadata corresponding to nodes. The first delta is
313 311 first parent(nodelist[0]) -> nodelist[0], the receiver is
314 312 guaranteed to have this parent as it has all history before
315 313 these changesets. In the case firstparent is nullrev the
316 314 changegroup starts with a full revision.
317 315
 318 316 If units is not None, progress detail will be generated; units specifies
319 317 the type of revlog that is touched (changelog, manifest, etc.).
320 318 """
321 319 # if we don't have any revisions touched by these changesets, bail
322 320 if len(nodelist) == 0:
323 321 yield self.close()
324 322 return
325 323
326 324 # for generaldelta revlogs, we linearize the revs; this will both be
327 325 # much quicker and generate a much smaller bundle
328 326 if (revlog._generaldelta and self._reorder is None) or self._reorder:
329 327 dag = dagutil.revlogdag(revlog)
330 328 revs = set(revlog.rev(n) for n in nodelist)
331 329 revs = dag.linearize(revs)
332 330 else:
333 331 revs = sorted([revlog.rev(n) for n in nodelist])
334 332
335 333 # add the parent of the first rev
336 334 p = revlog.parentrevs(revs[0])[0]
337 335 revs.insert(0, p)
338 336
339 337 # build deltas
340 338 total = len(revs) - 1
341 339 msgbundling = _('bundling')
342 340 for r in xrange(len(revs) - 1):
343 341 if units is not None:
344 342 self._progress(msgbundling, r + 1, unit=units, total=total)
345 343 prev, curr = revs[r], revs[r + 1]
346 344 linknode = lookup(revlog.node(curr))
347 345 for c in self.revchunk(revlog, curr, prev, linknode):
348 346 yield c
349 347
350 348 if units is not None:
351 349 self._progress(msgbundling, None)
352 350 yield self.close()
353 351
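The pairing group() walks, spelled out: with the extra parent prepended, chunk i is the delta revs[i] -> revs[i+1], so the receiver can always apply it against a revision it already has. Sketch:

    revs = [7, 8, 9, 10]         # revs[0] is the prepended parent
    pairs = zip(revs, revs[1:])  # one (prev, curr) delta per chunk
    assert pairs == [(7, 8), (8, 9), (9, 10)]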
354 352 # filter any nodes that claim to be part of the known set
355 353 def prune(self, revlog, missing, commonrevs):
356 354 rr, rl = revlog.rev, revlog.linkrev
357 355 return [n for n in missing if rl(rr(n)) not in commonrevs]
358 356
359 357 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
360 358 '''yield a sequence of changegroup chunks (strings)'''
361 359 repo = self._repo
362 cl = self._changelog
363 ml = self._manifest
360 cl = repo.changelog
361 ml = repo.manifest
364 362
365 363 clrevorder = {}
366 364 mfs = {} # needed manifests
367 365 fnodes = {} # needed file nodes
368 366 changedfiles = set()
369 367
370 368 # Callback for the changelog, used to collect changed files and manifest
371 369 # nodes.
372 370 # Returns the linkrev node (identity in the changelog case).
373 371 def lookupcl(x):
374 372 c = cl.read(x)
375 373 clrevorder[x] = len(clrevorder)
376 374 changedfiles.update(c[3])
377 375 # record the first changeset introducing this manifest version
378 376 mfs.setdefault(c[0], x)
379 377 return x
380 378
381 379 self._verbosenote(_('uncompressed size of bundle content:\n'))
382 380 size = 0
383 381 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
384 382 size += len(chunk)
385 383 yield chunk
386 384 self._verbosenote(_('%8.i (changelog)\n') % size)
387 385
388 386 # We need to make sure that the linkrev in the changegroup refers to
389 387 # the first changeset that introduced the manifest or file revision.
390 388 # The fastpath is usually safer than the slowpath, because the filelogs
391 389 # are walked in revlog order.
392 390 #
393 391 # When taking the slowpath with reorder=None and the manifest revlog
394 392 # uses generaldelta, the manifest may be walked in the "wrong" order.
395 393 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
396 394 # cc0ff93d0c0c).
397 395 #
398 396 # When taking the fastpath, we are only vulnerable to reordering
399 397 # of the changelog itself. The changelog never uses generaldelta, so
400 398 # it is only reordered when reorder=True. To handle this case, we
401 399 # simply take the slowpath, which already has the 'clrevorder' logic.
402 400 # This was also fixed in cc0ff93d0c0c.
403 401 fastpathlinkrev = fastpathlinkrev and not self._reorder
404 402 # Callback for the manifest, used to collect linkrevs for filelog
405 403 # revisions.
406 404 # Returns the linkrev node (collected in lookupcl).
407 405 def lookupmf(x):
408 406 clnode = mfs[x]
409 407 if not fastpathlinkrev:
410 408 mdata = ml.readfast(x)
411 409 for f, n in mdata.iteritems():
412 410 if f in changedfiles:
413 411 # record the first changeset introducing this filelog
414 412 # version
415 413 fclnodes = fnodes.setdefault(f, {})
416 414 fclnode = fclnodes.setdefault(n, clnode)
417 415 if clrevorder[clnode] < clrevorder[fclnode]:
418 416 fclnodes[n] = clnode
419 417 return clnode
420 418
421 419 mfnodes = self.prune(ml, mfs, commonrevs)
422 420 size = 0
423 421 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
424 422 size += len(chunk)
425 423 yield chunk
426 424 self._verbosenote(_('%8.i (manifests)\n') % size)
427 425
428 426 mfs.clear()
429 427 clrevs = set(cl.rev(x) for x in clnodes)
430 428
431 429 def linknodes(filerevlog, fname):
432 430 if fastpathlinkrev:
433 431 llr = filerevlog.linkrev
434 432 def genfilenodes():
435 433 for r in filerevlog:
436 434 linkrev = llr(r)
437 435 if linkrev in clrevs:
438 436 yield filerevlog.node(r), cl.node(linkrev)
439 437 return dict(genfilenodes())
440 438 return fnodes.get(fname, {})
441 439
442 440 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
443 441 source):
444 442 yield chunk
445 443
446 444 yield self.close()
447 445
448 446 if clnodes:
449 447 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
450 448
451 449 # The 'source' parameter is useful for extensions
452 450 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
453 451 repo = self._repo
454 452 progress = self._progress
455 453 msgbundling = _('bundling')
456 454
457 455 total = len(changedfiles)
458 456 # for progress output
459 457 msgfiles = _('files')
460 458 for i, fname in enumerate(sorted(changedfiles)):
461 459 filerevlog = repo.file(fname)
462 460 if not filerevlog:
463 461 raise util.Abort(_("empty or missing revlog for %s") % fname)
464 462
465 463 linkrevnodes = linknodes(filerevlog, fname)
 466 464 # Lookup for filenodes; we collected the linkrev nodes above in the
467 465 # fastpath case and with lookupmf in the slowpath case.
468 466 def lookupfilelog(x):
469 467 return linkrevnodes[x]
470 468
471 469 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
472 470 if filenodes:
473 471 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
474 472 total=total)
475 473 h = self.fileheader(fname)
476 474 size = len(h)
477 475 yield h
478 476 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
479 477 size += len(chunk)
480 478 yield chunk
481 479 self._verbosenote(_('%8.i %s\n') % (size, fname))
482 480 progress(msgbundling, None)
483 481
484 482 def deltaparent(self, revlog, rev, p1, p2, prev):
485 483 return prev
486 484
487 485 def revchunk(self, revlog, rev, prev, linknode):
488 486 node = revlog.node(rev)
489 487 p1, p2 = revlog.parentrevs(rev)
490 488 base = self.deltaparent(revlog, rev, p1, p2, prev)
491 489
492 490 prefix = ''
493 491 if revlog.iscensored(base) or revlog.iscensored(rev):
494 492 try:
495 493 delta = revlog.revision(node)
496 494 except error.CensoredNodeError, e:
497 495 delta = e.tombstone
498 496 if base == nullrev:
499 497 prefix = mdiff.trivialdiffheader(len(delta))
500 498 else:
501 499 baselen = revlog.rawsize(base)
502 500 prefix = mdiff.replacediffheader(baselen, len(delta))
503 501 elif base == nullrev:
504 502 delta = revlog.revision(node)
505 503 prefix = mdiff.trivialdiffheader(len(delta))
506 504 else:
507 505 delta = revlog.revdiff(base, rev)
508 506 p1n, p2n = revlog.parents(node)
509 507 basenode = revlog.node(base)
510 508 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
511 509 meta += prefix
512 510 l = len(meta) + len(delta)
513 511 yield chunkheader(l)
514 512 yield meta
515 513 yield delta
516 514 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
517 515 # do nothing with basenode, it is implicitly the previous one in HG10
518 516 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
519 517
520 518 class cg2packer(cg1packer):
521 519 version = '02'
522 520 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
523 521
524 522 def __init__(self, repo, bundlecaps=None):
525 523 super(cg2packer, self).__init__(repo, bundlecaps)
526 524 if self._reorder is None:
527 525 # Since generaldelta is directly supported by cg2, reordering
528 526 # generally doesn't help, so we disable it by default (treating
529 527 # bundle.reorder=auto just like bundle.reorder=False).
530 528 self._reorder = False
531 529
532 530 def deltaparent(self, revlog, rev, p1, p2, prev):
533 531 dp = revlog.deltaparent(rev)
534 532 # avoid storing full revisions; pick prev in those cases
535 533 # also pick prev when we can't be sure remote has dp
536 534 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
537 535 return prev
538 536 return dp
539 537
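The same decision restated as a tiny illustrative function (assuming nullrev == -1, as in Mercurial's node module):

    def choosebase(dp, p1, p2, prev, nullrev=-1):
        # keep the revlog's own delta parent only if the receiver provably has it
        if dp == nullrev or dp not in (p1, p2, prev):
            return prev
        return dp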
540 538 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
541 539 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
542 540
543 541 packermap = {'01': (cg1packer, cg1unpacker),
544 542 '02': (cg2packer, cg2unpacker)}
545 543
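Dispatch sketch: callers pick the packer/unpacker pair by version string (module scope assumed):

    packer_cls, unpacker_cls = packermap['02']
    assert packer_cls is cg2packer
    assert unpacker_cls is cg2unpacker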
546 544 def _changegroupinfo(repo, nodes, source):
547 545 if repo.ui.verbose or source == 'bundle':
548 546 repo.ui.status(_("%d changesets found\n") % len(nodes))
549 547 if repo.ui.debugflag:
550 548 repo.ui.debug("list of changesets:\n")
551 549 for node in nodes:
552 550 repo.ui.debug("%s\n" % hex(node))
553 551
554 552 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
555 553 repo = repo.unfiltered()
556 554 commonrevs = outgoing.common
557 555 csets = outgoing.missing
558 556 heads = outgoing.missingheads
 559 557 # We go through the fast path if we get told to, or if all (unfiltered)
 560 558 # heads have been requested (since we then know all linkrevs will
561 559 # be pulled by the client).
562 560 heads.sort()
563 561 fastpathlinkrev = fastpath or (
564 562 repo.filtername is None and heads == sorted(repo.heads()))
565 563
566 564 repo.hook('preoutgoing', throw=True, source=source)
567 565 _changegroupinfo(repo, csets, source)
568 566 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
569 567
570 568 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
571 569 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
572 570 return packermap[version][1](util.chunkbuffer(gengroup), 'UN')
573 571
574 572 def changegroupsubset(repo, roots, heads, source, version='01'):
575 573 """Compute a changegroup consisting of all the nodes that are
576 574 descendants of any of the roots and ancestors of any of the heads.
577 575 Return a chunkbuffer object whose read() method will return
578 576 successive changegroup chunks.
579 577
580 578 It is fairly complex as determining which filenodes and which
581 579 manifest nodes need to be included for the changeset to be complete
582 580 is non-trivial.
583 581
584 582 Another wrinkle is doing the reverse, figuring out which changeset in
585 583 the changegroup a particular filenode or manifestnode belongs to.
586 584 """
587 585 cl = repo.changelog
588 586 if not roots:
589 587 roots = [nullid]
590 588 # TODO: remove call to nodesbetween.
591 589 csets, roots, heads = cl.nodesbetween(roots, heads)
592 590 discbases = []
593 591 for n in roots:
594 592 discbases.extend([p for p in cl.parents(n) if p != nullid])
595 593 outgoing = discovery.outgoing(cl, discbases, heads)
596 594 bundler = packermap[version][0](repo)
597 595 return getsubset(repo, outgoing, bundler, source, version=version)
598 596
599 597 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
600 598 version='01'):
601 599 """Like getbundle, but taking a discovery.outgoing as an argument.
602 600
603 601 This is only implemented for local repos and reuses potentially
604 602 precomputed sets in outgoing. Returns a raw changegroup generator."""
605 603 if not outgoing.missing:
606 604 return None
607 605 bundler = packermap[version][0](repo, bundlecaps)
608 606 return getsubsetraw(repo, outgoing, bundler, source)
609 607
610 608 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
611 609 """Like getbundle, but taking a discovery.outgoing as an argument.
612 610
613 611 This is only implemented for local repos and reuses potentially
614 612 precomputed sets in outgoing."""
615 613 if not outgoing.missing:
616 614 return None
617 615 bundler = cg1packer(repo, bundlecaps)
618 616 return getsubset(repo, outgoing, bundler, source)
619 617
620 618 def _computeoutgoing(repo, heads, common):
621 619 """Computes which revs are outgoing given a set of common
622 620 and a set of heads.
623 621
624 622 This is a separate function so extensions can have access to
625 623 the logic.
626 624
627 625 Returns a discovery.outgoing object.
628 626 """
629 627 cl = repo.changelog
630 628 if common:
631 629 hasnode = cl.hasnode
632 630 common = [n for n in common if hasnode(n)]
633 631 else:
634 632 common = [nullid]
635 633 if not heads:
636 634 heads = cl.heads()
637 635 return discovery.outgoing(cl, common, heads)
638 636
639 637 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
640 638 version='01'):
641 639 """Like changegroupsubset, but returns the set difference between the
642 640 ancestors of heads and the ancestors common.
643 641
644 642 If heads is None, use the local heads. If common is None, use [nullid].
645 643
 646 644 If version is not specified, a version '01' changegroup is used.
647 645
648 646 The nodes in common might not all be known locally due to the way the
649 647 current discovery protocol works. Returns a raw changegroup generator.
650 648 """
651 649 outgoing = _computeoutgoing(repo, heads, common)
652 650 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
653 651 version=version)
654 652
655 653 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
656 654 """Like changegroupsubset, but returns the set difference between the
657 655 ancestors of heads and the ancestors common.
658 656
659 657 If heads is None, use the local heads. If common is None, use [nullid].
660 658
661 659 The nodes in common might not all be known locally due to the way the
662 660 current discovery protocol works.
663 661 """
664 662 outgoing = _computeoutgoing(repo, heads, common)
665 663 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
666 664
667 665 def changegroup(repo, basenodes, source):
668 666 # to avoid a race we use changegroupsubset() (issue1320)
669 667 return changegroupsubset(repo, basenodes, repo.heads(), source)
670 668
671 669 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
672 670 revisions = 0
673 671 files = 0
674 672 while True:
675 673 chunkdata = source.filelogheader()
676 674 if not chunkdata:
677 675 break
678 676 f = chunkdata["filename"]
679 677 repo.ui.debug("adding %s revisions\n" % f)
680 678 pr()
681 679 fl = repo.file(f)
682 680 o = len(fl)
683 681 try:
684 682 if not fl.addgroup(source, revmap, trp):
685 683 raise util.Abort(_("received file revlog group is empty"))
686 684 except error.CensoredBaseError, e:
687 685 raise util.Abort(_("received delta base is censored: %s") % e)
688 686 revisions += len(fl) - o
689 687 files += 1
690 688 if f in needfiles:
691 689 needs = needfiles[f]
692 690 for new in xrange(o, len(fl)):
693 691 n = fl.node(new)
694 692 if n in needs:
695 693 needs.remove(n)
696 694 else:
697 695 raise util.Abort(
698 696 _("received spurious file revlog entry"))
699 697 if not needs:
700 698 del needfiles[f]
701 699 repo.ui.progress(_('files'), None)
702 700
703 701 for f, needs in needfiles.iteritems():
704 702 fl = repo.file(f)
705 703 for n in needs:
706 704 try:
707 705 fl.rev(n)
708 706 except error.LookupError:
709 707 raise util.Abort(
710 708 _('missing file data for %s:%s - run hg verify') %
711 709 (f, hex(n)))
712 710
713 711 return revisions, files
714 712
715 713 def addchangegroup(repo, source, srctype, url, emptyok=False,
716 714 targetphase=phases.draft):
717 715 """Add the changegroup returned by source.read() to this repo.
718 716 srctype is a string like 'push', 'pull', or 'unbundle'. url is
719 717 the URL of the repo where this changegroup is coming from.
720 718
721 719 Return an integer summarizing the change to this repo:
722 720 - nothing changed or no source: 0
723 721 - more heads than before: 1+added heads (2..n)
724 722 - fewer heads than before: -1-removed heads (-2..-n)
725 723 - number of heads stays the same: 1
726 724 """
727 725 repo = repo.unfiltered()
728 726 def csmap(x):
729 727 repo.ui.debug("add changeset %s\n" % short(x))
730 728 return len(cl)
731 729
732 730 def revmap(x):
733 731 return cl.rev(x)
734 732
735 733 if not source:
736 734 return 0
737 735
738 736 changesets = files = revisions = 0
739 737 efiles = set()
740 738
741 739 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
742 740 # The transaction could have been created before and already carries source
743 741 # information. In this case we use the top level data. We overwrite the
744 742 # argument because we need to use the top level value (if they exist) in
745 743 # this function.
746 744 srctype = tr.hookargs.setdefault('source', srctype)
747 745 url = tr.hookargs.setdefault('url', url)
748 746
749 747 # write changelog data to temp files so concurrent readers will not see
750 748 # inconsistent view
751 749 cl = repo.changelog
752 750 cl.delayupdate(tr)
753 751 oldheads = cl.heads()
754 752 try:
755 753 repo.hook('prechangegroup', throw=True, **tr.hookargs)
756 754
757 755 trp = weakref.proxy(tr)
758 756 # pull off the changeset group
759 757 repo.ui.status(_("adding changesets\n"))
760 758 clstart = len(cl)
761 759 class prog(object):
762 760 step = _('changesets')
763 761 count = 1
764 762 ui = repo.ui
765 763 total = None
766 764 def __call__(repo):
767 765 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
768 766 total=repo.total)
769 767 repo.count += 1
770 768 pr = prog()
771 769 source.callback = pr
772 770
773 771 source.changelogheader()
774 772 srccontent = cl.addgroup(source, csmap, trp)
775 773 if not (srccontent or emptyok):
776 774 raise util.Abort(_("received changelog group is empty"))
777 775 clend = len(cl)
778 776 changesets = clend - clstart
779 777 for c in xrange(clstart, clend):
780 778 efiles.update(repo[c].files())
781 779 efiles = len(efiles)
782 780 repo.ui.progress(_('changesets'), None)
783 781
784 782 # pull off the manifest group
785 783 repo.ui.status(_("adding manifests\n"))
786 784 pr.step = _('manifests')
787 785 pr.count = 1
788 786 pr.total = changesets # manifests <= changesets
789 787 # no need to check for empty manifest group here:
790 788 # if the result of the merge of 1 and 2 is the same in 3 and 4,
791 789 # no new manifest will be created and the manifest group will
792 790 # be empty during the pull
793 791 source.manifestheader()
794 792 repo.manifest.addgroup(source, revmap, trp)
795 793 repo.ui.progress(_('manifests'), None)
796 794
797 795 needfiles = {}
798 796 if repo.ui.configbool('server', 'validate', default=False):
799 797 # validate incoming csets have their manifests
800 798 for cset in xrange(clstart, clend):
801 799 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
802 800 mfest = repo.manifest.readdelta(mfnode)
803 801 # store file nodes we must see
804 802 for f, n in mfest.iteritems():
805 803 needfiles.setdefault(f, set()).add(n)
806 804
807 805 # process the files
808 806 repo.ui.status(_("adding file changes\n"))
809 807 pr.step = _('files')
810 808 pr.count = 1
811 809 pr.total = efiles
812 810 source.callback = None
813 811
814 812 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
815 813 needfiles)
816 814 revisions += newrevs
817 815 files += newfiles
818 816
819 817 dh = 0
820 818 if oldheads:
821 819 heads = cl.heads()
822 820 dh = len(heads) - len(oldheads)
823 821 for h in heads:
824 822 if h not in oldheads and repo[h].closesbranch():
825 823 dh -= 1
826 824 htext = ""
827 825 if dh:
828 826 htext = _(" (%+d heads)") % dh
829 827
830 828 repo.ui.status(_("added %d changesets"
831 829 " with %d changes to %d files%s\n")
832 830 % (changesets, revisions, files, htext))
833 831 repo.invalidatevolatilesets()
834 832
835 833 if changesets > 0:
836 834 p = lambda: tr.writepending() and repo.root or ""
837 835 if 'node' not in tr.hookargs:
838 836 tr.hookargs['node'] = hex(cl.node(clstart))
839 837 hookargs = dict(tr.hookargs)
840 838 else:
841 839 hookargs = dict(tr.hookargs)
842 840 hookargs['node'] = hex(cl.node(clstart))
843 841 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
844 842
845 843 added = [cl.node(r) for r in xrange(clstart, clend)]
846 844 publishing = repo.ui.configbool('phases', 'publish', True)
847 845 if srctype in ('push', 'serve'):
 848 846 # Old servers cannot push the boundary themselves.
849 847 # New servers won't push the boundary if changeset already
850 848 # exists locally as secret
851 849 #
 852 850 # We should not use added here but the list of all changes in
853 851 # the bundle
854 852 if publishing:
855 853 phases.advanceboundary(repo, tr, phases.public, srccontent)
856 854 else:
857 855 # Those changesets have been pushed from the outside, their
858 856 # phases are going to be pushed alongside. Therefor
859 857 # `targetphase` is ignored.
860 858 phases.advanceboundary(repo, tr, phases.draft, srccontent)
861 859 phases.retractboundary(repo, tr, phases.draft, added)
862 860 elif srctype != 'strip':
 863 861 # publishing only alters behavior during push
864 862 #
865 863 # strip should not touch boundary at all
866 864 phases.retractboundary(repo, tr, targetphase, added)
867 865
868 866 if changesets > 0:
869 867 if srctype != 'strip':
 870 868 # During strip, branchcache is invalid but the coming call to
 871 869 # `destroyed` will repair it.
 872 870 # In other cases we can safely update the cache on disk.
873 871 branchmap.updatecache(repo.filtered('served'))
874 872
875 873 def runhooks():
876 874 # These hooks run when the lock releases, not when the
877 875 # transaction closes. So it's possible for the changelog
878 876 # to have changed since we last saw it.
879 877 if clstart >= len(repo):
880 878 return
881 879
882 880 # forcefully update the on-disk branch cache
883 881 repo.ui.debug("updating the branch cache\n")
884 882 repo.hook("changegroup", **hookargs)
885 883
886 884 for n in added:
887 885 args = hookargs.copy()
888 886 args['node'] = hex(n)
889 887 repo.hook("incoming", **args)
890 888
891 889 newheads = [h for h in repo.heads() if h not in oldheads]
892 890 repo.ui.log("incoming",
893 891 "%s incoming changes - new heads: %s\n",
894 892 len(added),
895 893 ', '.join([hex(c[:6]) for c in newheads]))
896 894
897 895 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
898 896 lambda tr: repo._afterlock(runhooks))
899 897
900 898 tr.close()
901 899
902 900 finally:
903 901 tr.release()
904 902 repo.ui.flush()
905 903 # never return 0 here:
906 904 if dh < 0:
907 905 return dh - 1
908 906 else:
909 907 return dh + 1