changegroup: add version argument to getchangegroup...
Pierre-Yves David
r26509:83d82fbe default
@@ -1,900 +1,902 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise util.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise util.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
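# Chunk framing, illustrated: each chunk is a 4-byte big-endian length that
# counts itself, followed by the payload, so chunkheader(5) == struct.pack(">l", 9).
# closechunk() emits the zero-length terminator, which getchunk() above
# reports as the empty string.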
62 62
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
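# Worked example of the encoding combineresults consumes (see the
# addchangegroup docstring below): combineresults([3, 2]) == 4, since
# 3 encodes "+2 heads" and 2 encodes "+1 head", giving 1 + 3 changed heads;
# a single 0 result collapses the whole combination to 0.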
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
96 96 """Write a bundle file and return its filename.
97 97
98 98 Existing files will not be overwritten.
99 99 If no filename is specified, a temporary file is created.
100 100 bz2 compression can be turned off.
101 101 The bundle file will be deleted in case of errors.
102 102 """
103 103
104 104 fh = None
105 105 cleanup = None
106 106 try:
107 107 if filename:
108 108 if vfs:
109 109 fh = vfs.open(filename, "wb")
110 110 else:
111 111 fh = open(filename, "wb")
112 112 else:
113 113 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
114 114 fh = os.fdopen(fd, "wb")
115 115 cleanup = filename
116 116
117 117 if bundletype == "HG20":
118 118 from . import bundle2
119 119 bundle = bundle2.bundle20(ui)
120 120 bundle.setcompression(compression)
121 121 part = bundle.newpart('changegroup', data=cg.getchunks())
122 122 part.addparam('version', cg.version)
123 123 z = util.compressors[None]()
124 124 chunkiter = bundle.getchunks()
125 125 else:
126 126 # compression argument is only for the bundle2 case
127 127 assert compression is None
128 128 if cg.version != '01':
129 129 raise util.Abort(_('old bundle types only support v1 '
130 130 'changegroups'))
131 131 header, comp = bundletypes[bundletype]
132 132 fh.write(header)
133 133 if comp not in util.compressors:
134 134 raise util.Abort(_('unknown stream compression type: %s')
135 135 % comp)
136 136 z = util.compressors[comp]()
137 137 chunkiter = cg.getchunks()
138 138
139 139 # parse the changegroup data, otherwise we will block
140 140 # in case of sshrepo because we don't know the end of the stream
141 141
142 142 # an empty chunkgroup is the end of the changegroup
143 143 # a changegroup has at least 2 chunkgroups (changelog and manifest).
144 144 # after that, an empty chunkgroup is the end of the changegroup
145 145 for chunk in chunkiter:
146 146 fh.write(z.compress(chunk))
147 147 fh.write(z.flush())
148 148 cleanup = None
149 149 return filename
150 150 finally:
151 151 if fh is not None:
152 152 fh.close()
153 153 if cleanup is not None:
154 154 if filename and vfs:
155 155 vfs.unlink(cleanup)
156 156 else:
157 157 os.unlink(cleanup)
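# A minimal usage sketch (names are illustrative): write a v1 changegroup
# cg as an uncompressed bundle in the current directory:
#   writebundle(ui, cg, 'out.hg', 'HG10UN')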
158 158
159 159 class cg1unpacker(object):
160 160 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
161 161 deltaheadersize = struct.calcsize(deltaheader)
162 162 version = '01'
163 163 def __init__(self, fh, alg):
164 164 if alg == 'UN':
165 165 alg = None # get more modern without breaking too much
166 166 if alg not in util.decompressors:
167 167 raise util.Abort(_('unknown stream compression type: %s')
168 168 % alg)
169 169 if alg == 'BZ':
170 170 alg = '_truncatedBZ'
171 171 self._stream = util.decompressors[alg](fh)
172 172 self._type = alg
173 173 self.callback = None
174 174 def compressed(self):
175 175 return self._type is not None
176 176 def read(self, l):
177 177 return self._stream.read(l)
178 178 def seek(self, pos):
179 179 return self._stream.seek(pos)
180 180 def tell(self):
181 181 return self._stream.tell()
182 182 def close(self):
183 183 return self._stream.close()
184 184
185 185 def chunklength(self):
186 186 d = readexactly(self._stream, 4)
187 187 l = struct.unpack(">l", d)[0]
188 188 if l <= 4:
189 189 if l:
190 190 raise util.Abort(_("invalid chunk length %d") % l)
191 191 return 0
192 192 if self.callback:
193 193 self.callback()
194 194 return l - 4
195 195
196 196 def changelogheader(self):
197 197 """v10 does not have a changelog header chunk"""
198 198 return {}
199 199
200 200 def manifestheader(self):
201 201 """v10 does not have a manifest header chunk"""
202 202 return {}
203 203
204 204 def filelogheader(self):
205 205 """return the header of the filelogs chunk, v10 only has the filename"""
206 206 l = self.chunklength()
207 207 if not l:
208 208 return {}
209 209 fname = readexactly(self._stream, l)
210 210 return {'filename': fname}
211 211
212 212 def _deltaheader(self, headertuple, prevnode):
213 213 node, p1, p2, cs = headertuple
214 214 if prevnode is None:
215 215 deltabase = p1
216 216 else:
217 217 deltabase = prevnode
218 218 return node, p1, p2, deltabase, cs
219 219
220 220 def deltachunk(self, prevnode):
221 221 l = self.chunklength()
222 222 if not l:
223 223 return {}
224 224 headerdata = readexactly(self._stream, self.deltaheadersize)
225 225 header = struct.unpack(self.deltaheader, headerdata)
226 226 delta = readexactly(self._stream, l - self.deltaheadersize)
227 227 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
228 228 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
229 229 'deltabase': deltabase, 'delta': delta}
230 230
231 231 def getchunks(self):
232 232 """returns all the chunks contains in the bundle
233 233
234 234 Used when you need to forward the binary stream to a file or another
235 235 network API. To do so, it parse the changegroup data, otherwise it will
236 236 block in case of sshrepo because it don't know the end of the stream.
237 237 """
238 238 # an empty chunkgroup is the end of the changegroup
239 239 # a changegroup has at least 2 chunkgroups (changelog and manifest).
240 240 # after that, an empty chunkgroup is the end of the changegroup
241 241 empty = False
242 242 count = 0
243 243 while not empty or count <= 2:
244 244 empty = True
245 245 count += 1
246 246 while True:
247 247 chunk = getchunk(self)
248 248 if not chunk:
249 249 break
250 250 empty = False
251 251 yield chunkheader(len(chunk))
252 252 pos = 0
253 253 while pos < len(chunk):
254 254 next = pos + 2**20
255 255 yield chunk[pos:next]
256 256 pos = next
257 257 yield closechunk()
258 258
259 259 class cg2unpacker(cg1unpacker):
260 260 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
261 261 deltaheadersize = struct.calcsize(deltaheader)
262 262 version = '02'
263 263
264 264 def _deltaheader(self, headertuple, prevnode):
265 265 node, p1, p2, deltabase, cs = headertuple
266 266 return node, p1, p2, deltabase, cs
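# Unlike cg1 above, the v2 delta header carries an explicit delta base,
# so no prev-node fallback is needed when decoding.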
267 267
268 268 class headerlessfixup(object):
269 269 def __init__(self, fh, h):
270 270 self._h = h
271 271 self._fh = fh
272 272 def read(self, n):
273 273 if self._h:
274 274 d, self._h = self._h[:n], self._h[n:]
275 275 if len(d) < n:
276 276 d += readexactly(self._fh, n - len(d))
277 277 return d
278 278 return readexactly(self._fh, n)
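# headerlessfixup replays the already-consumed header bytes h before
# falling through to the real stream, so an unpacker can treat the
# stream as if its magic header were still unread.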
279 279
280 280 class cg1packer(object):
281 281 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
282 282 version = '01'
283 283 def __init__(self, repo, bundlecaps=None):
284 284 """Given a source repo, construct a bundler.
285 285
286 286 bundlecaps is optional and can be used to specify the set of
287 287 capabilities which can be used to build the bundle.
288 288 """
289 289 # Set of capabilities we can use to build the bundle.
290 290 if bundlecaps is None:
291 291 bundlecaps = set()
292 292 self._bundlecaps = bundlecaps
293 293 # experimental config: bundle.reorder
294 294 reorder = repo.ui.config('bundle', 'reorder', 'auto')
295 295 if reorder == 'auto':
296 296 reorder = None
297 297 else:
298 298 reorder = util.parsebool(reorder)
299 299 self._repo = repo
300 300 self._reorder = reorder
301 301 self._progress = repo.ui.progress
302 302 if self._repo.ui.verbose and not self._repo.ui.debugflag:
303 303 self._verbosenote = self._repo.ui.note
304 304 else:
305 305 self._verbosenote = lambda s: None
306 306
307 307 def close(self):
308 308 return closechunk()
309 309
310 310 def fileheader(self, fname):
311 311 return chunkheader(len(fname)) + fname
312 312
313 313 def group(self, nodelist, revlog, lookup, units=None):
314 314 """Calculate a delta group, yielding a sequence of changegroup chunks
315 315 (strings).
316 316
317 317 Given a list of changeset revs, return a set of deltas and
318 318 metadata corresponding to nodes. The first delta is
319 319 first parent(nodelist[0]) -> nodelist[0]; the receiver is
320 320 guaranteed to have this parent, as it has all history before
321 321 these changesets. If the first parent is nullrev, the
322 322 changegroup starts with a full revision.
323 323 
324 324 If units is not None, progress detail is generated; units specifies
325 325 the type of revlog that is touched (changelog, manifest, etc.).
326 326 """
327 327 # if we don't have any revisions touched by these changesets, bail
328 328 if len(nodelist) == 0:
329 329 yield self.close()
330 330 return
331 331
332 332 # for generaldelta revlogs, we linearize the revs; this will both be
333 333 # much quicker and generate a much smaller bundle
334 334 if (revlog._generaldelta and self._reorder is None) or self._reorder:
335 335 dag = dagutil.revlogdag(revlog)
336 336 revs = set(revlog.rev(n) for n in nodelist)
337 337 revs = dag.linearize(revs)
338 338 else:
339 339 revs = sorted([revlog.rev(n) for n in nodelist])
340 340
341 341 # add the parent of the first rev
342 342 p = revlog.parentrevs(revs[0])[0]
343 343 revs.insert(0, p)
344 344
345 345 # build deltas
346 346 total = len(revs) - 1
347 347 msgbundling = _('bundling')
348 348 for r in xrange(len(revs) - 1):
349 349 if units is not None:
350 350 self._progress(msgbundling, r + 1, unit=units, total=total)
351 351 prev, curr = revs[r], revs[r + 1]
352 352 linknode = lookup(revlog.node(curr))
353 353 for c in self.revchunk(revlog, curr, prev, linknode):
354 354 yield c
355 355
356 356 if units is not None:
357 357 self._progress(msgbundling, None)
358 358 yield self.close()
359 359
360 360 # filter any nodes that claim to be part of the known set
361 361 def prune(self, revlog, missing, commonrevs):
362 362 rr, rl = revlog.rev, revlog.linkrev
363 363 return [n for n in missing if rl(rr(n)) not in commonrevs]
364 364
365 365 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
366 366 '''yield a sequence of changegroup chunks (strings)'''
367 367 repo = self._repo
368 368 cl = repo.changelog
369 369 ml = repo.manifest
370 370
371 371 clrevorder = {}
372 372 mfs = {} # needed manifests
373 373 fnodes = {} # needed file nodes
374 374 changedfiles = set()
375 375
376 376 # Callback for the changelog, used to collect changed files and manifest
377 377 # nodes.
378 378 # Returns the linkrev node (identity in the changelog case).
379 379 def lookupcl(x):
380 380 c = cl.read(x)
381 381 clrevorder[x] = len(clrevorder)
382 382 changedfiles.update(c[3])
383 383 # record the first changeset introducing this manifest version
384 384 mfs.setdefault(c[0], x)
385 385 return x
386 386
387 387 self._verbosenote(_('uncompressed size of bundle content:\n'))
388 388 size = 0
389 389 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
390 390 size += len(chunk)
391 391 yield chunk
392 392 self._verbosenote(_('%8.i (changelog)\n') % size)
393 393
394 394 # We need to make sure that the linkrev in the changegroup refers to
395 395 # the first changeset that introduced the manifest or file revision.
396 396 # The fastpath is usually safer than the slowpath, because the filelogs
397 397 # are walked in revlog order.
398 398 #
399 399 # When taking the slowpath with reorder=None and the manifest revlog
400 400 # uses generaldelta, the manifest may be walked in the "wrong" order.
401 401 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
402 402 # cc0ff93d0c0c).
403 403 #
404 404 # When taking the fastpath, we are only vulnerable to reordering
405 405 # of the changelog itself. The changelog never uses generaldelta, so
406 406 # it is only reordered when reorder=True. To handle this case, we
407 407 # simply take the slowpath, which already has the 'clrevorder' logic.
408 408 # This was also fixed in cc0ff93d0c0c.
409 409 fastpathlinkrev = fastpathlinkrev and not self._reorder
410 410 # Callback for the manifest, used to collect linkrevs for filelog
411 411 # revisions.
412 412 # Returns the linkrev node (collected in lookupcl).
413 413 def lookupmf(x):
414 414 clnode = mfs[x]
415 415 if not fastpathlinkrev:
416 416 mdata = ml.readfast(x)
417 417 for f, n in mdata.iteritems():
418 418 if f in changedfiles:
419 419 # record the first changeset introducing this filelog
420 420 # version
421 421 fclnodes = fnodes.setdefault(f, {})
422 422 fclnode = fclnodes.setdefault(n, clnode)
423 423 if clrevorder[clnode] < clrevorder[fclnode]:
424 424 fclnodes[n] = clnode
425 425 return clnode
426 426
427 427 mfnodes = self.prune(ml, mfs, commonrevs)
428 428 size = 0
429 429 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
430 430 size += len(chunk)
431 431 yield chunk
432 432 self._verbosenote(_('%8.i (manifests)\n') % size)
433 433
434 434 mfs.clear()
435 435 clrevs = set(cl.rev(x) for x in clnodes)
436 436
437 437 def linknodes(filerevlog, fname):
438 438 if fastpathlinkrev:
439 439 llr = filerevlog.linkrev
440 440 def genfilenodes():
441 441 for r in filerevlog:
442 442 linkrev = llr(r)
443 443 if linkrev in clrevs:
444 444 yield filerevlog.node(r), cl.node(linkrev)
445 445 return dict(genfilenodes())
446 446 return fnodes.get(fname, {})
447 447
448 448 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
449 449 source):
450 450 yield chunk
451 451
452 452 yield self.close()
453 453
454 454 if clnodes:
455 455 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
456 456
457 457 # The 'source' parameter is useful for extensions
458 458 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
459 459 repo = self._repo
460 460 progress = self._progress
461 461 msgbundling = _('bundling')
462 462
463 463 total = len(changedfiles)
464 464 # for progress output
465 465 msgfiles = _('files')
466 466 for i, fname in enumerate(sorted(changedfiles)):
467 467 filerevlog = repo.file(fname)
468 468 if not filerevlog:
469 469 raise util.Abort(_("empty or missing revlog for %s") % fname)
470 470
471 471 linkrevnodes = linknodes(filerevlog, fname)
472 472 # Lookup table for filenodes; we collected the linkrev nodes above in
473 473 # the fastpath case and with lookupmf in the slowpath case.
474 474 def lookupfilelog(x):
475 475 return linkrevnodes[x]
476 476
477 477 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
478 478 if filenodes:
479 479 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
480 480 total=total)
481 481 h = self.fileheader(fname)
482 482 size = len(h)
483 483 yield h
484 484 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
485 485 size += len(chunk)
486 486 yield chunk
487 487 self._verbosenote(_('%8.i %s\n') % (size, fname))
488 488 progress(msgbundling, None)
489 489
490 490 def deltaparent(self, revlog, rev, p1, p2, prev):
491 491 return prev
492 492
493 493 def revchunk(self, revlog, rev, prev, linknode):
494 494 node = revlog.node(rev)
495 495 p1, p2 = revlog.parentrevs(rev)
496 496 base = self.deltaparent(revlog, rev, p1, p2, prev)
497 497
498 498 prefix = ''
499 499 if revlog.iscensored(base) or revlog.iscensored(rev):
500 500 try:
501 501 delta = revlog.revision(node)
502 502 except error.CensoredNodeError as e:
503 503 delta = e.tombstone
504 504 if base == nullrev:
505 505 prefix = mdiff.trivialdiffheader(len(delta))
506 506 else:
507 507 baselen = revlog.rawsize(base)
508 508 prefix = mdiff.replacediffheader(baselen, len(delta))
509 509 elif base == nullrev:
510 510 delta = revlog.revision(node)
511 511 prefix = mdiff.trivialdiffheader(len(delta))
512 512 else:
513 513 delta = revlog.revdiff(base, rev)
514 514 p1n, p2n = revlog.parents(node)
515 515 basenode = revlog.node(base)
516 516 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
517 517 meta += prefix
518 518 l = len(meta) + len(delta)
519 519 yield chunkheader(l)
520 520 yield meta
521 521 yield delta
522 522 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
523 523 # do nothing with basenode, it is implicitly the previous one in HG10
524 524 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
525 525
526 526 class cg2packer(cg1packer):
527 527 version = '02'
528 528 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
529 529
530 530 def __init__(self, repo, bundlecaps=None):
531 531 super(cg2packer, self).__init__(repo, bundlecaps)
532 532 if self._reorder is None:
533 533 # Since generaldelta is directly supported by cg2, reordering
534 534 # generally doesn't help, so we disable it by default (treating
535 535 # bundle.reorder=auto just like bundle.reorder=False).
536 536 self._reorder = False
537 537
538 538 def deltaparent(self, revlog, rev, p1, p2, prev):
539 539 dp = revlog.deltaparent(rev)
540 540 # avoid storing full revisions; pick prev in those cases
541 541 # also pick prev when we can't be sure remote has dp
542 542 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
543 543 return prev
544 544 return dp
545 545
546 546 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
547 547 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
548 548
549 549 packermap = {'01': (cg1packer, cg1unpacker),
550 550 '02': (cg2packer, cg2unpacker)}
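# packermap keys are changegroup wire-format versions; each maps to its
# (packer, unpacker) pair, e.g. packermap['02'][0] is cg2packer and
# packermap['02'][1] is cg2unpacker.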
551 551
552 552 def _changegroupinfo(repo, nodes, source):
553 553 if repo.ui.verbose or source == 'bundle':
554 554 repo.ui.status(_("%d changesets found\n") % len(nodes))
555 555 if repo.ui.debugflag:
556 556 repo.ui.debug("list of changesets:\n")
557 557 for node in nodes:
558 558 repo.ui.debug("%s\n" % hex(node))
559 559
560 560 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
561 561 repo = repo.unfiltered()
562 562 commonrevs = outgoing.common
563 563 csets = outgoing.missing
564 564 heads = outgoing.missingheads
565 565 # We go through the fast path if we get told to, or if all (unfiltered)
566 566 # heads have been requested (since we then know all linkrevs will
567 567 # be pulled by the client).
568 568 heads.sort()
569 569 fastpathlinkrev = fastpath or (
570 570 repo.filtername is None and heads == sorted(repo.heads()))
571 571
572 572 repo.hook('preoutgoing', throw=True, source=source)
573 573 _changegroupinfo(repo, csets, source)
574 574 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
575 575
576 576 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
577 577 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
578 578 return packermap[version][1](util.chunkbuffer(gengroup), None)
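# Note the round trip: the packer's raw chunk stream is buffered and handed
# to the unpacker class registered for the requested version, yielding a
# readable changegroup object.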
579 579
580 580 def changegroupsubset(repo, roots, heads, source, version='01'):
581 581 """Compute a changegroup consisting of all the nodes that are
582 582 descendants of any of the roots and ancestors of any of the heads.
583 583 Return a chunkbuffer object whose read() method will return
584 584 successive changegroup chunks.
585 585
586 586 It is fairly complex as determining which filenodes and which
587 587 manifest nodes need to be included for the changeset to be complete
588 588 is non-trivial.
589 589
590 590 Another wrinkle is doing the reverse, figuring out which changeset in
591 591 the changegroup a particular filenode or manifestnode belongs to.
592 592 """
593 593 cl = repo.changelog
594 594 if not roots:
595 595 roots = [nullid]
596 596 discbases = []
597 597 for n in roots:
598 598 discbases.extend([p for p in cl.parents(n) if p != nullid])
599 599 # TODO: remove call to nodesbetween.
600 600 csets, roots, heads = cl.nodesbetween(roots, heads)
601 601 included = set(csets)
602 602 discbases = [n for n in discbases if n not in included]
603 603 outgoing = discovery.outgoing(cl, discbases, heads)
604 604 bundler = packermap[version][0](repo)
605 605 return getsubset(repo, outgoing, bundler, source, version=version)
606 606
607 607 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
608 608 version='01'):
609 609 """Like getbundle, but taking a discovery.outgoing as an argument.
610 610
611 611 This is only implemented for local repos and reuses potentially
612 612 precomputed sets in outgoing. Returns a raw changegroup generator."""
613 613 if not outgoing.missing:
614 614 return None
615 615 bundler = packermap[version][0](repo, bundlecaps)
616 616 return getsubsetraw(repo, outgoing, bundler, source)
617 617
618 618 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
619 619 version='01'):
620 620 """Like getbundle, but taking a discovery.outgoing as an argument.
621 621
622 622 This is only implemented for local repos and reuses potentially
623 623 precomputed sets in outgoing."""
624 624 if not outgoing.missing:
625 625 return None
626 626 bundler = packermap[version][0](repo, bundlecaps)
627 627 return getsubset(repo, outgoing, bundler, source)
628 628
629 629 def computeoutgoing(repo, heads, common):
630 630 """Computes which revs are outgoing given a set of common
631 631 and a set of heads.
632 632
633 633 This is a separate function so extensions can have access to
634 634 the logic.
635 635
636 636 Returns a discovery.outgoing object.
637 637 """
638 638 cl = repo.changelog
639 639 if common:
640 640 hasnode = cl.hasnode
641 641 common = [n for n in common if hasnode(n)]
642 642 else:
643 643 common = [nullid]
644 644 if not heads:
645 645 heads = cl.heads()
646 646 return discovery.outgoing(cl, common, heads)
647 647
648 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
648 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
649 version='01'):
649 650 """Like changegroupsubset, but returns the set difference between the
650 651 ancestors of heads and the ancestors of common.
651 652
652 653 If heads is None, use the local heads. If common is None, use [nullid].
653 654
654 655 The nodes in common might not all be known locally due to the way the
655 656 current discovery protocol works.
656 657 """
657 658 outgoing = computeoutgoing(repo, heads, common)
658 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
659 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
660 version=version)
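# With the version argument added above, callers can choose the changegroup
# wire format at this level (sketch; repo, heads, and common are whatever
# the caller has at hand):
#   cg = getchangegroup(repo, 'pull', heads=None, common=None, version='02')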
659 661
660 662 def changegroup(repo, basenodes, source):
661 663 # to avoid a race we use changegroupsubset() (issue1320)
662 664 return changegroupsubset(repo, basenodes, repo.heads(), source)
663 665
664 666 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
665 667 revisions = 0
666 668 files = 0
667 669 while True:
668 670 chunkdata = source.filelogheader()
669 671 if not chunkdata:
670 672 break
671 673 f = chunkdata["filename"]
672 674 repo.ui.debug("adding %s revisions\n" % f)
673 675 pr()
674 676 fl = repo.file(f)
675 677 o = len(fl)
676 678 try:
677 679 if not fl.addgroup(source, revmap, trp):
678 680 raise util.Abort(_("received file revlog group is empty"))
679 681 except error.CensoredBaseError as e:
680 682 raise util.Abort(_("received delta base is censored: %s") % e)
681 683 revisions += len(fl) - o
682 684 files += 1
683 685 if f in needfiles:
684 686 needs = needfiles[f]
685 687 for new in xrange(o, len(fl)):
686 688 n = fl.node(new)
687 689 if n in needs:
688 690 needs.remove(n)
689 691 else:
690 692 raise util.Abort(
691 693 _("received spurious file revlog entry"))
692 694 if not needs:
693 695 del needfiles[f]
694 696 repo.ui.progress(_('files'), None)
695 697
696 698 for f, needs in needfiles.iteritems():
697 699 fl = repo.file(f)
698 700 for n in needs:
699 701 try:
700 702 fl.rev(n)
701 703 except error.LookupError:
702 704 raise util.Abort(
703 705 _('missing file data for %s:%s - run hg verify') %
704 706 (f, hex(n)))
705 707
706 708 return revisions, files
707 709
708 710 def addchangegroup(repo, source, srctype, url, emptyok=False,
709 711 targetphase=phases.draft, expectedtotal=None):
710 712 """Add the changegroup returned by source.read() to this repo.
711 713 srctype is a string like 'push', 'pull', or 'unbundle'. url is
712 714 the URL of the repo where this changegroup is coming from.
713 715
714 716 Return an integer summarizing the change to this repo:
715 717 - nothing changed or no source: 0
716 718 - more heads than before: 1+added heads (2..n)
717 719 - fewer heads than before: -1-removed heads (-2..-n)
718 720 - number of heads stays the same: 1
719 721 """
720 722 repo = repo.unfiltered()
721 723 def csmap(x):
722 724 repo.ui.debug("add changeset %s\n" % short(x))
723 725 return len(cl)
724 726
725 727 def revmap(x):
726 728 return cl.rev(x)
727 729
728 730 if not source:
729 731 return 0
730 732
731 733 changesets = files = revisions = 0
732 734
733 735 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
734 736 # The transaction could have been created before and already carries source
735 737 # information. In this case we use the top level data. We overwrite the
736 738 # argument because we need to use the top level value (if it exists) in
737 739 # this function.
738 740 srctype = tr.hookargs.setdefault('source', srctype)
739 741 url = tr.hookargs.setdefault('url', url)
740 742
741 743 # write changelog data to temp files so concurrent readers will not see
742 744 # an inconsistent view
743 745 cl = repo.changelog
744 746 cl.delayupdate(tr)
745 747 oldheads = cl.heads()
746 748 try:
747 749 repo.hook('prechangegroup', throw=True, **tr.hookargs)
748 750
749 751 trp = weakref.proxy(tr)
750 752 # pull off the changeset group
751 753 repo.ui.status(_("adding changesets\n"))
752 754 clstart = len(cl)
753 755 class prog(object):
754 756 def __init__(self, step, total):
755 757 self._step = step
756 758 self._total = total
757 759 self._count = 1
758 760 def __call__(self):
759 761 repo.ui.progress(self._step, self._count, unit=_('chunks'),
760 762 total=self._total)
761 763 self._count += 1
762 764 source.callback = prog(_('changesets'), expectedtotal)
763 765
764 766 efiles = set()
765 767 def onchangelog(cl, node):
766 768 efiles.update(cl.read(node)[3])
767 769
768 770 source.changelogheader()
769 771 srccontent = cl.addgroup(source, csmap, trp,
770 772 addrevisioncb=onchangelog)
771 773 efiles = len(efiles)
772 774
773 775 if not (srccontent or emptyok):
774 776 raise util.Abort(_("received changelog group is empty"))
775 777 clend = len(cl)
776 778 changesets = clend - clstart
777 779 repo.ui.progress(_('changesets'), None)
778 780
779 781 # pull off the manifest group
780 782 repo.ui.status(_("adding manifests\n"))
781 783 # manifests <= changesets
782 784 source.callback = prog(_('manifests'), changesets)
783 785 # no need to check for empty manifest group here:
784 786 # if the result of the merge of 1 and 2 is the same in 3 and 4,
785 787 # no new manifest will be created and the manifest group will
786 788 # be empty during the pull
787 789 source.manifestheader()
788 790 repo.manifest.addgroup(source, revmap, trp)
789 791 repo.ui.progress(_('manifests'), None)
790 792
791 793 needfiles = {}
792 794 if repo.ui.configbool('server', 'validate', default=False):
793 795 # validate incoming csets have their manifests
794 796 for cset in xrange(clstart, clend):
795 797 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
796 798 mfest = repo.manifest.readdelta(mfnode)
797 799 # store file nodes we must see
798 800 for f, n in mfest.iteritems():
799 801 needfiles.setdefault(f, set()).add(n)
800 802
801 803 # process the files
802 804 repo.ui.status(_("adding file changes\n"))
803 805 source.callback = None
804 806 pr = prog(_('files'), efiles)
805 807 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
806 808 needfiles)
807 809 revisions += newrevs
808 810 files += newfiles
809 811
810 812 dh = 0
811 813 if oldheads:
812 814 heads = cl.heads()
813 815 dh = len(heads) - len(oldheads)
814 816 for h in heads:
815 817 if h not in oldheads and repo[h].closesbranch():
816 818 dh -= 1
817 819 htext = ""
818 820 if dh:
819 821 htext = _(" (%+d heads)") % dh
820 822
821 823 repo.ui.status(_("added %d changesets"
822 824 " with %d changes to %d files%s\n")
823 825 % (changesets, revisions, files, htext))
824 826 repo.invalidatevolatilesets()
825 827
826 828 if changesets > 0:
827 829 p = lambda: tr.writepending() and repo.root or ""
828 830 if 'node' not in tr.hookargs:
829 831 tr.hookargs['node'] = hex(cl.node(clstart))
830 832 hookargs = dict(tr.hookargs)
831 833 else:
832 834 hookargs = dict(tr.hookargs)
833 835 hookargs['node'] = hex(cl.node(clstart))
834 836 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
835 837
836 838 added = [cl.node(r) for r in xrange(clstart, clend)]
837 839 publishing = repo.publishing()
838 840 if srctype in ('push', 'serve'):
839 841 # Old servers cannot push the boundary themselves.
840 842 # New servers won't push the boundary if the changeset already
841 843 # exists locally as secret
842 844 #
843 845 # We should not use 'added' here but the list of all changes in
844 846 # the bundle
845 847 if publishing:
846 848 phases.advanceboundary(repo, tr, phases.public, srccontent)
847 849 else:
848 850 # Those changesets have been pushed from the outside, their
849 851 # phases are going to be pushed alongside. Therefore
850 852 # `targetphase` is ignored.
851 853 phases.advanceboundary(repo, tr, phases.draft, srccontent)
852 854 phases.retractboundary(repo, tr, phases.draft, added)
853 855 elif srctype != 'strip':
854 856 # publishing only alters behavior during push
855 857 #
856 858 # strip should not touch boundary at all
857 859 phases.retractboundary(repo, tr, targetphase, added)
858 860
859 861 if changesets > 0:
860 862 if srctype != 'strip':
861 863 # During strip, branchcache is invalid but the coming call to
862 864 # `destroyed` will repair it.
863 865 # In other cases we can safely update the cache on disk.
864 866 branchmap.updatecache(repo.filtered('served'))
865 867
866 868 def runhooks():
867 869 # These hooks run when the lock releases, not when the
868 870 # transaction closes. So it's possible for the changelog
869 871 # to have changed since we last saw it.
870 872 if clstart >= len(repo):
871 873 return
872 874
873 875 # forcefully update the on-disk branch cache
874 876 repo.ui.debug("updating the branch cache\n")
875 877 repo.hook("changegroup", **hookargs)
876 878
877 879 for n in added:
878 880 args = hookargs.copy()
879 881 args['node'] = hex(n)
880 882 repo.hook("incoming", **args)
881 883
882 884 newheads = [h for h in repo.heads() if h not in oldheads]
883 885 repo.ui.log("incoming",
884 886 "%s incoming changes - new heads: %s\n",
885 887 len(added),
886 888 ', '.join([hex(c[:6]) for c in newheads]))
887 889
888 890 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
889 891 lambda tr: repo._afterlock(runhooks))
890 892
891 893 tr.close()
892 894
893 895 finally:
894 896 tr.release()
895 897 repo.ui.flush()
896 898 # never return 0 here:
897 899 if dh < 0:
898 900 return dh - 1
899 901 else:
900 902 return dh + 1