##// END OF EJS Templates
changegroup: rename 'mf' to 'ml' to match 'cl', since it's a revlog...
Martin von Zweigbergk -
r24899:cf1ea756 default
parent child Browse files
Show More
@@ -1,900 +1,900 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
# struct formats for the fixed-size delta chunk headers:
# v1: node, p1, p2, linknode (delta base is implicit: previous node or p1)
# v2: node, p1, p2, deltabase, linknode (delta base is explicit)
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # a short read means the peer hung up or the bundle is truncated
    raise util.Abort(_("stream ended unexpectedly"
                       " (got %d bytes, expected %d)")
                     % (len(data), n))
26 26
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # each chunk is prefixed by a big-endian signed 32-bit total length
    lengthdata = readexactly(stream, 4)
    length = struct.unpack(">l", lengthdata)[0]
    if length > 4:
        # the stored length includes the 4-byte prefix itself
        return readexactly(stream, length - 4)
    if length:
        raise util.Abort(_("invalid chunk length %d") % length)
    # a zero length marks the end of a chunk group
    return ""
36 36
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # the on-wire length counts the 4-byte header as well as the payload
    return struct.pack(">l", 4 + length)
40 40
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero-length chunk terminates a chunk group
    return struct.pack(">l", 0)
44 44
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one"""
    # net change in head count accumulated across all results
    headdelta = 0
    outcome = 1
    for ret in results:
        # a zero result means failure; stop accumulating
        if ret == 0:
            outcome = 0
            break
        if ret < -1:
            headdelta += ret + 1
        elif ret > 1:
            headdelta += ret - 1
    # NOTE: a nonzero head delta from earlier results takes precedence
    # over a later failure (same as the historical behavior)
    if headdelta > 0:
        outcome = 1 + headdelta
    elif headdelta < 0:
        outcome = -1 + headdelta
    return outcome
63 63
class nocompress(object):
    """Pass-through object with the compressobj interface, used when the
    bundle payload should be written uncompressed."""

    def compress(self, x):
        # identity: hand the data back untouched
        return x

    def flush(self):
        # nothing is ever buffered, so there is nothing to flush
        return ""
69 69
# map of bundle type name -> (on-disk header magic, compressor factory)
bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", nocompress),
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
82 82
def writebundle(ui, cg, filename, bundletype, vfs=None):
    """Write a bundle file and return its filename.

    cg is a changegroup packer/unpacker providing getchunks().
    bundletype is a key of `bundletypes` (or "HG20" for bundle2 output).
    vfs, if given, is used for opening/unlinking instead of the os layer.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # from here on, an error must remove the partially written file
        cleanup = filename

        if bundletype == "HG20":
            # HG20 output is a bundle2 container; the payload is written
            # uncompressed (compression is bundle2's concern)
            import bundle2
            bundle = bundle2.bundle20(ui)
            part = bundle.newpart('changegroup', data=cg.getchunks())
            part.addparam('version', cg.version)
            z = nocompress()
            chunkiter = bundle.getchunks()
        else:
            if cg.version != '01':
                raise util.Abort(_('old bundle types only supports v1 '
                                   'changegroups'))
            header, compressor = bundletypes[bundletype]
            fh.write(header)
            z = compressor()
            chunkiter = cg.getchunks()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        for chunk in chunkiter:
            fh.write(z.compress(chunk))
        fh.write(z.flush())
        # success: disarm the cleanup so the file survives
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
140 140
def decompressor(fh, alg):
    """Wrap file-like fh with a decompressor for the two-letter tag alg.

    'UN' returns fh unchanged; 'GZ' and 'BZ' return a util.chunkbuffer
    that decompresses lazily. Aborts on an unknown tag.
    """
    if alg == 'UN':
        return fh
    elif alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            for chunk in util.filechunkiter(f):
                yield zd.decompress(chunk)
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            # the stream is stored without the 'BZ' magic; feed it back
            # to the decompressor before the real data
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort("unknown bundle compression '%s'" % alg)
    return util.chunkbuffer(generator(fh))
