writebundle: add a compression argument for the bundle2 case...
Pierre-Yves David
r26424:60825fbe default
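A minimal sketch of the new keyword in use (the `ui` and `cg` objects and the 'BZ' value are illustrative assumptions, not taken from this patch):

    from mercurial import changegroup

    # bundle2 ("HG20") carries its own compression, so the new keyword
    # is forwarded to bundle2.setcompression():
    changegroup.writebundle(ui, cg, 'out.hg', "HG20", compression='BZ')

    # the old formats encode compression in the bundletype itself, so
    # compression must stay None there (the patch asserts this):
    changegroup.writebundle(ui, cg, 'out.hg', "HG10BZ")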
@@ -1,896 +1,899 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise util.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise util.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
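A self-contained sketch of the framing these helpers implement (the length prefix counts its own four bytes, and a zero-length chunk terminates a group):

    import io, struct

    payload = 'hello'
    framed = struct.pack(">l", len(payload) + 4) + payload   # chunkheader()
    framed += struct.pack(">l", 0)                           # closechunk()

    stream = io.BytesIO(framed)
    assert getchunk(stream) == 'hello'   # length prefix is stripped
    assert getchunk(stream) == ''        # empty chunk marks the end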
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
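A worked example of the encoding combined above (results above 1 mean added heads, results below -1 mean removed heads):

    # +3 is "two heads added", -2 is "one head removed"; the net is one
    # extra head, reported as 1 + 1 == 2:
    assert combineresults([3, -2]) == 2
    # any failed addchangegroup result (0) makes the combined result 0:
    assert combineresults([0, 3]) == 0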
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
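A sketch of how this table is consumed below in writebundle for the old formats:

    header, comp = bundletypes['HG10BZ']   # ('HG10', 'BZ')
    # the header is written to the file as-is, then every changegroup
    # chunk is run through util.compressors[comp]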
95 def writebundle(ui, cg, filename, bundletype, vfs=None):
95 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
96 96 """Write a bundle file and return its filename.
97 97
98 98 Existing files will not be overwritten.
99 99 If no filename is specified, a temporary file is created.
100 100 bz2 compression can be turned off.
101 101 The bundle file will be deleted in case of errors.
102 102 """
103 103
104 104 fh = None
105 105 cleanup = None
106 106 try:
107 107 if filename:
108 108 if vfs:
109 109 fh = vfs.open(filename, "wb")
110 110 else:
111 111 fh = open(filename, "wb")
112 112 else:
113 113 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
114 114 fh = os.fdopen(fd, "wb")
115 115 cleanup = filename
116 116
117 117 if bundletype == "HG20":
118 118 from . import bundle2
119 119 bundle = bundle2.bundle20(ui)
120 bundle.setcompression(compression)
120 121 part = bundle.newpart('changegroup', data=cg.getchunks())
121 122 part.addparam('version', cg.version)
122 123 z = util.compressors[None]()
123 124 chunkiter = bundle.getchunks()
124 125 else:
126 # compression argument is only for the bundle2 case
127 assert compression is None
125 128 if cg.version != '01':
126 129 raise util.Abort(_('old bundle types only support v1 '
127 130 'changegroups'))
128 131 header, comp = bundletypes[bundletype]
129 132 fh.write(header)
130 133 if comp not in util.compressors:
131 134 raise util.Abort(_('unknown stream compression type: %s')
132 135 % comp)
133 136 z = util.compressors[comp]()
134 137 chunkiter = cg.getchunks()
135 138
136 139 # parse the changegroup data, otherwise we will block
137 140 # for sshrepo because we don't know the end of the stream
138 141
139 142 # an empty chunkgroup is the end of the changegroup
140 143 # a changegroup has at least 2 chunkgroups (changelog and manifest).
141 144 # after that, an empty chunkgroup is the end of the changegroup
142 145 for chunk in chunkiter:
143 146 fh.write(z.compress(chunk))
144 147 fh.write(z.flush())
145 148 cleanup = None
146 149 return filename
147 150 finally:
148 151 if fh is not None:
149 152 fh.close()
150 153 if cleanup is not None:
151 154 if filename and vfs:
152 155 vfs.unlink(cleanup)
153 156 else:
154 157 os.unlink(cleanup)
155 158
156 159 class cg1unpacker(object):
157 160 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
158 161 deltaheadersize = struct.calcsize(deltaheader)
159 162 version = '01'
160 163 def __init__(self, fh, alg):
161 164 if alg == 'UN':
162 165 alg = None # get more modern without breaking too much
163 166 if alg not in util.decompressors:
164 167 raise util.Abort(_('unknown stream compression type: %s')
165 168 % alg)
166 169 if alg == 'BZ':
167 170 alg = '_truncatedBZ'
168 171 self._stream = util.decompressors[alg](fh)
169 172 self._type = alg
170 173 self.callback = None
171 174 def compressed(self):
172 175 return self._type is not None
173 176 def read(self, l):
174 177 return self._stream.read(l)
175 178 def seek(self, pos):
176 179 return self._stream.seek(pos)
177 180 def tell(self):
178 181 return self._stream.tell()
179 182 def close(self):
180 183 return self._stream.close()
181 184
182 185 def chunklength(self):
183 186 d = readexactly(self._stream, 4)
184 187 l = struct.unpack(">l", d)[0]
185 188 if l <= 4:
186 189 if l:
187 190 raise util.Abort(_("invalid chunk length %d") % l)
188 191 return 0
189 192 if self.callback:
190 193 self.callback()
191 194 return l - 4
192 195
193 196 def changelogheader(self):
194 197 """v10 does not have a changelog header chunk"""
195 198 return {}
196 199
197 200 def manifestheader(self):
198 201 """v10 does not have a manifest header chunk"""
199 202 return {}
200 203
201 204 def filelogheader(self):
202 205 """return the header of the filelogs chunk; v10 only has the filename"""
203 206 l = self.chunklength()
204 207 if not l:
205 208 return {}
206 209 fname = readexactly(self._stream, l)
207 210 return {'filename': fname}
208 211
209 212 def _deltaheader(self, headertuple, prevnode):
210 213 node, p1, p2, cs = headertuple
211 214 if prevnode is None:
212 215 deltabase = p1
213 216 else:
214 217 deltabase = prevnode
215 218 return node, p1, p2, deltabase, cs
216 219
217 220 def deltachunk(self, prevnode):
218 221 l = self.chunklength()
219 222 if not l:
220 223 return {}
221 224 headerdata = readexactly(self._stream, self.deltaheadersize)
222 225 header = struct.unpack(self.deltaheader, headerdata)
223 226 delta = readexactly(self._stream, l - self.deltaheadersize)
224 227 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
225 228 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
226 229 'deltabase': deltabase, 'delta': delta}
227 230
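A sketch of the delta header layout that deltachunk unpacks (node values made up for illustration):

    import struct

    assert struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER) == 80   # 4 x 20-byte nodes
    assert struct.calcsize(_CHANGEGROUPV2_DELTA_HEADER) == 100  # v2 adds explicit deltabase

    raw = '\x11' * 20 + '\x22' * 20 + '\x33' * 20 + '\x44' * 20
    node, p1, p2, cs = struct.unpack(_CHANGEGROUPV1_DELTA_HEADER, raw)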
228 231 def getchunks(self):
229 232 """returns all the chunks contained in the bundle
230 233
231 234 Used when you need to forward the binary stream to a file or another
232 235 network API. To do so, it parses the changegroup data; otherwise it would
233 236 block for sshrepo because it doesn't know the end of the stream.
234 237 """
235 238 # an empty chunkgroup is the end of the changegroup
236 239 # a changegroup has at least 2 chunkgroups (changelog and manifest).
237 240 # after that, an empty chunkgroup is the end of the changegroup
238 241 empty = False
239 242 count = 0
240 243 while not empty or count <= 2:
241 244 empty = True
242 245 count += 1
243 246 while True:
244 247 chunk = getchunk(self)
245 248 if not chunk:
246 249 break
247 250 empty = False
248 251 yield chunkheader(len(chunk))
249 252 pos = 0
250 253 while pos < len(chunk):
251 254 next = pos + 2**20
252 255 yield chunk[pos:next]
253 256 pos = next
254 257 yield closechunk()
255 258
256 259 class cg2unpacker(cg1unpacker):
257 260 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
258 261 deltaheadersize = struct.calcsize(deltaheader)
259 262 version = '02'
260 263
261 264 def _deltaheader(self, headertuple, prevnode):
262 265 node, p1, p2, deltabase, cs = headertuple
263 266 return node, p1, p2, deltabase, cs
264 267
265 268 class headerlessfixup(object):
266 269 def __init__(self, fh, h):
267 270 self._h = h
268 271 self._fh = fh
269 272 def read(self, n):
270 273 if self._h:
271 274 d, self._h = self._h[:n], self._h[n:]
272 275 if len(d) < n:
273 276 d += readexactly(self._fh, n - len(d))
274 277 return d
275 278 return readexactly(self._fh, n)
276 279
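A sketch of the intended use (`fh` is an assumed open bundle stream): a caller that has already consumed the magic bytes to sniff the bundle type can hand the stream to an unpacker as if nothing had been read:

    h = fh.read(4)                     # e.g. peeking at an 'HG10' magic
    stream = headerlessfixup(fh, h)    # transparently re-prepends it
    assert stream.read(4) == h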
277 280 class cg1packer(object):
278 281 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
279 282 version = '01'
280 283 def __init__(self, repo, bundlecaps=None):
281 284 """Given a source repo, construct a bundler.
282 285
283 286 bundlecaps is optional and can be used to specify the set of
284 287 capabilities which can be used to build the bundle.
285 288 """
286 289 # Set of capabilities we can use to build the bundle.
287 290 if bundlecaps is None:
288 291 bundlecaps = set()
289 292 self._bundlecaps = bundlecaps
290 293 # experimental config: bundle.reorder
291 294 reorder = repo.ui.config('bundle', 'reorder', 'auto')
292 295 if reorder == 'auto':
293 296 reorder = None
294 297 else:
295 298 reorder = util.parsebool(reorder)
296 299 self._repo = repo
297 300 self._reorder = reorder
298 301 self._progress = repo.ui.progress
299 302 if self._repo.ui.verbose and not self._repo.ui.debugflag:
300 303 self._verbosenote = self._repo.ui.note
301 304 else:
302 305 self._verbosenote = lambda s: None
303 306
304 307 def close(self):
305 308 return closechunk()
306 309
307 310 def fileheader(self, fname):
308 311 return chunkheader(len(fname)) + fname
309 312
310 313 def group(self, nodelist, revlog, lookup, units=None):
311 314 """Calculate a delta group, yielding a sequence of changegroup chunks
312 315 (strings).
313 316
314 317 Given a list of changeset revs, return a set of deltas and
315 318 metadata corresponding to nodes. The first delta is
316 319 first parent(nodelist[0]) -> nodelist[0], the receiver is
317 320 guaranteed to have this parent as it has all history before
318 321 these changesets. In the case firstparent is nullrev, the
319 322 changegroup starts with a full revision.
320 323
321 324 If units is not None, progress detail will be generated; units specifies
322 325 the type of revlog that is touched (changelog, manifest, etc.).
323 326 """
324 327 # if we don't have any revisions touched by these changesets, bail
325 328 if len(nodelist) == 0:
326 329 yield self.close()
327 330 return
328 331
329 332 # for generaldelta revlogs, we linearize the revs; this will both be
330 333 # much quicker and generate a much smaller bundle
331 334 if (revlog._generaldelta and self._reorder is None) or self._reorder:
332 335 dag = dagutil.revlogdag(revlog)
333 336 revs = set(revlog.rev(n) for n in nodelist)
334 337 revs = dag.linearize(revs)
335 338 else:
336 339 revs = sorted([revlog.rev(n) for n in nodelist])
337 340
338 341 # add the parent of the first rev
339 342 p = revlog.parentrevs(revs[0])[0]
340 343 revs.insert(0, p)
341 344
342 345 # build deltas
343 346 total = len(revs) - 1
344 347 msgbundling = _('bundling')
345 348 for r in xrange(len(revs) - 1):
346 349 if units is not None:
347 350 self._progress(msgbundling, r + 1, unit=units, total=total)
348 351 prev, curr = revs[r], revs[r + 1]
349 352 linknode = lookup(revlog.node(curr))
350 353 for c in self.revchunk(revlog, curr, prev, linknode):
351 354 yield c
352 355
353 356 if units is not None:
354 357 self._progress(msgbundling, None)
355 358 yield self.close()
356 359
357 360 # filter any nodes that claim to be part of the known set
358 361 def prune(self, revlog, missing, commonrevs):
359 362 rr, rl = revlog.rev, revlog.linkrev
360 363 return [n for n in missing if rl(rr(n)) not in commonrevs]
361 364
362 365 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
363 366 '''yield a sequence of changegroup chunks (strings)'''
364 367 repo = self._repo
365 368 cl = repo.changelog
366 369 ml = repo.manifest
367 370
368 371 clrevorder = {}
369 372 mfs = {} # needed manifests
370 373 fnodes = {} # needed file nodes
371 374 changedfiles = set()
372 375
373 376 # Callback for the changelog, used to collect changed files and manifest
374 377 # nodes.
375 378 # Returns the linkrev node (identity in the changelog case).
376 379 def lookupcl(x):
377 380 c = cl.read(x)
378 381 clrevorder[x] = len(clrevorder)
379 382 changedfiles.update(c[3])
380 383 # record the first changeset introducing this manifest version
381 384 mfs.setdefault(c[0], x)
382 385 return x
383 386
384 387 self._verbosenote(_('uncompressed size of bundle content:\n'))
385 388 size = 0
386 389 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
387 390 size += len(chunk)
388 391 yield chunk
389 392 self._verbosenote(_('%8.i (changelog)\n') % size)
390 393
391 394 # We need to make sure that the linkrev in the changegroup refers to
392 395 # the first changeset that introduced the manifest or file revision.
393 396 # The fastpath is usually safer than the slowpath, because the filelogs
394 397 # are walked in revlog order.
395 398 #
396 399 # When taking the slowpath with reorder=None and the manifest revlog
397 400 # uses generaldelta, the manifest may be walked in the "wrong" order.
398 401 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
399 402 # cc0ff93d0c0c).
400 403 #
401 404 # When taking the fastpath, we are only vulnerable to reordering
402 405 # of the changelog itself. The changelog never uses generaldelta, so
403 406 # it is only reordered when reorder=True. To handle this case, we
404 407 # simply take the slowpath, which already has the 'clrevorder' logic.
405 408 # This was also fixed in cc0ff93d0c0c.
406 409 fastpathlinkrev = fastpathlinkrev and not self._reorder
407 410 # Callback for the manifest, used to collect linkrevs for filelog
408 411 # revisions.
409 412 # Returns the linkrev node (collected in lookupcl).
410 413 def lookupmf(x):
411 414 clnode = mfs[x]
412 415 if not fastpathlinkrev:
413 416 mdata = ml.readfast(x)
414 417 for f, n in mdata.iteritems():
415 418 if f in changedfiles:
416 419 # record the first changeset introducing this filelog
417 420 # version
418 421 fclnodes = fnodes.setdefault(f, {})
419 422 fclnode = fclnodes.setdefault(n, clnode)
420 423 if clrevorder[clnode] < clrevorder[fclnode]:
421 424 fclnodes[n] = clnode
422 425 return clnode
423 426
424 427 mfnodes = self.prune(ml, mfs, commonrevs)
425 428 size = 0
426 429 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
427 430 size += len(chunk)
428 431 yield chunk
429 432 self._verbosenote(_('%8.i (manifests)\n') % size)
430 433
431 434 mfs.clear()
432 435 clrevs = set(cl.rev(x) for x in clnodes)
433 436
434 437 def linknodes(filerevlog, fname):
435 438 if fastpathlinkrev:
436 439 llr = filerevlog.linkrev
437 440 def genfilenodes():
438 441 for r in filerevlog:
439 442 linkrev = llr(r)
440 443 if linkrev in clrevs:
441 444 yield filerevlog.node(r), cl.node(linkrev)
442 445 return dict(genfilenodes())
443 446 return fnodes.get(fname, {})
444 447
445 448 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
446 449 source):
447 450 yield chunk
448 451
449 452 yield self.close()
450 453
451 454 if clnodes:
452 455 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
453 456
454 457 # The 'source' parameter is useful for extensions
455 458 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
456 459 repo = self._repo
457 460 progress = self._progress
458 461 msgbundling = _('bundling')
459 462
460 463 total = len(changedfiles)
461 464 # for progress output
462 465 msgfiles = _('files')
463 466 for i, fname in enumerate(sorted(changedfiles)):
464 467 filerevlog = repo.file(fname)
465 468 if not filerevlog:
466 469 raise util.Abort(_("empty or missing revlog for %s") % fname)
467 470
468 471 linkrevnodes = linknodes(filerevlog, fname)
469 472 # Lookup table for filenodes; we collected the linkrev nodes above in the
470 473 # fastpath case and with lookupmf in the slowpath case.
471 474 def lookupfilelog(x):
472 475 return linkrevnodes[x]
473 476
474 477 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
475 478 if filenodes:
476 479 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
477 480 total=total)
478 481 h = self.fileheader(fname)
479 482 size = len(h)
480 483 yield h
481 484 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
482 485 size += len(chunk)
483 486 yield chunk
484 487 self._verbosenote(_('%8.i %s\n') % (size, fname))
485 488 progress(msgbundling, None)
486 489
487 490 def deltaparent(self, revlog, rev, p1, p2, prev):
488 491 return prev
489 492
490 493 def revchunk(self, revlog, rev, prev, linknode):
491 494 node = revlog.node(rev)
492 495 p1, p2 = revlog.parentrevs(rev)
493 496 base = self.deltaparent(revlog, rev, p1, p2, prev)
494 497
495 498 prefix = ''
496 499 if revlog.iscensored(base) or revlog.iscensored(rev):
497 500 try:
498 501 delta = revlog.revision(node)
499 502 except error.CensoredNodeError as e:
500 503 delta = e.tombstone
501 504 if base == nullrev:
502 505 prefix = mdiff.trivialdiffheader(len(delta))
503 506 else:
504 507 baselen = revlog.rawsize(base)
505 508 prefix = mdiff.replacediffheader(baselen, len(delta))
506 509 elif base == nullrev:
507 510 delta = revlog.revision(node)
508 511 prefix = mdiff.trivialdiffheader(len(delta))
509 512 else:
510 513 delta = revlog.revdiff(base, rev)
511 514 p1n, p2n = revlog.parents(node)
512 515 basenode = revlog.node(base)
513 516 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
514 517 meta += prefix
515 518 l = len(meta) + len(delta)
516 519 yield chunkheader(l)
517 520 yield meta
518 521 yield delta
519 522 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
520 523 # do nothing with basenode, it is implicitly the previous one in HG10
521 524 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
522 525
523 526 class cg2packer(cg1packer):
524 527 version = '02'
525 528 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
526 529
527 530 def __init__(self, repo, bundlecaps=None):
528 531 super(cg2packer, self).__init__(repo, bundlecaps)
529 532 if self._reorder is None:
530 533 # Since generaldelta is directly supported by cg2, reordering
531 534 # generally doesn't help, so we disable it by default (treating
532 535 # bundle.reorder=auto just like bundle.reorder=False).
533 536 self._reorder = False
534 537
535 538 def deltaparent(self, revlog, rev, p1, p2, prev):
536 539 dp = revlog.deltaparent(rev)
537 540 # avoid storing full revisions; pick prev in those cases
538 541 # also pick prev when we can't be sure remote has dp
539 542 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
540 543 return prev
541 544 return dp
542 545
543 546 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
544 547 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
545 548
546 549 packermap = {'01': (cg1packer, cg1unpacker),
547 550 '02': (cg2packer, cg2unpacker)}
548 551
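A sketch of the intended lookup (`repo` assumed to exist):

    packer, unpacker = packermap['02']        # (cg2packer, cg2unpacker)
    bundler = packer(repo, bundlecaps=None)   # builds v2 changegroups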
549 552 def _changegroupinfo(repo, nodes, source):
550 553 if repo.ui.verbose or source == 'bundle':
551 554 repo.ui.status(_("%d changesets found\n") % len(nodes))
552 555 if repo.ui.debugflag:
553 556 repo.ui.debug("list of changesets:\n")
554 557 for node in nodes:
555 558 repo.ui.debug("%s\n" % hex(node))
556 559
557 560 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
558 561 repo = repo.unfiltered()
559 562 commonrevs = outgoing.common
560 563 csets = outgoing.missing
561 564 heads = outgoing.missingheads
562 565 # We go through the fast path if we are told to, or if all (unfiltered)
563 566 # heads have been requested (since we then know all linkrevs will
564 567 # be pulled by the client).
565 568 heads.sort()
566 569 fastpathlinkrev = fastpath or (
567 570 repo.filtername is None and heads == sorted(repo.heads()))
568 571
569 572 repo.hook('preoutgoing', throw=True, source=source)
570 573 _changegroupinfo(repo, csets, source)
571 574 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
572 575
573 576 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
574 577 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
575 578 return packermap[version][1](util.chunkbuffer(gengroup), None)
576 579
577 580 def changegroupsubset(repo, roots, heads, source, version='01'):
578 581 """Compute a changegroup consisting of all the nodes that are
579 582 descendants of any of the roots and ancestors of any of the heads.
580 583 Return a chunkbuffer object whose read() method will return
581 584 successive changegroup chunks.
582 585
583 586 It is fairly complex as determining which filenodes and which
584 587 manifest nodes need to be included for the changeset to be complete
585 588 is non-trivial.
586 589
587 590 Another wrinkle is doing the reverse, figuring out which changeset in
588 591 the changegroup a particular filenode or manifestnode belongs to.
589 592 """
590 593 cl = repo.changelog
591 594 if not roots:
592 595 roots = [nullid]
593 596 discbases = []
594 597 for n in roots:
595 598 discbases.extend([p for p in cl.parents(n) if p != nullid])
596 599 # TODO: remove call to nodesbetween.
597 600 csets, roots, heads = cl.nodesbetween(roots, heads)
598 601 included = set(csets)
599 602 discbases = [n for n in discbases if n not in included]
600 603 outgoing = discovery.outgoing(cl, discbases, heads)
601 604 bundler = packermap[version][0](repo)
602 605 return getsubset(repo, outgoing, bundler, source, version=version)
603 606
604 607 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
605 608 version='01'):
606 609 """Like getbundle, but taking a discovery.outgoing as an argument.
607 610
608 611 This is only implemented for local repos and reuses potentially
609 612 precomputed sets in outgoing. Returns a raw changegroup generator."""
610 613 if not outgoing.missing:
611 614 return None
612 615 bundler = packermap[version][0](repo, bundlecaps)
613 616 return getsubsetraw(repo, outgoing, bundler, source)
614 617
615 618 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
616 619 """Like getbundle, but taking a discovery.outgoing as an argument.
617 620
618 621 This is only implemented for local repos and reuses potentially
619 622 precomputed sets in outgoing."""
620 623 if not outgoing.missing:
621 624 return None
622 625 bundler = cg1packer(repo, bundlecaps)
623 626 return getsubset(repo, outgoing, bundler, source)
624 627
625 628 def computeoutgoing(repo, heads, common):
626 629 """Computes which revs are outgoing given a set of common
627 630 and a set of heads.
628 631
629 632 This is a separate function so extensions can have access to
630 633 the logic.
631 634
632 635 Returns a discovery.outgoing object.
633 636 """
634 637 cl = repo.changelog
635 638 if common:
636 639 hasnode = cl.hasnode
637 640 common = [n for n in common if hasnode(n)]
638 641 else:
639 642 common = [nullid]
640 643 if not heads:
641 644 heads = cl.heads()
642 645 return discovery.outgoing(cl, common, heads)
643 646
644 647 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
645 648 """Like changegroupsubset, but returns the set difference between the
646 649 ancestors of heads and the ancestors common.
647 650
648 651 If heads is None, use the local heads. If common is None, use [nullid].
649 652
650 653 The nodes in common might not all be known locally due to the way the
651 654 current discovery protocol works.
652 655 """
653 656 outgoing = computeoutgoing(repo, heads, common)
654 657 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
655 658
656 659 def changegroup(repo, basenodes, source):
657 660 # to avoid a race we use changegroupsubset() (issue1320)
658 661 return changegroupsubset(repo, basenodes, repo.heads(), source)
659 662
660 663 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
661 664 revisions = 0
662 665 files = 0
663 666 while True:
664 667 chunkdata = source.filelogheader()
665 668 if not chunkdata:
666 669 break
667 670 f = chunkdata["filename"]
668 671 repo.ui.debug("adding %s revisions\n" % f)
669 672 pr()
670 673 fl = repo.file(f)
671 674 o = len(fl)
672 675 try:
673 676 if not fl.addgroup(source, revmap, trp):
674 677 raise util.Abort(_("received file revlog group is empty"))
675 678 except error.CensoredBaseError as e:
676 679 raise util.Abort(_("received delta base is censored: %s") % e)
677 680 revisions += len(fl) - o
678 681 files += 1
679 682 if f in needfiles:
680 683 needs = needfiles[f]
681 684 for new in xrange(o, len(fl)):
682 685 n = fl.node(new)
683 686 if n in needs:
684 687 needs.remove(n)
685 688 else:
686 689 raise util.Abort(
687 690 _("received spurious file revlog entry"))
688 691 if not needs:
689 692 del needfiles[f]
690 693 repo.ui.progress(_('files'), None)
691 694
692 695 for f, needs in needfiles.iteritems():
693 696 fl = repo.file(f)
694 697 for n in needs:
695 698 try:
696 699 fl.rev(n)
697 700 except error.LookupError:
698 701 raise util.Abort(
699 702 _('missing file data for %s:%s - run hg verify') %
700 703 (f, hex(n)))
701 704
702 705 return revisions, files
703 706
704 707 def addchangegroup(repo, source, srctype, url, emptyok=False,
705 708 targetphase=phases.draft, expectedtotal=None):
706 709 """Add the changegroup returned by source.read() to this repo.
707 710 srctype is a string like 'push', 'pull', or 'unbundle'. url is
708 711 the URL of the repo where this changegroup is coming from.
709 712
710 713 Return an integer summarizing the change to this repo:
711 714 - nothing changed or no source: 0
712 715 - more heads than before: 1+added heads (2..n)
713 716 - fewer heads than before: -1-removed heads (-2..-n)
714 717 - number of heads stays the same: 1
715 718 """
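    # a worked reading of the return encoding, matching the computation
    # at the end of this function:
    #   dh = len(newheads) - len(oldheads); return dh + 1 if dh >= 0 else dh - 1
    #     two new heads      -> 3
    #     one head merged in -> -2
    #     heads unchanged    -> 1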
716 719 repo = repo.unfiltered()
717 720 def csmap(x):
718 721 repo.ui.debug("add changeset %s\n" % short(x))
719 722 return len(cl)
720 723
721 724 def revmap(x):
722 725 return cl.rev(x)
723 726
724 727 if not source:
725 728 return 0
726 729
727 730 changesets = files = revisions = 0
728 731
729 732 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
730 733 # The transaction could have been created before and already carries source
731 734 # information. In this case we use the top level data. We overwrite the
732 735 # arguments because we need to use the top level values (if they exist) in
733 736 # this function.
734 737 srctype = tr.hookargs.setdefault('source', srctype)
735 738 url = tr.hookargs.setdefault('url', url)
736 739
737 740 # write changelog data to temp files so concurrent readers will not see
738 741 # an inconsistent view
739 742 cl = repo.changelog
740 743 cl.delayupdate(tr)
741 744 oldheads = cl.heads()
742 745 try:
743 746 repo.hook('prechangegroup', throw=True, **tr.hookargs)
744 747
745 748 trp = weakref.proxy(tr)
746 749 # pull off the changeset group
747 750 repo.ui.status(_("adding changesets\n"))
748 751 clstart = len(cl)
749 752 class prog(object):
750 753 def __init__(self, step, total):
751 754 self._step = step
752 755 self._total = total
753 756 self._count = 1
754 757 def __call__(self):
755 758 repo.ui.progress(self._step, self._count, unit=_('chunks'),
756 759 total=self._total)
757 760 self._count += 1
758 761 source.callback = prog(_('changesets'), expectedtotal)
759 762
760 763 efiles = set()
761 764 def onchangelog(cl, node):
762 765 efiles.update(cl.read(node)[3])
763 766
764 767 source.changelogheader()
765 768 srccontent = cl.addgroup(source, csmap, trp,
766 769 addrevisioncb=onchangelog)
767 770 efiles = len(efiles)
768 771
769 772 if not (srccontent or emptyok):
770 773 raise util.Abort(_("received changelog group is empty"))
771 774 clend = len(cl)
772 775 changesets = clend - clstart
773 776 repo.ui.progress(_('changesets'), None)
774 777
775 778 # pull off the manifest group
776 779 repo.ui.status(_("adding manifests\n"))
777 780 # manifests <= changesets
778 781 source.callback = prog(_('manifests'), changesets)
779 782 # no need to check for empty manifest group here:
780 783 # if the result of the merge of 1 and 2 is the same in 3 and 4,
781 784 # no new manifest will be created and the manifest group will
782 785 # be empty during the pull
783 786 source.manifestheader()
784 787 repo.manifest.addgroup(source, revmap, trp)
785 788 repo.ui.progress(_('manifests'), None)
786 789
787 790 needfiles = {}
788 791 if repo.ui.configbool('server', 'validate', default=False):
789 792 # validate incoming csets have their manifests
790 793 for cset in xrange(clstart, clend):
791 794 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
792 795 mfest = repo.manifest.readdelta(mfnode)
793 796 # store file nodes we must see
794 797 for f, n in mfest.iteritems():
795 798 needfiles.setdefault(f, set()).add(n)
796 799
797 800 # process the files
798 801 repo.ui.status(_("adding file changes\n"))
799 802 source.callback = None
800 803 pr = prog(_('files'), efiles)
801 804 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
802 805 needfiles)
803 806 revisions += newrevs
804 807 files += newfiles
805 808
806 809 dh = 0
807 810 if oldheads:
808 811 heads = cl.heads()
809 812 dh = len(heads) - len(oldheads)
810 813 for h in heads:
811 814 if h not in oldheads and repo[h].closesbranch():
812 815 dh -= 1
813 816 htext = ""
814 817 if dh:
815 818 htext = _(" (%+d heads)") % dh
816 819
817 820 repo.ui.status(_("added %d changesets"
818 821 " with %d changes to %d files%s\n")
819 822 % (changesets, revisions, files, htext))
820 823 repo.invalidatevolatilesets()
821 824
822 825 if changesets > 0:
823 826 p = lambda: tr.writepending() and repo.root or ""
824 827 if 'node' not in tr.hookargs:
825 828 tr.hookargs['node'] = hex(cl.node(clstart))
826 829 hookargs = dict(tr.hookargs)
827 830 else:
828 831 hookargs = dict(tr.hookargs)
829 832 hookargs['node'] = hex(cl.node(clstart))
830 833 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
831 834
832 835 added = [cl.node(r) for r in xrange(clstart, clend)]
833 836 publishing = repo.publishing()
834 837 if srctype in ('push', 'serve'):
835 838 # Old servers cannot push the boundary themselves.
836 839 # New servers won't push the boundary if the changeset already
837 840 # exists locally as secret
838 841 #
839 842 # We should not use 'added' here but the list of all changes in
840 843 # the bundle
841 844 if publishing:
842 845 phases.advanceboundary(repo, tr, phases.public, srccontent)
843 846 else:
844 847 # Those changesets have been pushed from the outside, their
845 848 # phases are going to be pushed alongside. Therefore
846 849 # `targetphase` is ignored.
847 850 phases.advanceboundary(repo, tr, phases.draft, srccontent)
848 851 phases.retractboundary(repo, tr, phases.draft, added)
849 852 elif srctype != 'strip':
850 853 # publishing only alters behavior during push
851 854 #
852 855 # strip should not touch boundary at all
853 856 phases.retractboundary(repo, tr, targetphase, added)
854 857
855 858 if changesets > 0:
856 859 if srctype != 'strip':
857 860 # During strip, the branchcache is invalid but the coming call to
858 861 # `destroyed` will repair it.
859 862 # In other cases we can safely update the cache on disk.
860 863 branchmap.updatecache(repo.filtered('served'))
861 864
862 865 def runhooks():
863 866 # These hooks run when the lock releases, not when the
864 867 # transaction closes. So it's possible for the changelog
865 868 # to have changed since we last saw it.
866 869 if clstart >= len(repo):
867 870 return
868 871
869 872 # forcefully update the on-disk branch cache
870 873 repo.ui.debug("updating the branch cache\n")
871 874 repo.hook("changegroup", **hookargs)
872 875
873 876 for n in added:
874 877 args = hookargs.copy()
875 878 args['node'] = hex(n)
876 879 repo.hook("incoming", **args)
877 880
878 881 newheads = [h for h in repo.heads() if h not in oldheads]
879 882 repo.ui.log("incoming",
880 883 "%s incoming changes - new heads: %s\n",
881 884 len(added),
882 885 ', '.join([hex(c[:6]) for c in newheads]))
883 886
884 887 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
885 888 lambda tr: repo._afterlock(runhooks))
886 889
887 890 tr.close()
888 891
889 892 finally:
890 893 tr.release()
891 894 repo.ui.flush()
892 895 # never return 0 here:
893 896 if dh < 0:
894 897 return dh - 1
895 898 else:
896 899 return dh + 1