##// END OF EJS Templates
changegroup: don't use 'repo' for non-repo 'self'...
Martin von Zweigbergk -
r25573:3e89e67f default
parent child Browse files
Show More
@@ -1,891 +1,890
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
def readexactly(stream, n):
    """Read exactly n bytes from stream.read(); abort on a short read."""
    data = stream.read(n)
    if len(data) < n:
        # the peer hung up (or the file was truncated) mid-stream
        raise util.Abort(_("stream ended unexpectedly"
                           " (got %d bytes, expected %d)")
                         % (len(data), n))
    return data
26 26
def getchunk(stream):
    """Return the next chunk from stream as a string ("" at group end)."""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # the on-wire length counts its own 4 header bytes
        return readexactly(stream, length - 4)
    if length:
        # lengths 1-4 (and negative values) cannot occur in a valid stream
        raise util.Abort(_("invalid chunk length %d") % length)
    return ""
36 36
def chunkheader(length):
    """Return a changegroup chunk header (string) for a payload of 'length'.

    The wire format stores the total chunk size, i.e. payload plus the
    4 bytes of the length field itself.
    """
    return struct.pack(">l", 4 + length)
40 40
def closechunk():
    """Return the header that terminates a chunk group.

    A stored length of zero is below the 4-byte minimum of a real chunk
    and therefore acts as the end-of-group marker on the wire.
    """
    return struct.pack(">l", 0)
44 44
def combineresults(results):
    """Combine 0 or more addchangegroup() return values into one.

    Each input follows the addchangegroup() convention: 0 for "nothing
    changed", 1 for "same number of heads", 2..n for added heads and
    -2..-n for removed heads. The head deltas are summed across results.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # a 0 result short-circuits the scan
        if ret == 0:
            result = 0
            break
        if ret > 1:
            changedheads += ret - 1
        elif ret < -1:
            changedheads += ret + 1
    if changedheads:
        result = (1 + changedheads) if changedheads > 0 else (-1 + changedheads)
    return result
63 63
class nocompress(object):
    """Identity 'compressor' used when writing an uncompressed bundle."""
    def compress(self, data):
        # pass the payload through untouched
        return data
    def flush(self):
        # nothing is ever buffered, so nothing remains to emit
        return ""
69 69
# Maps a bundle type name to a (header, compressor factory) pair. The
# header is written verbatim at the start of the bundle file; the factory
# returns an object exposing the zlib-style compress()/flush() interface.
bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", nocompress),
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
82 82
def writebundle(ui, cg, filename, bundletype, vfs=None):
    """Write a bundle file and return its filename.

    ui         - the ui object (needed to build a bundle2 bundler)
    cg         - a changegroup packer/unpacker providing getchunks()
    filename   - target path, or None to create a temporary file
    bundletype - a key of 'bundletypes', or "HG20" for a bundle2 file
    vfs        - optional vfs used to open/unlink 'filename'

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None  # path to delete on failure; cleared once fully written
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        cleanup = filename

        if bundletype == "HG20":
            # imported here to avoid a cycle at module load time
            import bundle2
            bundle = bundle2.bundle20(ui)
            part = bundle.newpart('changegroup', data=cg.getchunks())
            part.addparam('version', cg.version)
            # bundle2 handles its own framing; no outer compression
            z = nocompress()
            chunkiter = bundle.getchunks()
        else:
            # the legacy bundle formats can only carry cg1 payloads
            if cg.version != '01':
                raise util.Abort(_('old bundle types only supports v1 '
                                   'changegroups'))
            header, compressor = bundletypes[bundletype]
            fh.write(header)
            z = compressor()
            chunkiter = cg.getchunks()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        for chunk in chunkiter:
            fh.write(z.compress(chunk))
        fh.write(z.flush())
        # success: keep the file
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
140 140
def decompressor(fh, alg):
    """Return a file-like object yielding the decompressed content of fh.

    alg is the two-letter compression tag from the bundle header:
    'UN' (none), 'GZ' (zlib) or 'BZ' (bzip2).
    """
    def _gzchunks(f):
        zd = zlib.decompressobj()
        for piece in util.filechunkiter(f):
            yield zd.decompress(piece)

    def _bzchunks(f):
        zd = bz2.BZ2Decompressor()
        # the 'BZ' magic was consumed while sniffing the bundle type;
        # re-feed it so the decompressor sees a well-formed stream
        zd.decompress("BZ")
        for piece in util.filechunkiter(f, 4096):
            yield zd.decompress(piece)

    if alg == 'UN':
        # not compressed - hand back the raw file object
        return fh
    if alg == 'GZ':
        return util.chunkbuffer(_gzchunks(fh))
    if alg == 'BZ':
        return util.chunkbuffer(_bzchunks(fh))
    raise util.Abort("unknown bundle compression '%s'" % alg)