158 158
class cg1unpacker(object):
    # Unpacker for version '01' changegroup streams. Wraps a (possibly
    # compressed) file-like object and exposes the chunked structure:
    # a changelog group, a manifest group, then one group per file,
    # each a sequence of length-prefixed delta chunks.
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    def __init__(self, fh, alg):
        # alg is the two-letter compression tag ('UN', 'GZ' or 'BZ')
        self._stream = decompressor(fh, alg)
        self._type = alg
        # optional per-chunk callback, used for progress reporting
        self.callback = None
    def compressed(self):
        return self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def chunklength(self):
        """Read a 4-byte length prefix and return the payload size.

        Returns 0 for the empty chunk that terminates a group; aborts
        on a corrupt (negative or undersized) length.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise util.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        # the stored length includes the 4-byte prefix itself
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self.chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # v1 has no explicit delta base: it is implicitly the previous
        # node in the stream, or p1 for the first delta of a group
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """Read one delta chunk; return {} at the end of a group."""
        l = self.chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    break
                empty = False
                # re-emit the chunk with its header, sliced into 1MB pieces
                yield chunkheader(len(chunk))
                pos = 0
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()
251 251
class cg2unpacker(cg1unpacker):
    # Unpacker for version '02' changegroups: delta headers carry an
    # explicit delta base node, so no implicit-base fixup is needed.
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # prevnode is ignored: v2 states the base explicitly
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs
260 260
class headerlessfixup(object):
    """Re-attach already-consumed leading bytes to a stream.

    Reads are served from the saved header bytes first, then from the
    underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        if not self._h:
            # header fully consumed: plain pass-through
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # header ran out mid-read; top up from the stream
            d += readexactly(self._fh, n - len(d))
        return d
