changegroup: close progress in same function as it's started...
Martin von Zweigbergk
r24901:e9edd537 default
@@ -1,900 +1,896
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import weakref
9 9 from i18n import _
10 10 from node import nullrev, nullid, hex, short
11 11 import mdiff, util, dagutil
12 12 import struct, os, bz2, zlib, tempfile
13 13 import discovery, error, phases, branchmap
14 14
15 15 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
16 16 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
17 17
18 18 def readexactly(stream, n):
19 19 '''read n bytes from stream.read and abort if less was available'''
20 20 s = stream.read(n)
21 21 if len(s) < n:
22 22 raise util.Abort(_("stream ended unexpectedly"
23 23 " (got %d bytes, expected %d)")
24 24 % (len(s), n))
25 25 return s
26 26
27 27 def getchunk(stream):
28 28 """return the next chunk from stream as a string"""
29 29 d = readexactly(stream, 4)
30 30 l = struct.unpack(">l", d)[0]
31 31 if l <= 4:
32 32 if l:
33 33 raise util.Abort(_("invalid chunk length %d") % l)
34 34 return ""
35 35 return readexactly(stream, l - 4)
36 36
37 37 def chunkheader(length):
38 38 """return a changegroup chunk header (string)"""
39 39 return struct.pack(">l", length + 4)
40 40
41 41 def closechunk():
42 42 """return a changegroup chunk header (string) for a zero-length chunk"""
43 43 return struct.pack(">l", 0)
44 44
45 45 def combineresults(results):
46 46 """logic to combine 0 or more addchangegroup results into one"""
47 47 changedheads = 0
48 48 result = 1
49 49 for ret in results:
50 50 # If any changegroup result is 0, return 0
51 51 if ret == 0:
52 52 result = 0
53 53 break
54 54 if ret < -1:
55 55 changedheads += ret + 1
56 56 elif ret > 1:
57 57 changedheads += ret - 1
58 58 if changedheads > 0:
59 59 result = 1 + changedheads
60 60 elif changedheads < 0:
61 61 result = -1 + changedheads
62 62 return result
63 63
64 64 class nocompress(object):
65 65 def compress(self, x):
66 66 return x
67 67 def flush(self):
68 68 return ""
69 69
70 70 bundletypes = {
71 71 "": ("", nocompress), # only when using unbundle on ssh and old http servers
72 72 # since the unification, ssh accepts a header but there
73 73 # is no capability signaling it.
74 74 "HG20": (), # special-cased below
75 75 "HG10UN": ("HG10UN", nocompress),
76 76 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
77 77 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
78 78 }
79 79
80 80 # hgweb uses this list to communicate its preferred type
81 81 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
82 82
83 83 def writebundle(ui, cg, filename, bundletype, vfs=None):
84 84 """Write a bundle file and return its filename.
85 85
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 bz2 compression can be turned off.
89 89 The bundle file will be deleted in case of errors.
90 90 """
91 91
92 92 fh = None
93 93 cleanup = None
94 94 try:
95 95 if filename:
96 96 if vfs:
97 97 fh = vfs.open(filename, "wb")
98 98 else:
99 99 fh = open(filename, "wb")
100 100 else:
101 101 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
102 102 fh = os.fdopen(fd, "wb")
103 103 cleanup = filename
104 104
105 105 if bundletype == "HG20":
106 106 import bundle2
107 107 bundle = bundle2.bundle20(ui)
108 108 part = bundle.newpart('changegroup', data=cg.getchunks())
109 109 part.addparam('version', cg.version)
110 110 z = nocompress()
111 111 chunkiter = bundle.getchunks()
112 112 else:
113 113 if cg.version != '01':
114 114 raise util.Abort(_('old bundle types only support v1 '
115 115 'changegroups'))
116 116 header, compressor = bundletypes[bundletype]
117 117 fh.write(header)
118 118 z = compressor()
119 119 chunkiter = cg.getchunks()
120 120
121 121 # parse the changegroup data, otherwise we will block
122 122 # in case of sshrepo because we don't know the end of the stream
123 123
124 124 # an empty chunkgroup is the end of the changegroup
125 125 # a changegroup has at least 2 chunkgroups (changelog and manifest).
126 126 # after that, an empty chunkgroup is the end of the changegroup
127 127 for chunk in chunkiter:
128 128 fh.write(z.compress(chunk))
129 129 fh.write(z.flush())
130 130 cleanup = None
131 131 return filename
132 132 finally:
133 133 if fh is not None:
134 134 fh.close()
135 135 if cleanup is not None:
136 136 if filename and vfs:
137 137 vfs.unlink(cleanup)
138 138 else:
139 139 os.unlink(cleanup)
140 140
141 141 def decompressor(fh, alg):
142 142 if alg == 'UN':
143 143 return fh
144 144 elif alg == 'GZ':
145 145 def generator(f):
146 146 zd = zlib.decompressobj()
147 147 for chunk in util.filechunkiter(f):
148 148 yield zd.decompress(chunk)
149 149 elif alg == 'BZ':
150 150 def generator(f):
151 151 zd = bz2.BZ2Decompressor()
152 152 zd.decompress("BZ")
153 153 for chunk in util.filechunkiter(f, 4096):
154 154 yield zd.decompress(chunk)
155 155 else:
156 156 raise util.Abort("unknown bundle compression '%s'" % alg)
157 157 return util.chunkbuffer(generator(fh))
158 158
159 159 class cg1unpacker(object):
160 160 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
161 161 deltaheadersize = struct.calcsize(deltaheader)
162 162 version = '01'
163 163 def __init__(self, fh, alg):
164 164 self._stream = decompressor(fh, alg)
165 165 self._type = alg
166 166 self.callback = None
167 167 def compressed(self):
168 168 return self._type != 'UN'
169 169 def read(self, l):
170 170 return self._stream.read(l)
171 171 def seek(self, pos):
172 172 return self._stream.seek(pos)
173 173 def tell(self):
174 174 return self._stream.tell()
175 175 def close(self):
176 176 return self._stream.close()
177 177
178 178 def chunklength(self):
179 179 d = readexactly(self._stream, 4)
180 180 l = struct.unpack(">l", d)[0]
181 181 if l <= 4:
182 182 if l:
183 183 raise util.Abort(_("invalid chunk length %d") % l)
184 184 return 0
185 185 if self.callback:
186 186 self.callback()
187 187 return l - 4
188 188
189 189 def changelogheader(self):
190 190 """v10 does not have a changelog header chunk"""
191 191 return {}
192 192
193 193 def manifestheader(self):
194 194 """v10 does not have a manifest header chunk"""
195 195 return {}
196 196
197 197 def filelogheader(self):
198 198 """return the header of the filelogs chunk, v10 only has the filename"""
199 199 l = self.chunklength()
200 200 if not l:
201 201 return {}
202 202 fname = readexactly(self._stream, l)
203 203 return {'filename': fname}
204 204
205 205 def _deltaheader(self, headertuple, prevnode):
206 206 node, p1, p2, cs = headertuple
207 207 if prevnode is None:
208 208 deltabase = p1
209 209 else:
210 210 deltabase = prevnode
211 211 return node, p1, p2, deltabase, cs
212 212
213 213 def deltachunk(self, prevnode):
214 214 l = self.chunklength()
215 215 if not l:
216 216 return {}
217 217 headerdata = readexactly(self._stream, self.deltaheadersize)
218 218 header = struct.unpack(self.deltaheader, headerdata)
219 219 delta = readexactly(self._stream, l - self.deltaheadersize)
220 220 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
221 221 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
222 222 'deltabase': deltabase, 'delta': delta}
223 223
224 224 def getchunks(self):
225 225 """returns all the chunks contains in the bundle
226 226
227 227 Used when you need to forward the binary stream to a file or another
228 228 network API. To do so, it parses the changegroup data; otherwise it would
229 229 block in the sshrepo case because it doesn't know the end of the stream.
230 230 """
231 231 # an empty chunkgroup is the end of the changegroup
232 232 # a changegroup has at least 2 chunkgroups (changelog and manifest).
233 233 # after that, an empty chunkgroup is the end of the changegroup
234 234 empty = False
235 235 count = 0
236 236 while not empty or count <= 2:
237 237 empty = True
238 238 count += 1
239 239 while True:
240 240 chunk = getchunk(self)
241 241 if not chunk:
242 242 break
243 243 empty = False
244 244 yield chunkheader(len(chunk))
245 245 pos = 0
246 246 while pos < len(chunk):
247 247 next = pos + 2**20
248 248 yield chunk[pos:next]
249 249 pos = next
250 250 yield closechunk()
251 251
252 252 class cg2unpacker(cg1unpacker):
253 253 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
254 254 deltaheadersize = struct.calcsize(deltaheader)
255 255 version = '02'
256 256
257 257 def _deltaheader(self, headertuple, prevnode):
258 258 node, p1, p2, deltabase, cs = headertuple
259 259 return node, p1, p2, deltabase, cs
260 260
261 261 class headerlessfixup(object):
262 262 def __init__(self, fh, h):
263 263 self._h = h
264 264 self._fh = fh
265 265 def read(self, n):
266 266 if self._h:
267 267 d, self._h = self._h[:n], self._h[n:]
268 268 if len(d) < n:
269 269 d += readexactly(self._fh, n - len(d))
270 270 return d
271 271 return readexactly(self._fh, n)
272 272
273 273 class cg1packer(object):
274 274 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
275 275 version = '01'
276 276 def __init__(self, repo, bundlecaps=None):
277 277 """Given a source repo, construct a bundler.
278 278
279 279 bundlecaps is optional and can be used to specify the set of
280 280 capabilities which can be used to build the bundle.
281 281 """
282 282 # Set of capabilities we can use to build the bundle.
283 283 if bundlecaps is None:
284 284 bundlecaps = set()
285 285 self._bundlecaps = bundlecaps
286 286 self._changelog = repo.changelog
287 287 self._manifest = repo.manifest
288 288 reorder = repo.ui.config('bundle', 'reorder', 'auto')
289 289 if reorder == 'auto':
290 290 reorder = None
291 291 else:
292 292 reorder = util.parsebool(reorder)
293 293 self._repo = repo
294 294 self._reorder = reorder
295 295 self._progress = repo.ui.progress
296 296 if self._repo.ui.verbose and not self._repo.ui.debugflag:
297 297 self._verbosenote = self._repo.ui.note
298 298 else:
299 299 self._verbosenote = lambda s: None
300 300
301 301 def close(self):
302 302 return closechunk()
303 303
304 304 def fileheader(self, fname):
305 305 return chunkheader(len(fname)) + fname
306 306
307 307 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
308 308 """Calculate a delta group, yielding a sequence of changegroup chunks
309 309 (strings).
310 310
311 311 Given a list of changeset revs, return a set of deltas and
312 312 metadata corresponding to nodes. The first delta is
313 313 first parent(nodelist[0]) -> nodelist[0], the receiver is
314 314 guaranteed to have this parent as it has all history before
315 315 these changesets. In the case firstparent is nullrev the
316 316 changegroup starts with a full revision.
317 317
318 318 If units is not None, progress detail will be generated; units specifies
319 319 the type of revlog that is touched (changelog, manifest, etc.).
320 320 """
321 321 # if we don't have any revisions touched by these changesets, bail
322 322 if len(nodelist) == 0:
323 323 yield self.close()
324 324 return
325 325
326 326 # for generaldelta revlogs, we linearize the revs; this will both be
327 327 # much quicker and generate a much smaller bundle
328 328 if (revlog._generaldelta and reorder is not False) or reorder:
329 329 dag = dagutil.revlogdag(revlog)
330 330 revs = set(revlog.rev(n) for n in nodelist)
331 331 revs = dag.linearize(revs)
332 332 else:
333 333 revs = sorted([revlog.rev(n) for n in nodelist])
334 334
335 335 # add the parent of the first rev
336 336 p = revlog.parentrevs(revs[0])[0]
337 337 revs.insert(0, p)
338 338
339 339 # build deltas
340 340 total = len(revs) - 1
341 341 msgbundling = _('bundling')
342 342 for r in xrange(len(revs) - 1):
343 343 if units is not None:
344 344 self._progress(msgbundling, r + 1, unit=units, total=total)
345 345 prev, curr = revs[r], revs[r + 1]
346 346 linknode = lookup(revlog.node(curr))
347 347 for c in self.revchunk(revlog, curr, prev, linknode):
348 348 yield c
349 349
350 if units is not None:
351 self._progress(msgbundling, None)
350 352 yield self.close()
351 353
352 354 # filter any nodes that claim to be part of the known set
353 355 def prune(self, revlog, missing, commonrevs):
354 356 rr, rl = revlog.rev, revlog.linkrev
355 357 return [n for n in missing if rl(rr(n)) not in commonrevs]
356 358
357 359 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
358 360 '''yield a sequence of changegroup chunks (strings)'''
359 361 repo = self._repo
360 362 cl = self._changelog
361 363 ml = self._manifest
362 364 reorder = self._reorder
363 progress = self._progress
364
365 # for progress output
366 msgbundling = _('bundling')
367 365
368 366 clrevorder = {}
369 367 mfs = {} # needed manifests
370 368 fnodes = {} # needed file nodes
371 369 changedfiles = set()
372 370
373 371 # Callback for the changelog, used to collect changed files and manifest
374 372 # nodes.
375 373 # Returns the linkrev node (identity in the changelog case).
376 374 def lookupcl(x):
377 375 c = cl.read(x)
378 376 clrevorder[x] = len(clrevorder)
379 377 changedfiles.update(c[3])
380 378 # record the first changeset introducing this manifest version
381 379 mfs.setdefault(c[0], x)
382 380 return x
383 381
384 382 self._verbosenote(_('uncompressed size of bundle content:\n'))
385 383 size = 0
386 384 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
387 385 reorder=reorder):
388 386 size += len(chunk)
389 387 yield chunk
390 388 self._verbosenote(_('%8.i (changelog)\n') % size)
391 progress(msgbundling, None)
392 389
393 390 # Callback for the manifest, used to collect linkrevs for filelog
394 391 # revisions.
395 392 # Returns the linkrev node (collected in lookupcl).
396 393 def lookupmf(x):
397 394 clnode = mfs[x]
398 395 if not fastpathlinkrev or reorder:
399 396 mdata = ml.readfast(x)
400 397 for f, n in mdata.iteritems():
401 398 if f in changedfiles:
402 399 # record the first changeset introducing this filelog
403 400 # version
404 401 fclnodes = fnodes.setdefault(f, {})
405 402 fclnode = fclnodes.setdefault(n, clnode)
406 403 if clrevorder[clnode] < clrevorder[fclnode]:
407 404 fclnodes[n] = clnode
408 405 return clnode
409 406
410 407 mfnodes = self.prune(ml, mfs, commonrevs)
411 408 size = 0
412 409 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests'),
413 410 reorder=reorder):
414 411 size += len(chunk)
415 412 yield chunk
416 413 self._verbosenote(_('%8.i (manifests)\n') % size)
417 progress(msgbundling, None)
418 414
419 415 mfs.clear()
420 416 clrevs = set(cl.rev(x) for x in clnodes)
421 417
422 418 def linknodes(filerevlog, fname):
423 419 if fastpathlinkrev and not reorder:
424 420 llr = filerevlog.linkrev
425 421 def genfilenodes():
426 422 for r in filerevlog:
427 423 linkrev = llr(r)
428 424 if linkrev in clrevs:
429 425 yield filerevlog.node(r), cl.node(linkrev)
430 426 return dict(genfilenodes())
431 427 return fnodes.get(fname, {})
432 428
433 429 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
434 430 source):
435 431 yield chunk
436 432
437 433 yield self.close()
438 progress(msgbundling, None)
439 434
440 435 if clnodes:
441 436 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
442 437
443 438 # The 'source' parameter is useful for extensions
444 439 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
445 440 repo = self._repo
446 441 progress = self._progress
447 442 reorder = self._reorder
448 443 msgbundling = _('bundling')
449 444
450 445 total = len(changedfiles)
451 446 # for progress output
452 447 msgfiles = _('files')
453 448 for i, fname in enumerate(sorted(changedfiles)):
454 449 filerevlog = repo.file(fname)
455 450 if not filerevlog:
456 451 raise util.Abort(_("empty or missing revlog for %s") % fname)
457 452
458 453 linkrevnodes = linknodes(filerevlog, fname)
459 454 # Lookup helper for filenodes; we collected the linkrev nodes above in the
460 455 # fastpath case and with lookupmf in the slowpath case.
461 456 def lookupfilelog(x):
462 457 return linkrevnodes[x]
463 458
464 459 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
465 460 if filenodes:
466 461 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
467 462 total=total)
468 463 h = self.fileheader(fname)
469 464 size = len(h)
470 465 yield h
471 466 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
472 467 reorder=reorder):
473 468 size += len(chunk)
474 469 yield chunk
475 470 self._verbosenote(_('%8.i %s\n') % (size, fname))
471 progress(msgbundling, None)
476 472
477 473 def deltaparent(self, revlog, rev, p1, p2, prev):
478 474 return prev
479 475
480 476 def revchunk(self, revlog, rev, prev, linknode):
481 477 node = revlog.node(rev)
482 478 p1, p2 = revlog.parentrevs(rev)
483 479 base = self.deltaparent(revlog, rev, p1, p2, prev)
484 480
485 481 prefix = ''
486 482 if revlog.iscensored(base) or revlog.iscensored(rev):
487 483 try:
488 484 delta = revlog.revision(node)
489 485 except error.CensoredNodeError, e:
490 486 delta = e.tombstone
491 487 if base == nullrev:
492 488 prefix = mdiff.trivialdiffheader(len(delta))
493 489 else:
494 490 baselen = revlog.rawsize(base)
495 491 prefix = mdiff.replacediffheader(baselen, len(delta))
496 492 elif base == nullrev:
497 493 delta = revlog.revision(node)
498 494 prefix = mdiff.trivialdiffheader(len(delta))
499 495 else:
500 496 delta = revlog.revdiff(base, rev)
501 497 p1n, p2n = revlog.parents(node)
502 498 basenode = revlog.node(base)
503 499 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
504 500 meta += prefix
505 501 l = len(meta) + len(delta)
506 502 yield chunkheader(l)
507 503 yield meta
508 504 yield delta
509 505 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
510 506 # do nothing with basenode, it is implicitly the previous one in HG10
511 507 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
512 508
513 509 class cg2packer(cg1packer):
514 510 version = '02'
515 511 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
516 512
517 513 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
518 514 if (revlog._generaldelta and reorder is not True):
519 515 reorder = False
520 516 return super(cg2packer, self).group(nodelist, revlog, lookup,
521 517 units=units, reorder=reorder)
522 518
523 519 def deltaparent(self, revlog, rev, p1, p2, prev):
524 520 dp = revlog.deltaparent(rev)
525 521 # avoid storing full revisions; pick prev in those cases
526 522 # also pick prev when we can't be sure remote has dp
527 523 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
528 524 return prev
529 525 return dp
530 526
531 527 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
532 528 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
533 529
534 530 packermap = {'01': (cg1packer, cg1unpacker),
535 531 '02': (cg2packer, cg2unpacker)}
536 532
537 533 def _changegroupinfo(repo, nodes, source):
538 534 if repo.ui.verbose or source == 'bundle':
539 535 repo.ui.status(_("%d changesets found\n") % len(nodes))
540 536 if repo.ui.debugflag:
541 537 repo.ui.debug("list of changesets:\n")
542 538 for node in nodes:
543 539 repo.ui.debug("%s\n" % hex(node))
544 540
545 541 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
546 542 repo = repo.unfiltered()
547 543 commonrevs = outgoing.common
548 544 csets = outgoing.missing
549 545 heads = outgoing.missingheads
550 546 # We go through the fast path if we get told to, or if all (unfiltered)
551 547 # heads have been requested (since we then know all linkrevs will
552 548 # be pulled by the client).
553 549 heads.sort()
554 550 fastpathlinkrev = fastpath or (
555 551 repo.filtername is None and heads == sorted(repo.heads()))
556 552
557 553 repo.hook('preoutgoing', throw=True, source=source)
558 554 _changegroupinfo(repo, csets, source)
559 555 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
560 556
561 557 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
562 558 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
563 559 return packermap[version][1](util.chunkbuffer(gengroup), 'UN')
564 560
565 561 def changegroupsubset(repo, roots, heads, source, version='01'):
566 562 """Compute a changegroup consisting of all the nodes that are
567 563 descendants of any of the roots and ancestors of any of the heads.
568 564 Return a chunkbuffer object whose read() method will return
569 565 successive changegroup chunks.
570 566
571 567 It is fairly complex as determining which filenodes and which
572 568 manifest nodes need to be included for the changeset to be complete
573 569 is non-trivial.
574 570
575 571 Another wrinkle is doing the reverse, figuring out which changeset in
576 572 the changegroup a particular filenode or manifestnode belongs to.
577 573 """
578 574 cl = repo.changelog
579 575 if not roots:
580 576 roots = [nullid]
581 577 # TODO: remove call to nodesbetween.
582 578 csets, roots, heads = cl.nodesbetween(roots, heads)
583 579 discbases = []
584 580 for n in roots:
585 581 discbases.extend([p for p in cl.parents(n) if p != nullid])
586 582 outgoing = discovery.outgoing(cl, discbases, heads)
587 583 bundler = packermap[version][0](repo)
588 584 return getsubset(repo, outgoing, bundler, source, version=version)
589 585
590 586 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
591 587 version='01'):
592 588 """Like getbundle, but taking a discovery.outgoing as an argument.
593 589
594 590 This is only implemented for local repos and reuses potentially
595 591 precomputed sets in outgoing. Returns a raw changegroup generator."""
596 592 if not outgoing.missing:
597 593 return None
598 594 bundler = packermap[version][0](repo, bundlecaps)
599 595 return getsubsetraw(repo, outgoing, bundler, source)
600 596
601 597 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None):
602 598 """Like getbundle, but taking a discovery.outgoing as an argument.
603 599
604 600 This is only implemented for local repos and reuses potentially
605 601 precomputed sets in outgoing."""
606 602 if not outgoing.missing:
607 603 return None
608 604 bundler = cg1packer(repo, bundlecaps)
609 605 return getsubset(repo, outgoing, bundler, source)
610 606
611 607 def _computeoutgoing(repo, heads, common):
612 608 """Computes which revs are outgoing given a set of common
613 609 and a set of heads.
614 610
615 611 This is a separate function so extensions can have access to
616 612 the logic.
617 613
618 614 Returns a discovery.outgoing object.
619 615 """
620 616 cl = repo.changelog
621 617 if common:
622 618 hasnode = cl.hasnode
623 619 common = [n for n in common if hasnode(n)]
624 620 else:
625 621 common = [nullid]
626 622 if not heads:
627 623 heads = cl.heads()
628 624 return discovery.outgoing(cl, common, heads)
629 625
630 626 def getchangegroupraw(repo, source, heads=None, common=None, bundlecaps=None,
631 627 version='01'):
632 628 """Like changegroupsubset, but returns the set difference between the
633 629 ancestors of heads and the ancestors common.
634 630
635 631 If heads is None, use the local heads. If common is None, use [nullid].
636 632
637 633 If version is None, use a version '01' changegroup.
638 634
639 635 The nodes in common might not all be known locally due to the way the
640 636 current discovery protocol works. Returns a raw changegroup generator.
641 637 """
642 638 outgoing = _computeoutgoing(repo, heads, common)
643 639 return getlocalchangegroupraw(repo, source, outgoing, bundlecaps=bundlecaps,
644 640 version=version)
645 641
646 642 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None):
647 643 """Like changegroupsubset, but returns the set difference between the
648 644 ancestors of heads and the ancestors common.
649 645
650 646 If heads is None, use the local heads. If common is None, use [nullid].
651 647
652 648 The nodes in common might not all be known locally due to the way the
653 649 current discovery protocol works.
654 650 """
655 651 outgoing = _computeoutgoing(repo, heads, common)
656 652 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps)
657 653
658 654 def changegroup(repo, basenodes, source):
659 655 # to avoid a race we use changegroupsubset() (issue1320)
660 656 return changegroupsubset(repo, basenodes, repo.heads(), source)
661 657
662 658 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
663 659 revisions = 0
664 660 files = 0
665 661 while True:
666 662 chunkdata = source.filelogheader()
667 663 if not chunkdata:
668 664 break
669 665 f = chunkdata["filename"]
670 666 repo.ui.debug("adding %s revisions\n" % f)
671 667 pr()
672 668 fl = repo.file(f)
673 669 o = len(fl)
674 670 try:
675 671 if not fl.addgroup(source, revmap, trp):
676 672 raise util.Abort(_("received file revlog group is empty"))
677 673 except error.CensoredBaseError, e:
678 674 raise util.Abort(_("received delta base is censored: %s") % e)
679 675 revisions += len(fl) - o
680 676 files += 1
681 677 if f in needfiles:
682 678 needs = needfiles[f]
683 679 for new in xrange(o, len(fl)):
684 680 n = fl.node(new)
685 681 if n in needs:
686 682 needs.remove(n)
687 683 else:
688 684 raise util.Abort(
689 685 _("received spurious file revlog entry"))
690 686 if not needs:
691 687 del needfiles[f]
692 688 repo.ui.progress(_('files'), None)
693 689
694 690 for f, needs in needfiles.iteritems():
695 691 fl = repo.file(f)
696 692 for n in needs:
697 693 try:
698 694 fl.rev(n)
699 695 except error.LookupError:
700 696 raise util.Abort(
701 697 _('missing file data for %s:%s - run hg verify') %
702 698 (f, hex(n)))
703 699
704 700 return revisions, files
705 701
706 702 def addchangegroup(repo, source, srctype, url, emptyok=False,
707 703 targetphase=phases.draft):
708 704 """Add the changegroup returned by source.read() to this repo.
709 705 srctype is a string like 'push', 'pull', or 'unbundle'. url is
710 706 the URL of the repo where this changegroup is coming from.
711 707
712 708 Return an integer summarizing the change to this repo:
713 709 - nothing changed or no source: 0
714 710 - more heads than before: 1+added heads (2..n)
715 711 - fewer heads than before: -1-removed heads (-2..-n)
716 712 - number of heads stays the same: 1
717 713 """
718 714 repo = repo.unfiltered()
719 715 def csmap(x):
720 716 repo.ui.debug("add changeset %s\n" % short(x))
721 717 return len(cl)
722 718
723 719 def revmap(x):
724 720 return cl.rev(x)
725 721
726 722 if not source:
727 723 return 0
728 724
729 725 changesets = files = revisions = 0
730 726 efiles = set()
731 727
732 728 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
733 729 # The transaction could have been created before and already carries source
734 730 # information. In this case we use the top level data. We overwrite the
735 731 # argument because we need to use the top level values (if they exist) in
736 732 # this function.
737 733 srctype = tr.hookargs.setdefault('source', srctype)
738 734 url = tr.hookargs.setdefault('url', url)
739 735
740 736 # write changelog data to temp files so concurrent readers will not see
741 737 # inconsistent view
742 738 cl = repo.changelog
743 739 cl.delayupdate(tr)
744 740 oldheads = cl.heads()
745 741 try:
746 742 repo.hook('prechangegroup', throw=True, **tr.hookargs)
747 743
748 744 trp = weakref.proxy(tr)
749 745 # pull off the changeset group
750 746 repo.ui.status(_("adding changesets\n"))
751 747 clstart = len(cl)
752 748 class prog(object):
753 749 step = _('changesets')
754 750 count = 1
755 751 ui = repo.ui
756 752 total = None
757 753 def __call__(repo):
758 754 repo.ui.progress(repo.step, repo.count, unit=_('chunks'),
759 755 total=repo.total)
760 756 repo.count += 1
761 757 pr = prog()
762 758 source.callback = pr
763 759
764 760 source.changelogheader()
765 761 srccontent = cl.addgroup(source, csmap, trp)
766 762 if not (srccontent or emptyok):
767 763 raise util.Abort(_("received changelog group is empty"))
768 764 clend = len(cl)
769 765 changesets = clend - clstart
770 766 for c in xrange(clstart, clend):
771 767 efiles.update(repo[c].files())
772 768 efiles = len(efiles)
773 769 repo.ui.progress(_('changesets'), None)
774 770
775 771 # pull off the manifest group
776 772 repo.ui.status(_("adding manifests\n"))
777 773 pr.step = _('manifests')
778 774 pr.count = 1
779 775 pr.total = changesets # manifests <= changesets
780 776 # no need to check for empty manifest group here:
781 777 # if the result of the merge of 1 and 2 is the same in 3 and 4,
782 778 # no new manifest will be created and the manifest group will
783 779 # be empty during the pull
784 780 source.manifestheader()
785 781 repo.manifest.addgroup(source, revmap, trp)
786 782 repo.ui.progress(_('manifests'), None)
787 783
788 784 needfiles = {}
789 785 if repo.ui.configbool('server', 'validate', default=False):
790 786 # validate incoming csets have their manifests
791 787 for cset in xrange(clstart, clend):
792 788 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
793 789 mfest = repo.manifest.readdelta(mfnode)
794 790 # store file nodes we must see
795 791 for f, n in mfest.iteritems():
796 792 needfiles.setdefault(f, set()).add(n)
797 793
798 794 # process the files
799 795 repo.ui.status(_("adding file changes\n"))
800 796 pr.step = _('files')
801 797 pr.count = 1
802 798 pr.total = efiles
803 799 source.callback = None
804 800
805 801 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
806 802 needfiles)
807 803 revisions += newrevs
808 804 files += newfiles
809 805
810 806 dh = 0
811 807 if oldheads:
812 808 heads = cl.heads()
813 809 dh = len(heads) - len(oldheads)
814 810 for h in heads:
815 811 if h not in oldheads and repo[h].closesbranch():
816 812 dh -= 1
817 813 htext = ""
818 814 if dh:
819 815 htext = _(" (%+d heads)") % dh
820 816
821 817 repo.ui.status(_("added %d changesets"
822 818 " with %d changes to %d files%s\n")
823 819 % (changesets, revisions, files, htext))
824 820 repo.invalidatevolatilesets()
825 821
826 822 if changesets > 0:
827 823 p = lambda: tr.writepending() and repo.root or ""
828 824 if 'node' not in tr.hookargs:
829 825 tr.hookargs['node'] = hex(cl.node(clstart))
830 826 hookargs = dict(tr.hookargs)
831 827 else:
832 828 hookargs = dict(tr.hookargs)
833 829 hookargs['node'] = hex(cl.node(clstart))
834 830 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
835 831
836 832 added = [cl.node(r) for r in xrange(clstart, clend)]
837 833 publishing = repo.ui.configbool('phases', 'publish', True)
838 834 if srctype in ('push', 'serve'):
839 835 # Old servers can not push the boundary themselves.
840 836 # New servers won't push the boundary if changeset already
841 837 # exists locally as secret
842 838 #
843 839 # We should not use added here but the list of all changes in
844 840 # the bundle
845 841 if publishing:
846 842 phases.advanceboundary(repo, tr, phases.public, srccontent)
847 843 else:
848 844 # Those changesets have been pushed from the outside, their
849 845 # phases are going to be pushed alongside. Therefor
850 846 # `targetphase` is ignored.
851 847 phases.advanceboundary(repo, tr, phases.draft, srccontent)
852 848 phases.retractboundary(repo, tr, phases.draft, added)
853 849 elif srctype != 'strip':
854 850 # publishing only alters behavior during push
855 851 #
856 852 # strip should not touch boundary at all
857 853 phases.retractboundary(repo, tr, targetphase, added)
858 854
859 855 if changesets > 0:
860 856 if srctype != 'strip':
861 857 # During strip, branchcache is invalid but the coming call to
862 858 # `destroyed` will repair it.
863 859 # In other cases we can safely update the cache on disk.
864 860 branchmap.updatecache(repo.filtered('served'))
865 861
866 862 def runhooks():
867 863 # These hooks run when the lock releases, not when the
868 864 # transaction closes. So it's possible for the changelog
869 865 # to have changed since we last saw it.
870 866 if clstart >= len(repo):
871 867 return
872 868
873 869 # forcefully update the on-disk branch cache
874 870 repo.ui.debug("updating the branch cache\n")
875 871 repo.hook("changegroup", **hookargs)
876 872
877 873 for n in added:
878 874 args = hookargs.copy()
879 875 args['node'] = hex(n)
880 876 repo.hook("incoming", **args)
881 877
882 878 newheads = [h for h in repo.heads() if h not in oldheads]
883 879 repo.ui.log("incoming",
884 880 "%s incoming changes - new heads: %s\n",
885 881 len(added),
886 882 ', '.join([hex(c[:6]) for c in newheads]))
887 883
888 884 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
889 885 lambda tr: repo._afterlock(runhooks))
890 886
891 887 tr.close()
892 888
893 889 finally:
894 890 tr.release()
895 891 repo.ui.flush()
896 892 # never return 0 here:
897 893 if dh < 0:
898 894 return dh - 1
899 895 else:
900 896 return dh + 1
General Comments 0
You need to be logged in to leave comments. Login now