158 158
class cg1unpacker(object):
    """Unpacker for version '01' changegroup streams.

    Wraps a (possibly compressed) file-like object and exposes the
    chunk-level interface consumed by addchangegroup() and
    revlog.addgroup().
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    def __init__(self, fh, alg):
        # alg is the two-letter compression tag from the bundle header
        # ('UN', 'GZ' or 'BZ')
        self._stream = decompressor(fh, alg)
        self._type = alg
        # optional callable invoked once per chunk (progress reporting)
        self.callback = None
    def compressed(self):
        return self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def chunklength(self):
        """Return the payload length of the next chunk, 0 at group end.

        The on-wire length includes its own 4 bytes, so values 1 to 4
        (and negative values) are invalid.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise util.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self.chunklength()
        if not l:
            # empty chunk group: no more filelogs
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Decode a delta header tuple; in v1 the delta base is implicit:
        the first parent, or the previously transmitted node."""
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """Return the next delta as a dict, or {} at the end of the group."""
        l = self.chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}

    def getchunks(self):
        """returns all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data, otherwise it
        would block in case of sshrepo because it doesn't know the end of the
        stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                while pos < len(chunk):
                    # re-emit the payload in at most 1MB slices
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()
251 251
class cg2unpacker(cg1unpacker):
    """Unpacker for version '02' changegroup streams.

    cg2 differs from cg1 only in the delta header: the delta base node is
    stored explicitly instead of being implied by the previous chunk.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # the header already is (node, p1, p2, deltabase, cs); prevnode
        # is unused since the base needs no reconstruction
        return headertuple
260 260
class headerlessfixup(object):
    """File-like wrapper replaying an already-consumed header.

    The bytes 'h' were read from 'fh' while sniffing the stream type;
    reads are served from 'h' first, then fall through to 'fh'.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            # header fully replayed: read straight from the stream
            return readexactly(self._fh, n)
        d = self._h[:n]
        self._h = self._h[n:]
        if len(d) < n:
            # the request straddles the header boundary
            d += readexactly(self._fh, n - len(d))
        return d