272 272
273 273 class cg1packer(object):
274 274 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
275 275 version = '01'
276 276 def __init__(self, repo, bundlecaps=None):
277 277 """Given a source repo, construct a bundler.
278 278
279 279 bundlecaps is optional and can be used to specify the set of
280 280 capabilities which can be used to build the bundle.
281 281 """
282 282 # Set of capabilities we can use to build the bundle.
283 283 if bundlecaps is None:
284 284 bundlecaps = set()
285 285 self._bundlecaps = bundlecaps
286 286 self._changelog = repo.changelog
287 287 self._manifest = repo.manifest
288 288 reorder = repo.ui.config('bundle', 'reorder', 'auto')
289 289 if reorder == 'auto':
290 290 reorder = None
291 291 else:
292 292 reorder = util.parsebool(reorder)
293 293 self._repo = repo
294 294 self._reorder = reorder
295 295 self._progress = repo.ui.progress
296 296 if self._repo.ui.verbose and not self._repo.ui.debugflag:
297 297 self._verbosenote = self._repo.ui.note
298 298 else:
299 299 self._verbosenote = lambda s: None
300 300
301 301 def close(self):
302 302 return closechunk()
303 303
304 304 def fileheader(self, fname):
305 305 return chunkheader(len(fname)) + fname
306 306
307 307 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
308 308 """Calculate a delta group, yielding a sequence of changegroup chunks
309 309 (strings).
310 310
311 311 Given a list of changeset revs, return a set of deltas and
312 312 metadata corresponding to nodes. The first delta is
313 313 first parent(nodelist[0]) -> nodelist[0], the receiver is
314 314 guaranteed to have this parent as it has all history before
315 315 these changesets. In the case firstparent is nullrev the
316 316 changegroup starts with a full revision.
317 317
318 318 If units is not None, progress detail will be generated, units specifies
319 319 the type of revlog that is touched (changelog, manifest, etc.).
320 320 """
321 321 # if we don't have any revisions touched by these changesets, bail
322 322 if len(nodelist) == 0:
323 323 yield self.close()
324 324 return
325 325
326 326 # for generaldelta revlogs, we linearize the revs; this will both be
327 327 # much quicker and generate a much smaller bundle
328 328 if (revlog._generaldelta and reorder is not False) or reorder:
329 329 dag = dagutil.revlogdag(revlog)
330 330 revs = set(revlog.rev(n) for n in nodelist)
331 331 revs = dag.linearize(revs)
332 332 else:
333 333 revs = sorted([revlog.rev(n) for n in nodelist])
334 334
335 335 # add the parent of the first rev
336 336 p = revlog.parentrevs(revs[0])[0]
337 337 revs.insert(0, p)
338 338
339 339 # build deltas
340 340 total = len(revs) - 1
341 341 msgbundling = _('bundling')
342 342 for r in xrange(len(revs) - 1):
343 343 if units is not None:
344 344 self._progress(msgbundling, r + 1, unit=units, total=total)
345 345 prev, curr = revs[r], revs[r + 1]
346 346 linknode = lookup(revlog.node(curr))
347 347 for c in self.revchunk(revlog, curr, prev, linknode):
348 348 yield c
349 349
350 350 yield self.close()
351 351
352 352 # filter any nodes that claim to be part of the known set
353 353 def prune(self, revlog, missing, commonrevs):
354 354 rr, rl = revlog.rev, revlog.linkrev
355 355 return [n for n in missing if rl(rr(n)) not in commonrevs]
356 356
357 357 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
358 358 '''yield a sequence of changegroup chunks (strings)'''
359 359 repo = self._repo
360 360 cl = self._changelog
361 mf = self._manifest
361 ml = self._manifest
362 362 reorder = self._reorder
363 363 progress = self._progress
364 364
365 365 # for progress output
366 366 msgbundling = _('bundling')
367 367
368 368 clrevorder = {}
369 369 mfs = {} # needed manifests
370 370 fnodes = {} # needed file nodes
371 371 changedfiles = set()
372 372
373 373 # Callback for the changelog, used to collect changed files and manifest
374 374 # nodes.
375 375 # Returns the linkrev node (identity in the changelog case).
376 376 def lookupcl(x):
377 377 c = cl.read(x)
378 378 clrevorder[x] = len(clrevorder)
379 379 changedfiles.update(c[3])
380 380 # record the first changeset introducing this manifest version
381 381 mfs.setdefault(c[0], x)
382 382 return x
383 383
384 384 self._verbosenote(_('uncompressed size of bundle content:\n'))
385 385 size = 0
386 386 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
387 387 reorder=reorder):
388 388 size += len(chunk)
389 389 yield chunk
390 390 self._verbosenote(_('%8.i (changelog)\n') % size)
391 391 progress(msgbundling, None)
392 392
393 393 # Callback for the manifest, used to collect linkrevs for filelog
394 394 # revisions.
395 395 # Returns the linkrev node (collected in lookupcl).
396 396 def lookupmf(x):
397 397 clnode = mfs[x]
398 398 if not fastpathlinkrev or reorder:
399 mdata = mf.readfast(x)
399 mdata = ml.readfast(x)
400 400 for f, n in mdata.iteritems():
401 401 if f in changedfiles:
402 402 # record the first changeset introducing this filelog
403 403 # version
404 404 fclnodes = fnodes.setdefault(f, {})
405 405 fclnode = fclnodes.setdefault(n, clnode)
406 406 if clrevorder[clnode] < clrevorder[fclnode]:
407 407 fclnodes[n] = clnode
408 408 return clnode
409 409
410 mfnodes = self.prune(mf, mfs, commonrevs)
410 mfnodes = self.prune(ml, mfs, commonrevs)
411 411 size = 0
412 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
412 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests'),
413 413 reorder=reorder):
414 414 size += len(chunk)
415 415 yield chunk
416 416 self._verbosenote(_('%8.i (manifests)\n') % size)
417 417 progress(msgbundling, None)
418 418
419 419 mfs.clear()
420 420 clrevs = set(cl.rev(x) for x in clnodes)
421 421
422 422 def linknodes(filerevlog, fname):
423 423 if fastpathlinkrev and not reorder:
424 424 llr = filerevlog.linkrev
425 425 def genfilenodes():
426 426 for r in filerevlog:
427 427 linkrev = llr(r)
428 428 if linkrev in clrevs:
429 429 yield filerevlog.node(r), cl.node(linkrev)
430 430 return dict(genfilenodes())
431 431 return fnodes.get(fname, {})
432 432
433 433 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
434 434 source):
435 435 yield chunk
436 436
437 437 yield self.close()
438 438 progress(msgbundling, None)
439 439
440 440 if clnodes:
441 441 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
442 442
443 443 # The 'source' parameter is useful for extensions
444 444 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
445 445 repo = self._repo
446 446 progress = self._progress
447 447 reorder = self._reorder
448 448 msgbundling = _('bundling')
449 449
450 450 total = len(changedfiles)
451 451 # for progress output
452 452 msgfiles = _('files')
453 453 for i, fname in enumerate(sorted(changedfiles)):
454 454 filerevlog = repo.file(fname)
455 455 if not filerevlog:
456 456 raise util.Abort(_("empty or missing revlog for %s") % fname)
457 457
458 458 linkrevnodes = linknodes(filerevlog, fname)
459 459 # Lookup for filenodes, we collected the linkrev nodes above in the
460 460 # fastpath case and with lookupmf in the slowpath case.
461 461 def lookupfilelog(x):
462 462 return linkrevnodes[x]
463 463
464 464 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
465 465 if filenodes:
466 466 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
467 467 total=total)
468 468 h = self.fileheader(fname)
469 469 size = len(h)
470 470 yield h
471 471 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
472 472 reorder=reorder):
473 473 size += len(chunk)
474 474 yield chunk
475 475 self._verbosenote(_('%8.i %s\n') % (size, fname))
476 476
477 477 def deltaparent(self, revlog, rev, p1, p2, prev):
478 478 return prev
479 479
480 480 def revchunk(self, revlog, rev, prev, linknode):
481 481 node = revlog.node(rev)
482 482 p1, p2 = revlog.parentrevs(rev)
483 483 base = self.deltaparent(revlog, rev, p1, p2, prev)
484 484
485 485 prefix = ''
486 486 if revlog.iscensored(base) or revlog.iscensored(rev):
487 487 try:
488 488 delta = revlog.revision(node)
489 489 except error.CensoredNodeError, e:
490 490 delta = e.tombstone
491 491 if base == nullrev:
492 492 prefix = mdiff.trivialdiffheader(len(delta))
493 493 else:
494 494 baselen = revlog.rawsize(base)
495 495 prefix = mdiff.replacediffheader(baselen, len(delta))
496 496 elif base == nullrev:
497 497 delta = revlog.revision(node)
498 498 prefix = mdiff.trivialdiffheader(len(delta))
499 499 else:
500 500 delta = revlog.revdiff(base, rev)
501 501 p1n, p2n = revlog.parents(node)
502 502 basenode = revlog.node(base)
503 503 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
504 504 meta += prefix
505 505 l = len(meta) + len(delta)
506 506 yield chunkheader(l)
507 507 yield meta
508 508 yield delta
509 509 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
510 510 # do nothing with basenode, it is implicitly the previous one in HG10
511 511 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
512 512
class cg2packer(cg1packer):
    # Packer producing version '02' changegroups: explicit delta bases
    # allow reusing stored deltas instead of recomputing against prev.
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def group(self, nodelist, revlog, lookup, units=None, reorder=None):
        # reordering is not needed with generaldelta since deltas can
        # point at any base; forcing it off keeps bundles smaller
        if (revlog._generaldelta and reorder is not True):
            reorder = False
        return super(cg2packer, self).group(nodelist, revlog, lookup,
                                            units=units, reorder=reorder)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        # avoid storing full revisions; pick prev in those cases
        # also pick prev when we can't be sure remote has dp
        if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
            return prev
        return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # unlike v1, the delta base is sent explicitly
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
533 533
# changegroup version -> (packer class, unpacker class)
packermap = {'01': (cg1packer, cg1unpacker),
             '02': (cg2packer, cg2unpacker)}
536 536
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets are being bundled (and which ones,
    when debugging is enabled)."""
    if source == 'bundle' or repo.ui.verbose:
        repo.ui.status(_("%d changesets found\n") % len(nodes))
        if repo.ui.debugflag:
            repo.ui.debug("list of changesets:\n")
            for node in nodes:
                repo.ui.debug("%s\n" % hex(node))
544 544
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate raw changegroup chunks for *outgoing* using *bundler*.

    Runs the 'preoutgoing' hook (which may abort) and returns the
    bundler's chunk generator.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered
    # heads have been requested (since we then know there all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
560 560
def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
    """Like getsubsetraw, but wrap the chunks in an unpacker of the
    requested changegroup version."""
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    # 'UN': the buffered chunk stream is not compressed
    return packermap[version][1](util.chunkbuffer(gengroup), 'UN')
564 564
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        roots = [nullid]
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    # discovery bases are the parents of the roots that lie outside the set
    discbases = []
    for n in roots:
        discbases.extend([p for p in cl.parents(n) if p != nullid])
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = packermap[version][0](repo)
    return getsubset(repo, outgoing, bundler, source, version=version)
589 589
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    # nothing outgoing -> no bundle at all
    if not outgoing.missing:
        return None
    bundler = packermap[version][0](repo, bundlecaps)
    return getsubsetraw(repo, outgoing, bundler, source)
600 600
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing.

    version selects the changegroup format (defaults to '01', matching
    the historical behavior of always using cg1packer)."""
    # nothing outgoing -> no bundle at all
    if not outgoing.missing:
        return None
    # use packermap, consistent with getlocalchangegroupraw, instead of
    # hardcoding cg1packer
    bundler = packermap[version][0](repo, bundlecaps)
    return getsubset(repo, outgoing, bundler, source, version=version)