272 272
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # bundle.reorder: 'auto' (None) lets group() decide per revlog
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            # size reporting is a no-op unless --verbose (and not --debug)
            self._verbosenote = lambda s: None

    def close(self):
        """Return the chunk terminating the current chunk group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the header chunk announcing filelog 'fname'."""
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        """Return the subset of 'missing' whose linkrev is not common."""
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog
        ml = repo.manifest

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            changedfiles.update(c[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(c[0], x)
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmf(x):
            clnode = mfs[x]
            if not fastpathlinkrev:
                mdata = ml.readfast(x)
                for f, n in mdata.iteritems():
                    if f in changedfiles:
                        # record the first changeset introducing this filelog
                        # version
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
            return clnode

        mfnodes = self.prune(ml, mfs, commonrevs)
        size = 0
        for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (manifests)\n') % size)

        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        def linknodes(filerevlog, fname):
            # map filenode -> linkrev node for the revisions to bundle
            if fastpathlinkrev:
                llr = filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev in clrevs:
                            yield filerevlog.node(r), cl.node(linkrev)
                return dict(genfilenodes())
            return fnodes.get(fname, {})

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the filelog portion of the changegroup."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise util.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previously sent revision
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the header and payload chunks for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # censored revisions are transmitted as full-replacement
            # deltas carrying the tombstone text
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError, e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send the full text as a trivial diff
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
517 517
class cg2packer(cg1packer):
    """Packer producing version '02' changegroups (explicit delta bases)."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        # only reuse the stored delta base when the receiver is guaranteed
        # to have it: one of the parents or the previously sent revision
        if dp != nullrev and dp in (p1, p2, prev):
            return dp
        # full snapshot stored, or base not known remotely: delta vs prev
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # unlike cg1, the delta base node is recorded on the wire
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
540 540
# Maps a changegroup wire-format version to its (packer, unpacker) classes.
packermap = {'01': (cg1packer, cg1unpacker),
             '02': (cg2packer, cg2unpacker)}
543 543
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets are outgoing (verbose/debug aid)."""
    ui = repo.ui
    if not (ui.verbose or source == 'bundle'):
        return
    ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
551 551
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate the raw changegroup chunks for 'outgoing'.

    Fires the 'preoutgoing' hook and returns the generator produced by
    bundler.generate(); no compression or bundle framing is applied.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    # note: sort() mutates outgoing.missingheads in place
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
567 567
def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
    """Generate a changegroup for 'outgoing' and wrap it in an unpacker."""
    raw = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    unpackercls = packermap[version][1]
    # the generated stream is uncompressed, hence the 'UN' tag
    return unpackercls(util.chunkbuffer(raw), 'UN')
571 571
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        # no roots means "everything from the beginning of history"
        roots = [nullid]
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    # the parents of the computed root set form the 'common' base
    discbases = []
    for n in roots:
        discbases.extend([p for p in cl.parents(n) if p != nullid])
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = packermap[version][0](repo)
    return getsubset(repo, outgoing, bundler, source, version=version)
596 596
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    missing = outgoing.missing
    if not missing:
        # nothing to transfer
        return None
    packercls = packermap[version][0]
    return getsubsetraw(repo, outgoing, packercls(repo, bundlecaps), source)
607 607
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing.

    'version' selects the changegroup wire format; it defaults to '01',
    matching the historical behavior of always using cg1packer.
    """
    if not outgoing.missing:
        # nothing to transfer
        return None
    # look the packer up via packermap for consistency with
    # getlocalchangegroupraw, instead of hard-coding cg1packer
    bundler = packermap[version][0](repo, bundlecaps)
    return getsubset(repo, outgoing, bundler, source, version=version)
617 617
def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        # no common base known: everything is potentially outgoing
        common = [nullid]
    else:
        # drop nodes the local repo does not actually have
        common = [node for node in common if cl.hasnode(node)]
    # default to the local heads when none were requested
    return discovery.outgoing(cl, common, heads or cl.heads())
636 636
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    return getlocalchangegroup(repo, source,
                               computeoutgoing(repo, heads, common),
                               bundlecaps=bundlecaps)
648 648
def changegroup(repo, basenodes, source):
    """Build a changegroup from basenodes up to the current repo heads.

    changegroupsubset() is used rather than computing the node set here
    to avoid a race (issue1320).
    """
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
652 652
def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the filelog portion of a changegroup stream.

    source    - changegroup unpacker positioned at the first filelog chunk
    revmap    - maps a linkrev node to its local revision number
    trp       - weakref proxy of the active transaction
    pr        - progress callback invoked once per chunk
    needfiles - dict of filename -> set of expected file nodes
                (server-side validation); drained as nodes are seen

    Returns (revisions, files): the number of file revisions added and
    the number of filelogs touched.
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            # empty chunk group: no more filelogs in the stream
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise util.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError, e:
            raise util.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            # check off the nodes this group was expected to deliver
            needs = needfiles[f]
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise util.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # anything left in needfiles was never transmitted
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise util.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
696 696
def addchangegroup(repo, source, srctype, url, emptyok=False,
                   targetphase=phases.draft, expectedtotal=None):
    """Add the changegroup returned by source.read() to this repo.
    srctype is a string like 'push', 'pull', or 'unbundle'. url is
    the URL of the repo where this changegroup is coming from.

    emptyok        - tolerate an empty changelog group
    targetphase    - phase to retract to for non-push/serve sources
    expectedtotal  - expected changeset count, for progress output

    Return an integer summarizing the change to this repo:
    - nothing changed or no source: 0
    - more heads than before: 1+added heads (2..n)
    - fewer heads than before: -1-removed heads (-2..-n)
    - number of heads stays the same: 1
    """
    repo = repo.unfiltered()
    def csmap(x):
        repo.ui.debug("add changeset %s\n" % short(x))
        return len(cl)

    def revmap(x):
        return cl.rev(x)

    if not source:
        return 0

    changesets = files = revisions = 0
    efiles = set()

    tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
    # The transaction could have been created before and already carries source
    # information. In this case we use the top level data. We overwrite the
    # argument because we need to use the top level value (if they exist) in
    # this function.
    srctype = tr.hookargs.setdefault('source', srctype)
    url = tr.hookargs.setdefault('url', url)

    # write changelog data to temp files so concurrent readers will not see
    # inconsistent view
    cl = repo.changelog
    cl.delayupdate(tr)
    oldheads = cl.heads()
    try:
        repo.hook('prechangegroup', throw=True, **tr.hookargs)

        trp = weakref.proxy(tr)
        # pull off the changeset group
        repo.ui.status(_("adding changesets\n"))
        clstart = len(cl)
        # per-chunk progress callback; 'repo' is reached via the closure
        # (note: the scraped diff carried both the pre- and post-r25573
        # versions of this class; only the post-commit one is kept)
        class prog(object):
            step = _('changesets')
            count = 1
            total = expectedtotal
            def __call__(self):
                repo.ui.progress(self.step, self.count, unit=_('chunks'),
                                 total=self.total)
                self.count += 1
        pr = prog()
        source.callback = pr

        source.changelogheader()
        srccontent = cl.addgroup(source, csmap, trp)
        if not (srccontent or emptyok):
            raise util.Abort(_("received changelog group is empty"))
        clend = len(cl)
        changesets = clend - clstart
        for c in xrange(clstart, clend):
            efiles.update(repo[c].files())
        # only the count of expected files is needed from here on
        efiles = len(efiles)
        repo.ui.progress(_('changesets'), None)

        # pull off the manifest group
        repo.ui.status(_("adding manifests\n"))
        pr.step = _('manifests')
        pr.count = 1
        pr.total = changesets # manifests <= changesets
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        source.manifestheader()
        repo.manifest.addgroup(source, revmap, trp)
        repo.ui.progress(_('manifests'), None)

        needfiles = {}
        if repo.ui.configbool('server', 'validate', default=False):
            # validate incoming csets have their manifests
            for cset in xrange(clstart, clend):
                mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
                mfest = repo.manifest.readdelta(mfnode)
                # store file nodes we must see
                for f, n in mfest.iteritems():
                    needfiles.setdefault(f, set()).add(n)

        # process the files
        repo.ui.status(_("adding file changes\n"))
        pr.step = _('files')
        pr.count = 1
        pr.total = efiles
        # addchangegroupfiles drives pr() itself
        source.callback = None

        newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
                                                needfiles)
        revisions += newrevs
        files += newfiles

        # compute the head-count delta for the return value / status line
        dh = 0
        if oldheads:
            heads = cl.heads()
            dh = len(heads) - len(oldheads)
            for h in heads:
                if h not in oldheads and repo[h].closesbranch():
                    dh -= 1
        htext = ""
        if dh:
            htext = _(" (%+d heads)") % dh

        repo.ui.status(_("added %d changesets"
                         " with %d changes to %d files%s\n")
                       % (changesets, revisions, files, htext))
        repo.invalidatevolatilesets()

        if changesets > 0:
            # make pending data visible to pretxnchangegroup hooks
            p = lambda: tr.writepending() and repo.root or ""
            if 'node' not in tr.hookargs:
                tr.hookargs['node'] = hex(cl.node(clstart))
                hookargs = dict(tr.hookargs)
            else:
                hookargs = dict(tr.hookargs)
                hookargs['node'] = hex(cl.node(clstart))
            repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)

        added = [cl.node(r) for r in xrange(clstart, clend)]
        publishing = repo.ui.configbool('phases', 'publish', True)
        if srctype in ('push', 'serve'):
            # Old servers can not push the boundary themselves.
            # New servers won't push the boundary if changeset already
            # exists locally as secret
            #
            # We should not use added here but the list of all change in
            # the bundle
            if publishing:
                phases.advanceboundary(repo, tr, phases.public, srccontent)
            else:
                # Those changesets have been pushed from the outside, their
                # phases are going to be pushed alongside. Therefor
                # `targetphase` is ignored.
                phases.advanceboundary(repo, tr, phases.draft, srccontent)
                phases.retractboundary(repo, tr, phases.draft, added)
        elif srctype != 'strip':
            # publishing only alter behavior during push
            #
            # strip should not touch boundary at all
            phases.retractboundary(repo, tr, targetphase, added)

        if changesets > 0:
            if srctype != 'strip':
                # During strip, branchcache is invalid but coming call to
                # `destroyed` will repair it.
                # In other case we can safely update cache on disk.
                branchmap.updatecache(repo.filtered('served'))

            def runhooks():
                # These hooks run when the lock releases, not when the
                # transaction closes. So it's possible for the changelog
                # to have changed since we last saw it.
                if clstart >= len(repo):
                    return

                # forcefully update the on-disk branch cache
                repo.ui.debug("updating the branch cache\n")
                repo.hook("changegroup", **hookargs)

                for n in added:
                    args = hookargs.copy()
                    args['node'] = hex(n)
                    repo.hook("incoming", **args)

                newheads = [h for h in repo.heads() if h not in oldheads]
                repo.ui.log("incoming",
                            "%s incoming changes - new heads: %s\n",
                            len(added),
                            ', '.join([hex(c[:6]) for c in newheads]))

            tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                            lambda tr: repo._afterlock(runhooks))

        tr.close()

    finally:
        tr.release()
        repo.ui.flush()
    # never return 0 here:
    if dh < 0:
        return dh - 1
    else:
        return dh + 1
General Comments 0
You need to be logged in to leave comments. Login now