610 610
def _computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if common:
        # drop common nodes we do not actually know about
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    else:
        common = [nullid]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(cl, common, heads)
629 629
def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
                      version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    If version is None, use a version '1' changegroup.

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works. Returns a raw changegroup generator.
    """
    outgoing = _computeoutgoing(repo, heads, common)
    return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
                                  version=version)
645 645
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    outgoing = _computeoutgoing(repo, heads, common)
    return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
657 657
def changegroup(repo, basenodes, source):
    """Return a changegroup between basenodes and the current repo heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)
661 661
def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the filelog groups from *source* to the repo.

    revmap maps linkrev nodes to local revision numbers, trp is a
    transaction proxy and pr a progress callback. needfiles maps
    filename -> set of nodes that must arrive (server-side validation);
    entries are checked off as revisions come in, and anything left
    over afterwards aborts. Returns (revisions added, files touched).
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        # an empty header ends the file groups
        if not chunkdata:
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise util.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError, e:
            raise util.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            needs = needfiles[f]
            # tick off each expected node; anything unexpected is an error
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise util.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # anything still listed in needfiles never arrived: the repo would be
    # incomplete, so abort
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise util.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
705 705
def addchangegroup(repo, source, srctype, url, emptyok=False,
                   targetphase=phases.draft):
    """Add the changegroup returned by source.read() to this repo.
    srctype is a string like 'push', 'pull', or 'unbundle'. url is
    the URL of the repo where this changegroup is coming from.

    Return an integer summarizing the change to this repo:
    - nothing changed or no source: 0
    - more heads than before: 1+added heads (2..n)
    - fewer heads than before: -1-removed heads (-2..-n)
    - number of heads stays the same: 1
    """
    repo = repo.unfiltered()
    # map an incoming node to the local rev it will receive (cl is bound
    # below, before these callbacks are ever invoked)
    def csmap(x):
        repo.ui.debug("add changeset %s\n" % short(x))
        return len(cl)

    def revmap(x):
        return cl.rev(x)

    if not source:
        return 0

    changesets = files = revisions = 0
    efiles = set()  # files touched by the incoming changesets

    tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
    # The transaction could have been created before and already carries source
    # information. In this case we use the top level data. We overwrite the
    # argument because we need to use the top level value (if they exist) in
    # this function.
    srctype = tr.hookargs.setdefault('source', srctype)
    url = tr.hookargs.setdefault('url', url)

    # write changelog data to temp files so concurrent readers will not see
    # inconsistent view
    cl = repo.changelog
    cl.delayupdate(tr)
    oldheads = cl.heads()
    try:
        repo.hook('prechangegroup', throw=True, **tr.hookargs)

        trp = weakref.proxy(tr)
        # pull off the changeset group
        repo.ui.status(_("adding changesets\n"))
        clstart = len(cl)
        # small progress-reporting callable handed to the unpacker
        # NOTE: __call__'s first parameter is named 'repo' but is the
        # instance itself ('self'); kept as-is for fidelity
        class prog(object):
            step = _('changesets')
            count = 1
            ui = repo.ui
            total = None
            def __call__(repo):
                repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
                                 total=repo.total)
                repo.count += 1
        pr = prog()
        source.callback = pr

        source.changelogheader()
        srccontent = cl.addgroup(source, csmap, trp)
        if not (srccontent or emptyok):
            raise util.Abort(_("received changelog group is empty"))
        clend = len(cl)
        changesets = clend - clstart
        for c in xrange(clstart, clend):
            efiles.update(repo[c].files())
        # only the count is needed from here on
        efiles = len(efiles)
        repo.ui.progress(_('changesets'), None)

        # pull off the manifest group
        repo.ui.status(_("adding manifests\n"))
        pr.step = _('manifests')
        pr.count = 1
        pr.total = changesets # manifests <= changesets
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        source.manifestheader()
        repo.manifest.addgroup(source, revmap, trp)
        repo.ui.progress(_('manifests'), None)

        needfiles = {}
        if repo.ui.configbool('server', 'validate', default=False):
            # validate incoming csets have their manifests
            for cset in xrange(clstart, clend):
                mfest = repo.changelog.read(repo.changelog.node(cset))[0]
                mfest = repo.manifest.readdelta(mfest)
                # store file nodes we must see
                for f, n in mfest.iteritems():
                    needfiles.setdefault(f, set()).add(n)

        # process the files
        repo.ui.status(_("adding file changes\n"))
        pr.step = _('files')
        pr.count = 1
        pr.total = efiles
        source.callback = None

        newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
                                                needfiles)
        revisions += newrevs
        files += newfiles

        # compute the head-count delta, ignoring heads that close a branch
        dh = 0
        if oldheads:
            heads = cl.heads()
            dh = len(heads) - len(oldheads)
            for h in heads:
                if h not in oldheads and repo[h].closesbranch():
                    dh -= 1
        htext = ""
        if dh:
            htext = _(" (%+d heads)") % dh

        repo.ui.status(_("added %d changesets"
                         " with %d changes to %d files%s\n")
                       % (changesets, revisions, files, htext))
        repo.invalidatevolatilesets()

        if changesets > 0:
            p = lambda: tr.writepending() and repo.root or ""
            if 'node' not in tr.hookargs:
                tr.hookargs['node'] = hex(cl.node(clstart))
                hookargs = dict(tr.hookargs)
            else:
                # do not clobber a node recorded by an outer transaction;
                # only our local copy gets this changegroup's start node
                hookargs = dict(tr.hookargs)
                hookargs['node'] = hex(cl.node(clstart))
            repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)

        added = [cl.node(r) for r in xrange(clstart, clend)]
        publishing = repo.ui.configbool('phases', 'publish', True)
        if srctype in ('push', 'serve'):
            # Old servers can not push the boundary themselves.
            # New servers won't push the boundary if changeset already
            # exists locally as secret
            #
            # We should not use added here but the list of all change in
            # the bundle
            if publishing:
                phases.advanceboundary(repo, tr, phases.public, srccontent)
            else:
                # Those changesets have been pushed from the outside, their
                # phases are going to be pushed alongside. Therefor
                # `targetphase` is ignored.
                phases.advanceboundary(repo, tr, phases.draft, srccontent)
                phases.retractboundary(repo, tr, phases.draft, added)
        elif srctype != 'strip':
            # publishing only alter behavior during push
            #
            # strip should not touch boundary at all
            phases.retractboundary(repo, tr, targetphase, added)

        if changesets > 0:
            if srctype != 'strip':
                # During strip, branchcache is invalid but coming call to
                # `destroyed` will repair it.
                # In other case we can safely update cache on disk.
                branchmap.updatecache(repo.filtered('served'))

            def runhooks():
                # These hooks run when the lock releases, not when the
                # transaction closes. So it's possible for the changelog
                # to have changed since we last saw it.
                if clstart >= len(repo):
                    return

                # forcefully update the on-disk branch cache
                repo.ui.debug("updating the branch cache\n")
                repo.hook("changegroup", **hookargs)

                for n in added:
                    args = hookargs.copy()
                    args['node'] = hex(n)
                    repo.hook("incoming", **args)

                newheads = [h for h in repo.heads() if h not in oldheads]
                repo.ui.log("incoming",
                            "%s incoming changes - new heads: %s\n",
                            len(added),
                            ', '.join([hex(c[:6]) for c in newheads]))

            tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                            lambda tr: repo._afterlock(runhooks))

        tr.close()

    finally:
        tr.release()
        repo.ui.flush()
    # never return 0 here:
    if dh < 0:
        return dh - 1
    else:
        return dh + 1
General Comments 0
You need to be logged in to leave comments. Login now