changegroup: extract the file management part in its own function...
Pierre-Yves David
r26540:7469067d default
@@ -1,902 +1,913 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
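A quick standalone sketch of how these header formats pack and unpack; the field order (node, p1, p2, linknode for v1; node, p1, p2, deltabase, linknode for v2) matches the builddeltaheader methods of the packers further down. This is only an illustration with dummy 20-byte values, not code from the module:

    import struct

    V1 = "20s20s20s20s"        # node, p1, p2, linknode: 80 bytes
    V2 = "20s20s20s20s20s"     # node, p1, p2, deltabase, linknode: 100 bytes
    assert struct.calcsize(V1) == 80 and struct.calcsize(V2) == 100
    header = struct.pack(V1, b"n" * 20, b"1" * 20, b"2" * 20, b"l" * 20)
    node, p1, p2, linknode = struct.unpack(V1, header)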
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if fewer bytes are available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise util.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise util.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
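The framing used by getchunk/chunkheader/closechunk is a 4-byte big-endian length that counts itself, with a zero length terminating a chunk group. A minimal round-trip sketch, re-reading with plain struct calls instead of the helpers above:

    import struct
    from io import BytesIO

    payload = b"hello"
    framed = struct.pack(">l", len(payload) + 4) + payload + struct.pack(">l", 0)
    stream = BytesIO(framed)
    length = struct.unpack(">l", stream.read(4))[0]
    assert stream.read(length - 4) == payload               # data chunk
    assert struct.unpack(">l", stream.read(4))[0] == 0      # end of group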
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
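For example, one result that added two heads (3) combined with one that removed a head (-2) nets one extra head, giving 2, and any zero result short-circuits to 0. A few illustrative checks against the function above:

    assert combineresults([]) == 1          # no head count change
    assert combineresults([3, -2]) == 2     # +2 heads then -1 head: net +1
    assert combineresults([1, 0, 3]) == 0   # a 0 anywhere wins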
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
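A hedged sketch of how such a priority list can be used: pick the first type the client claims to support (the clientcaps set here is hypothetical, not part of this module):

    clientcaps = set(['HG10BZ', 'HG10UN'])   # hypothetical client capabilities
    preferred = [t for t in bundlepriority if t in clientcaps][0]
    assert preferred == 'HG10BZ'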
95 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
96 """Write a bundle file and return its filename.
95 def writechunks(ui, chunks, filename, vfs=None):
96 """Write chunks to a file and return its filename.
97 97
98 The stream is assumed to be a bundle file.
98 99 Existing files will not be overwritten.
99 100 If no filename is specified, a temporary file is created.
100 bz2 compression can be turned off.
101 The bundle file will be deleted in case of errors.
102 101 """
103
104 102 fh = None
105 103 cleanup = None
106 104 try:
107 105 if filename:
108 106 if vfs:
109 107 fh = vfs.open(filename, "wb")
110 108 else:
111 109 fh = open(filename, "wb")
112 110 else:
113 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
114 112 fh = os.fdopen(fd, "wb")
115 113 cleanup = filename
116
117 if bundletype == "HG20":
118 from . import bundle2
119 bundle = bundle2.bundle20(ui)
120 bundle.setcompression(compression)
121 part = bundle.newpart('changegroup', data=cg.getchunks())
122 part.addparam('version', cg.version)
123 z = util.compressors[None]()
124 chunkiter = bundle.getchunks()
125 else:
126 # compression argument is only for the bundle2 case
127 assert compression is None
128 if cg.version != '01':
129 raise util.Abort(_('old bundle types only support v1 '
130 'changegroups'))
131 header, comp = bundletypes[bundletype]
132 fh.write(header)
133 if comp not in util.compressors:
134 raise util.Abort(_('unknown stream compression type: %s')
135 % comp)
136 z = util.compressors[comp]()
137 chunkiter = cg.getchunks()
138
139 # parse the changegroup data, otherwise we will block
140 # in case of sshrepo because we don't know the end of the stream
141
142 # an empty chunkgroup is the end of the changegroup
143 # a changegroup has at least 2 chunkgroups (changelog and manifest).
144 # after that, an empty chunkgroup is the end of the changegroup
145 for chunk in chunkiter:
146 fh.write(z.compress(chunk))
147 fh.write(z.flush())
114 for c in chunks:
115 fh.write(c)
148 116 cleanup = None
149 117 return filename
150 118 finally:
151 119 if fh is not None:
152 120 fh.close()
153 121 if cleanup is not None:
154 122 if filename and vfs:
155 123 vfs.unlink(cleanup)
156 124 else:
157 125 os.unlink(cleanup)
158 126
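A minimal usage sketch of the new writechunks helper: any iterable of byte strings works, and passing filename=None yields a temporary hg-bundle-*.hg file whose name is returned. (The ui argument is accepted but currently unused by the helper; assume ui is any ui object already in scope.)

    chunks = [b'part one\n', b'part two\n']
    tmpname = writechunks(ui, chunks, None)   # None -> tempfile, path returned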
127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 """Write a bundle file and return its filename.
129
130 Existing files will not be overwritten.
131 If no filename is specified, a temporary file is created.
132 bz2 compression can be turned off.
133 The bundle file will be deleted in case of errors.
134 """
135
136 if bundletype == "HG20":
137 from . import bundle2
138 bundle = bundle2.bundle20(ui)
139 bundle.setcompression(compression)
140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 part.addparam('version', cg.version)
142 chunkiter = bundle.getchunks()
143 else:
144 # compression argument is only for the bundle2 case
145 assert compression is None
146 if cg.version != '01':
147 raise util.Abort(_('old bundle types only support v1 '
148 'changegroups'))
149 header, comp = bundletypes[bundletype]
150 if comp not in util.compressors:
151 raise util.Abort(_('unknown stream compression type: %s')
152 % comp)
153 z = util.compressors[comp]()
154 subchunkiter = cg.getchunks()
155 def chunkiter():
156 yield header
157 for chunk in subchunkiter:
158 yield z.compress(chunk)
159 yield z.flush()
160 chunkiter = chunkiter()
161
162 # parse the changegroup data, otherwise we will block
163 # in case of sshrepo because we don't know the end of the stream
164
165 # an empty chunkgroup is the end of the changegroup
166 # a changegroup has at least 2 chunkgroups (changelog and manifest).
167 # after that, an empty chunkgroup is the end of the changegroup
168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169
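The v1 branch above wraps the stream in a generator so writechunks only ever sees finished bytes; the same header-then-compress pattern in isolation, substituting zlib for util.compressors (an assumption made purely so the sketch is self-contained):

    import zlib

    def compressedchunks(chunks, header=b'HG10GZ'):
        z = zlib.compressobj()
        yield header                  # header is written uncompressed
        for chunk in chunks:
            yield z.compress(chunk)
        yield z.flush()

    data = b''.join(compressedchunks([b'alpha', b'beta']))
    assert zlib.decompress(data[6:]) == b'alphabeta'   # skip 6-byte header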
159 170 class cg1unpacker(object):
160 171 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
161 172 deltaheadersize = struct.calcsize(deltaheader)
162 173 version = '01'
163 174 def __init__(self, fh, alg):
164 175 if alg == 'UN':
165 176 alg = None # get more modern without breaking too much
166 177 if not alg in util.decompressors:
167 178 raise util.Abort(_('unknown stream compression type: %s')
168 179 % alg)
169 180 if alg == 'BZ':
170 181 alg = '_truncatedBZ'
171 182 self._stream = util.decompressors[alg](fh)
172 183 self._type = alg
173 184 self.callback = None
174 185 def compressed(self):
175 186 return self._type is not None
176 187 def read(self, l):
177 188 return self._stream.read(l)
178 189 def seek(self, pos):
179 190 return self._stream.seek(pos)
180 191 def tell(self):
181 192 return self._stream.tell()
182 193 def close(self):
183 194 return self._stream.close()
184 195
185 196 def chunklength(self):
186 197 d = readexactly(self._stream, 4)
187 198 l = struct.unpack(">l", d)[0]
188 199 if l <= 4:
189 200 if l:
190 201 raise util.Abort(_("invalid chunk length %d") % l)
191 202 return 0
192 203 if self.callback:
193 204 self.callback()
194 205 return l - 4
195 206
196 207 def changelogheader(self):
197 208 """v10 does not have a changelog header chunk"""
198 209 return {}
199 210
200 211 def manifestheader(self):
201 212 """v10 does not have a manifest header chunk"""
202 213 return {}
203 214
204 215 def filelogheader(self):
205 216 """return the header of the filelogs chunk, v10 only has the filename"""
206 217 l = self.chunklength()
207 218 if not l:
208 219 return {}
209 220 fname = readexactly(self._stream, l)
210 221 return {'filename': fname}
211 222
212 223 def _deltaheader(self, headertuple, prevnode):
213 224 node, p1, p2, cs = headertuple
214 225 if prevnode is None:
215 226 deltabase = p1
216 227 else:
217 228 deltabase = prevnode
218 229 return node, p1, p2, deltabase, cs
219 230
220 231 def deltachunk(self, prevnode):
221 232 l = self.chunklength()
222 233 if not l:
223 234 return {}
224 235 headerdata = readexactly(self._stream, self.deltaheadersize)
225 236 header = struct.unpack(self.deltaheader, headerdata)
226 237 delta = readexactly(self._stream, l - self.deltaheadersize)
227 238 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
228 239 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
229 240 'deltabase': deltabase, 'delta': delta}
230 241
231 242 def getchunks(self):
232 243 """returns all the chunks contains in the bundle
233 244
234 245 Used when you need to forward the binary stream to a file or another
235 246 network API. To do so, it parses the changegroup data; otherwise it would
236 247 block in the sshrepo case because it does not know the end of the stream.
237 248 """
238 249 # an empty chunkgroup is the end of the changegroup
239 250 # a changegroup has at least 2 chunkgroups (changelog and manifest).
240 251 # after that, an empty chunkgroup is the end of the changegroup
241 252 empty = False
242 253 count = 0
243 254 while not empty or count <= 2:
244 255 empty = True
245 256 count += 1
246 257 while True:
247 258 chunk = getchunk(self)
248 259 if not chunk:
249 260 break
250 261 empty = False
251 262 yield chunkheader(len(chunk))
252 263 pos = 0
253 264 while pos < len(chunk):
254 265 next = pos + 2**20
255 266 yield chunk[pos:next]
256 267 pos = next
257 268 yield closechunk()
258 269
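Concretely, the smallest stream the loop above accepts is three empty groups: changelog, manifest, and the empty group that ends the (possibly empty) file list. A sketch of that minimal byte layout:

    import struct

    emptygroup = struct.pack(">l", 0)
    minimal = emptygroup * 3   # changelog group, manifest group, end of files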
259 270 class cg2unpacker(cg1unpacker):
260 271 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
261 272 deltaheadersize = struct.calcsize(deltaheader)
262 273 version = '02'
263 274
264 275 def _deltaheader(self, headertuple, prevnode):
265 276 node, p1, p2, deltabase, cs = headertuple
266 277 return node, p1, p2, deltabase, cs
267 278
268 279 class headerlessfixup(object):
269 280 def __init__(self, fh, h):
270 281 self._h = h
271 282 self._fh = fh
272 283 def read(self, n):
273 284 if self._h:
274 285 d, self._h = self._h[:n], self._h[n:]
275 286 if len(d) < n:
276 287 d += readexactly(self._fh, n - len(d))
277 288 return d
278 289 return readexactly(self._fh, n)
279 290
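headerlessfixup replays bytes that were already consumed while sniffing the bundle type; a standalone sketch with BytesIO standing in for the real file handle:

    from io import BytesIO

    stream = BytesIO(b'HG10UNrest')
    header = stream.read(6)                 # eaten while detecting the type
    fixed = headerlessfixup(stream, header)
    assert fixed.read(6) == b'HG10UN'       # header comes back first
    assert fixed.read(4) == b'rest'         # then the underlying stream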
280 291 class cg1packer(object):
281 292 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
282 293 version = '01'
283 294 def __init__(self, repo, bundlecaps=None):
284 295 """Given a source repo, construct a bundler.
285 296
286 297 bundlecaps is optional and can be used to specify the set of
287 298 capabilities which can be used to build the bundle.
288 299 """
289 300 # Set of capabilities we can use to build the bundle.
290 301 if bundlecaps is None:
291 302 bundlecaps = set()
292 303 self._bundlecaps = bundlecaps
293 304 # experimental config: bundle.reorder
294 305 reorder = repo.ui.config('bundle', 'reorder', 'auto')
295 306 if reorder == 'auto':
296 307 reorder = None
297 308 else:
298 309 reorder = util.parsebool(reorder)
299 310 self._repo = repo
300 311 self._reorder = reorder
301 312 self._progress = repo.ui.progress
302 313 if self._repo.ui.verbose and not self._repo.ui.debugflag:
303 314 self._verbosenote = self._repo.ui.note
304 315 else:
305 316 self._verbosenote = lambda s: None
306 317
307 318 def close(self):
308 319 return closechunk()
309 320
310 321 def fileheader(self, fname):
311 322 return chunkheader(len(fname)) + fname
312 323
313 324 def group(self, nodelist, revlog, lookup, units=None):
314 325 """Calculate a delta group, yielding a sequence of changegroup chunks
315 326 (strings).
316 327
317 328 Given a list of changeset revs, return a set of deltas and
318 329 metadata corresponding to nodes. The first delta is
319 330 first parent(nodelist[0]) -> nodelist[0], the receiver is
320 331 guaranteed to have this parent as it has all history before
321 332 these changesets. In the case firstparent is nullrev the
322 333 changegroup starts with a full revision.
323 334
324 335 If units is not None, progress detail will be generated; units specifies
325 336 the type of revlog that is touched (changelog, manifest, etc.).
326 337 """
327 338 # if we don't have any revisions touched by these changesets, bail
328 339 if len(nodelist) == 0:
329 340 yield self.close()
330 341 return
331 342
332 343 # for generaldelta revlogs, we linearize the revs; this will both be
333 344 # much quicker and generate a much smaller bundle
334 345 if (revlog._generaldelta and self._reorder is None) or self._reorder:
335 346 dag = dagutil.revlogdag(revlog)
336 347 revs = set(revlog.rev(n) for n in nodelist)
337 348 revs = dag.linearize(revs)
338 349 else:
339 350 revs = sorted([revlog.rev(n) for n in nodelist])
340 351
341 352 # add the parent of the first rev
342 353 p = revlog.parentrevs(revs[0])[0]
343 354 revs.insert(0, p)
344 355
345 356 # build deltas
346 357 total = len(revs) - 1
347 358 msgbundling = _('bundling')
348 359 for r in xrange(len(revs) - 1):
349 360 if units is not None:
350 361 self._progress(msgbundling, r + 1, unit=units, total=total)
351 362 prev, curr = revs[r], revs[r + 1]
352 363 linknode = lookup(revlog.node(curr))
353 364 for c in self.revchunk(revlog, curr, prev, linknode):
354 365 yield c
355 366
356 367 if units is not None:
357 368 self._progress(msgbundling, None)
358 369 yield self.close()
359 370
360 371 # filter any nodes that claim to be part of the known set
361 372 def prune(self, revlog, missing, commonrevs):
362 373 rr, rl = revlog.rev, revlog.linkrev
363 374 return [n for n in missing if rl(rr(n)) not in commonrevs]
364 375
365 376 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
366 377 '''yield a sequence of changegroup chunks (strings)'''
367 378 repo = self._repo
368 379 cl = repo.changelog
369 380 ml = repo.manifest
370 381
371 382 clrevorder = {}
372 383 mfs = {} # needed manifests
373 384 fnodes = {} # needed file nodes
374 385 changedfiles = set()
375 386
376 387 # Callback for the changelog, used to collect changed files and manifest
377 388 # nodes.
378 389 # Returns the linkrev node (identity in the changelog case).
379 390 def lookupcl(x):
380 391 c = cl.read(x)
381 392 clrevorder[x] = len(clrevorder)
382 393 changedfiles.update(c[3])
383 394 # record the first changeset introducing this manifest version
384 395 mfs.setdefault(c[0], x)
385 396 return x
386 397
387 398 self._verbosenote(_('uncompressed size of bundle content:\n'))
388 399 size = 0
389 400 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
390 401 size += len(chunk)
391 402 yield chunk
392 403 self._verbosenote(_('%8.i (changelog)\n') % size)
393 404
394 405 # We need to make sure that the linkrev in the changegroup refers to
395 406 # the first changeset that introduced the manifest or file revision.
396 407 # The fastpath is usually safer than the slowpath, because the filelogs
397 408 # are walked in revlog order.
398 409 #
399 410 # When taking the slowpath with reorder=None and the manifest revlog
400 411 # uses generaldelta, the manifest may be walked in the "wrong" order.
401 412 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
402 413 # cc0ff93d0c0c).
403 414 #
404 415 # When taking the fastpath, we are only vulnerable to reordering
405 416 # of the changelog itself. The changelog never uses generaldelta, so
406 417 # it is only reordered when reorder=True. To handle this case, we
407 418 # simply take the slowpath, which already has the 'clrevorder' logic.
408 419 # This was also fixed in cc0ff93d0c0c.
409 420 fastpathlinkrev = fastpathlinkrev and not self._reorder
410 421 # Callback for the manifest, used to collect linkrevs for filelog
411 422 # revisions.
412 423 # Returns the linkrev node (collected in lookupcl).
413 424 def lookupmf(x):
414 425 clnode = mfs[x]
415 426 if not fastpathlinkrev:
416 427 mdata = ml.readfast(x)
417 428 for f, n in mdata.iteritems():
418 429 if f in changedfiles:
419 430 # record the first changeset introducing this filelog
420 431 # version
421 432 fclnodes = fnodes.setdefault(f, {})
422 433 fclnode = fclnodes.setdefault(n, clnode)
423 434 if clrevorder[clnode] < clrevorder[fclnode]:
424 435 fclnodes[n] = clnode
425 436 return clnode
426 437
427 438 mfnodes = self.prune(ml, mfs, commonrevs)
428 439 size = 0
429 440 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
430 441 size += len(chunk)
431 442 yield chunk
432 443 self._verbosenote(_('%8.i (manifests)\n') % size)
433 444
434 445 mfs.clear()
435 446 clrevs = set(cl.rev(x) for x in clnodes)
436 447
437 448 def linknodes(filerevlog, fname):
438 449 if fastpathlinkrev:
439 450 llr = filerevlog.linkrev
440 451 def genfilenodes():
441 452 for r in filerevlog:
442 453 linkrev = llr(r)
443 454 if linkrev in clrevs:
444 455 yield filerevlog.node(r), cl.node(linkrev)
445 456 return dict(genfilenodes())
446 457 return fnodes.get(fname, {})
447 458
448 459 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
449 460 source):
450 461 yield chunk
451 462
452 463 yield self.close()
453 464
454 465 if clnodes:
455 466 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
456 467
457 468 # The 'source' parameter is useful for extensions
458 469 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
459 470 repo = self._repo
460 471 progress = self._progress
461 472 msgbundling = _('bundling')
462 473
463 474 total = len(changedfiles)
464 475 # for progress output
465 476 msgfiles = _('files')
466 477 for i, fname in enumerate(sorted(changedfiles)):
467 478 filerevlog = repo.file(fname)
468 479 if not filerevlog:
469 480 raise util.Abort(_("empty or missing revlog for %s") % fname)
470 481
471 482 linkrevnodes = linknodes(filerevlog, fname)
472 483 # Lookup for filenodes, we collected the linkrev nodes above in the
473 484 # fastpath case and with lookupmf in the slowpath case.
474 485 def lookupfilelog(x):
475 486 return linkrevnodes[x]
476 487
477 488 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
478 489 if filenodes:
479 490 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
480 491 total=total)
481 492 h = self.fileheader(fname)
482 493 size = len(h)
483 494 yield h
484 495 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
485 496 size += len(chunk)
486 497 yield chunk
487 498 self._verbosenote(_('%8.i %s\n') % (size, fname))
488 499 progress(msgbundling, None)
489 500
490 501 def deltaparent(self, revlog, rev, p1, p2, prev):
491 502 return prev
492 503
493 504 def revchunk(self, revlog, rev, prev, linknode):
494 505 node = revlog.node(rev)
495 506 p1, p2 = revlog.parentrevs(rev)
496 507 base = self.deltaparent(revlog, rev, p1, p2, prev)
497 508
498 509 prefix = ''
499 510 if revlog.iscensored(base) or revlog.iscensored(rev):
500 511 try:
501 512 delta = revlog.revision(node)
502 513 except error.CensoredNodeError as e:
503 514 delta = e.tombstone
504 515 if base == nullrev:
505 516 prefix = mdiff.trivialdiffheader(len(delta))
506 517 else:
507 518 baselen = revlog.rawsize(base)
508 519 prefix = mdiff.replacediffheader(baselen, len(delta))
509 520 elif base == nullrev:
510 521 delta = revlog.revision(node)
511 522 prefix = mdiff.trivialdiffheader(len(delta))
512 523 else:
513 524 delta = revlog.revdiff(base, rev)
514 525 p1n, p2n = revlog.parents(node)
515 526 basenode = revlog.node(base)
516 527 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
517 528 meta += prefix
518 529 l = len(meta) + len(delta)
519 530 yield chunkheader(l)
520 531 yield meta
521 532 yield delta
522 533 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
523 534 # do nothing with basenode, it is implicitly the previous one in HG10
524 535 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
525 536
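In the nullrev branches of revchunk the "delta" is the full revision text behind a trivial diff header. Assuming mdiff uses the usual bdiff-style header of three big-endian int32s (an assumption for this sketch, not mdiff's actual code), the equivalent is:

    import struct

    def trivialheader(length):
        # "replace the empty range [0, 0) with `length` new bytes"
        return struct.pack(">lll", 0, 0, length)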
526 537 class cg2packer(cg1packer):
527 538 version = '02'
528 539 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
529 540
530 541 def __init__(self, repo, bundlecaps=None):
531 542 super(cg2packer, self).__init__(repo, bundlecaps)
532 543 if self._reorder is None:
533 544 # Since generaldelta is directly supported by cg2, reordering
534 545 # generally doesn't help, so we disable it by default (treating
535 546 # bundle.reorder=auto just like bundle.reorder=False).
536 547 self._reorder = False
537 548
538 549 def deltaparent(self, revlog, rev, p1, p2, prev):
539 550 dp = revlog.deltaparent(rev)
540 551 # avoid storing full revisions; pick prev in those cases
541 552 # also pick prev when we can't be sure remote has dp
542 553 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
543 554 return prev
544 555 return dp
545 556
546 557 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
547 558 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
548 559
549 560 packermap = {'01': (cg1packer, cg1unpacker),
550 561 '02': (cg2packer, cg2unpacker)}
551 562
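packermap is keyed by changegroup version; index 0 is the packer class and index 1 the unpacker, e.g.:

    packer, unpacker = packermap['02']   # cg2packer, cg2unpacker
    assert packer.version == unpacker.version == '02'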
552 563 def _changegroupinfo(repo, nodes, source):
553 564 if repo.ui.verbose or source == 'bundle':
554 565 repo.ui.status(_("%d changesets found\n") % len(nodes))
555 566 if repo.ui.debugflag:
556 567 repo.ui.debug("list of changesets:\n")
557 568 for node in nodes:
558 569 repo.ui.debug("%s\n" % hex(node))
559 570
560 571 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
561 572 repo = repo.unfiltered()
562 573 commonrevs = outgoing.common
563 574 csets = outgoing.missing
564 575 heads = outgoing.missingheads
565 576 # We go through the fast path if we get told to, or if all (unfiltered)
566 577 # heads have been requested (since we then know that all linkrevs will
567 578 # be pulled by the client).
568 579 heads.sort()
569 580 fastpathlinkrev = fastpath or (
570 581 repo.filtername is None and heads == sorted(repo.heads()))
571 582
572 583 repo.hook('preoutgoing', throw=True, source=source)
573 584 _changegroupinfo(repo, csets, source)
574 585 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
575 586
576 587 def getsubset(repo, outgoing, bundler, source, fastpath=False, version='01'):
577 588 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
578 589 return packermap[version][1](util.chunkbuffer(gengroup), None)
579 590
580 591 def changegroupsubset(repo, roots, heads, source, version='01'):
581 592 """Compute a changegroup consisting of all the nodes that are
582 593 descendants of any of the roots and ancestors of any of the heads.
583 594 Return a chunkbuffer object whose read() method will return
584 595 successive changegroup chunks.
585 596
586 597 It is fairly complex as determining which filenodes and which
587 598 manifest nodes need to be included for the changeset to be complete
588 599 is non-trivial.
589 600
590 601 Another wrinkle is doing the reverse, figuring out which changeset in
591 602 the changegroup a particular filenode or manifestnode belongs to.
592 603 """
593 604 cl = repo.changelog
594 605 if not roots:
595 606 roots = [nullid]
596 607 discbases = []
597 608 for n in roots:
598 609 discbases.extend([p for p in cl.parents(n) if p != nullid])
599 610 # TODO: remove call to nodesbetween.
600 611 csets, roots, heads = cl.nodesbetween(roots, heads)
601 612 included = set(csets)
602 613 discbases = [n for n in discbases if n not in included]
603 614 outgoing = discovery.outgoing(cl, discbases, heads)
604 615 bundler = packermap[version][0](repo)
605 616 return getsubset(repo, outgoing, bundler, source, version=version)
606 617
607 618 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
608 619 version='01'):
609 620 """Like getbundle, but taking a discovery.outgoing as an argument.
610 621
611 622 This is only implemented for local repos and reuses potentially
612 623 precomputed sets in outgoing. Returns a raw changegroup generator."""
613 624 if not outgoing.missing:
614 625 return None
615 626 bundler = packermap[version][0](repo, bundlecaps)
616 627 return getsubsetraw(repo, outgoing, bundler, source)
617 628
618 629 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
619 630 version='01'):
620 631 """Like getbundle, but taking a discovery.outgoing as an argument.
621 632
622 633 This is only implemented for local repos and reuses potentially
623 634 precomputed sets in outgoing."""
624 635 if not outgoing.missing:
625 636 return None
626 637 bundler = packermap[version][0](repo, bundlecaps)
627 638 return getsubset(repo, outgoing, bundler, source)
628 639
629 640 def computeoutgoing(repo, heads, common):
630 641 """Computes which revs are outgoing given a set of common
631 642 and a set of heads.
632 643
633 644 This is a separate function so extensions can have access to
634 645 the logic.
635 646
636 647 Returns a discovery.outgoing object.
637 648 """
638 649 cl = repo.changelog
639 650 if common:
640 651 hasnode = cl.hasnode
641 652 common = [n for n in common if hasnode(n)]
642 653 else:
643 654 common = [nullid]
644 655 if not heads:
645 656 heads = cl.heads()
646 657 return discovery.outgoing(cl, common, heads)
647 658
648 659 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
649 660 version='01'):
650 661 """Like changegroupsubset, but returns the set difference between the
651 662 ancestors of heads and the ancestors common.
652 663
653 664 If heads is None, use the local heads. If common is None, use [nullid].
654 665
655 666 The nodes in common might not all be known locally due to the way the
656 667 current discovery protocol works.
657 668 """
658 669 outgoing = computeoutgoing(repo, heads, common)
659 670 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
660 671 version=version)
661 672
662 673 def changegroup(repo, basenodes, source):
663 674 # to avoid a race we use changegroupsubset() (issue1320)
664 675 return changegroupsubset(repo, basenodes, repo.heads(), source)
665 676
666 677 def addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
667 678 revisions = 0
668 679 files = 0
669 680 while True:
670 681 chunkdata = source.filelogheader()
671 682 if not chunkdata:
672 683 break
673 684 f = chunkdata["filename"]
674 685 repo.ui.debug("adding %s revisions\n" % f)
675 686 pr()
676 687 fl = repo.file(f)
677 688 o = len(fl)
678 689 try:
679 690 if not fl.addgroup(source, revmap, trp):
680 691 raise util.Abort(_("received file revlog group is empty"))
681 692 except error.CensoredBaseError as e:
682 693 raise util.Abort(_("received delta base is censored: %s") % e)
683 694 revisions += len(fl) - o
684 695 files += 1
685 696 if f in needfiles:
686 697 needs = needfiles[f]
687 698 for new in xrange(o, len(fl)):
688 699 n = fl.node(new)
689 700 if n in needs:
690 701 needs.remove(n)
691 702 else:
692 703 raise util.Abort(
693 704 _("received spurious file revlog entry"))
694 705 if not needs:
695 706 del needfiles[f]
696 707 repo.ui.progress(_('files'), None)
697 708
698 709 for f, needs in needfiles.iteritems():
699 710 fl = repo.file(f)
700 711 for n in needs:
701 712 try:
702 713 fl.rev(n)
703 714 except error.LookupError:
704 715 raise util.Abort(
705 716 _('missing file data for %s:%s - run hg verify') %
706 717 (f, hex(n)))
707 718
708 719 return revisions, files
709 720
710 721 def addchangegroup(repo, source, srctype, url, emptyok=False,
711 722 targetphase=phases.draft, expectedtotal=None):
712 723 """Add the changegroup returned by source.read() to this repo.
713 724 srctype is a string like 'push', 'pull', or 'unbundle'. url is
714 725 the URL of the repo where this changegroup is coming from.
715 726
716 727 Return an integer summarizing the change to this repo:
717 728 - nothing changed or no source: 0
718 729 - more heads than before: 1+added heads (2..n)
719 730 - fewer heads than before: -1-removed heads (-2..-n)
720 731 - number of heads stays the same: 1
721 732 """
722 733 repo = repo.unfiltered()
723 734 def csmap(x):
724 735 repo.ui.debug("add changeset %s\n" % short(x))
725 736 return len(cl)
726 737
727 738 def revmap(x):
728 739 return cl.rev(x)
729 740
730 741 if not source:
731 742 return 0
732 743
733 744 changesets = files = revisions = 0
734 745
735 746 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
736 747 # The transaction could have been created before and already carries source
737 748 # information. In this case we use the top level data. We overwrite the
738 749 # argument because we need to use the top level values (if they exist) in
739 750 # this function.
740 751 srctype = tr.hookargs.setdefault('source', srctype)
741 752 url = tr.hookargs.setdefault('url', url)
742 753
743 754 # write changelog data to temp files so concurrent readers will not see
744 755 # inconsistent view
745 756 cl = repo.changelog
746 757 cl.delayupdate(tr)
747 758 oldheads = cl.heads()
748 759 try:
749 760 repo.hook('prechangegroup', throw=True, **tr.hookargs)
750 761
751 762 trp = weakref.proxy(tr)
752 763 # pull off the changeset group
753 764 repo.ui.status(_("adding changesets\n"))
754 765 clstart = len(cl)
755 766 class prog(object):
756 767 def __init__(self, step, total):
757 768 self._step = step
758 769 self._total = total
759 770 self._count = 1
760 771 def __call__(self):
761 772 repo.ui.progress(self._step, self._count, unit=_('chunks'),
762 773 total=self._total)
763 774 self._count += 1
764 775 source.callback = prog(_('changesets'), expectedtotal)
765 776
766 777 efiles = set()
767 778 def onchangelog(cl, node):
768 779 efiles.update(cl.read(node)[3])
769 780
770 781 source.changelogheader()
771 782 srccontent = cl.addgroup(source, csmap, trp,
772 783 addrevisioncb=onchangelog)
773 784 efiles = len(efiles)
774 785
775 786 if not (srccontent or emptyok):
776 787 raise util.Abort(_("received changelog group is empty"))
777 788 clend = len(cl)
778 789 changesets = clend - clstart
779 790 repo.ui.progress(_('changesets'), None)
780 791
781 792 # pull off the manifest group
782 793 repo.ui.status(_("adding manifests\n"))
783 794 # manifests <= changesets
784 795 source.callback = prog(_('manifests'), changesets)
785 796 # no need to check for empty manifest group here:
786 797 # if the result of the merge of 1 and 2 is the same in 3 and 4,
787 798 # no new manifest will be created and the manifest group will
788 799 # be empty during the pull
789 800 source.manifestheader()
790 801 repo.manifest.addgroup(source, revmap, trp)
791 802 repo.ui.progress(_('manifests'), None)
792 803
793 804 needfiles = {}
794 805 if repo.ui.configbool('server', 'validate', default=False):
795 806 # validate incoming csets have their manifests
796 807 for cset in xrange(clstart, clend):
797 808 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
798 809 mfest = repo.manifest.readdelta(mfnode)
799 810 # store file nodes we must see
800 811 for f, n in mfest.iteritems():
801 812 needfiles.setdefault(f, set()).add(n)
802 813
803 814 # process the files
804 815 repo.ui.status(_("adding file changes\n"))
805 816 source.callback = None
806 817 pr = prog(_('files'), efiles)
807 818 newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
808 819 needfiles)
809 820 revisions += newrevs
810 821 files += newfiles
811 822
812 823 dh = 0
813 824 if oldheads:
814 825 heads = cl.heads()
815 826 dh = len(heads) - len(oldheads)
816 827 for h in heads:
817 828 if h not in oldheads and repo[h].closesbranch():
818 829 dh -= 1
819 830 htext = ""
820 831 if dh:
821 832 htext = _(" (%+d heads)") % dh
822 833
823 834 repo.ui.status(_("added %d changesets"
824 835 " with %d changes to %d files%s\n")
825 836 % (changesets, revisions, files, htext))
826 837 repo.invalidatevolatilesets()
827 838
828 839 if changesets > 0:
829 840 p = lambda: tr.writepending() and repo.root or ""
830 841 if 'node' not in tr.hookargs:
831 842 tr.hookargs['node'] = hex(cl.node(clstart))
832 843 hookargs = dict(tr.hookargs)
833 844 else:
834 845 hookargs = dict(tr.hookargs)
835 846 hookargs['node'] = hex(cl.node(clstart))
836 847 repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
837 848
838 849 added = [cl.node(r) for r in xrange(clstart, clend)]
839 850 publishing = repo.publishing()
840 851 if srctype in ('push', 'serve'):
841 852 # Old servers cannot push the boundary themselves.
842 853 # New servers won't push the boundary if the changeset already
843 854 # exists locally as secret
844 855 #
845 856 # We should not use 'added' here but the list of all changes in
846 857 # the bundle
847 858 if publishing:
848 859 phases.advanceboundary(repo, tr, phases.public, srccontent)
849 860 else:
850 861 # Those changesets have been pushed from the outside, their
851 862 # phases are going to be pushed alongside. Therefore,
852 863 # `targetphase` is ignored.
853 864 phases.advanceboundary(repo, tr, phases.draft, srccontent)
854 865 phases.retractboundary(repo, tr, phases.draft, added)
855 866 elif srctype != 'strip':
856 867 # publishing only alters behavior during push
857 868 #
858 869 # strip should not touch boundary at all
859 870 phases.retractboundary(repo, tr, targetphase, added)
860 871
861 872 if changesets > 0:
862 873 if srctype != 'strip':
863 874 # During strip, the branchcache is invalid but the upcoming call to
864 875 # `destroyed` will repair it.
865 876 # In other cases we can safely update the cache on disk.
866 877 branchmap.updatecache(repo.filtered('served'))
867 878
868 879 def runhooks():
869 880 # These hooks run when the lock releases, not when the
870 881 # transaction closes. So it's possible for the changelog
871 882 # to have changed since we last saw it.
872 883 if clstart >= len(repo):
873 884 return
874 885
875 886 # forcefully update the on-disk branch cache
876 887 repo.ui.debug("updating the branch cache\n")
877 888 repo.hook("changegroup", **hookargs)
878 889
879 890 for n in added:
880 891 args = hookargs.copy()
881 892 args['node'] = hex(n)
882 893 repo.hook("incoming", **args)
883 894
884 895 newheads = [h for h in repo.heads() if h not in oldheads]
885 896 repo.ui.log("incoming",
886 897 "%s incoming changes - new heads: %s\n",
887 898 len(added),
888 899 ', '.join([hex(c[:6]) for c in newheads]))
889 900
890 901 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
891 902 lambda tr: repo._afterlock(runhooks))
892 903
893 904 tr.close()
894 905
895 906 finally:
896 907 tr.release()
897 908 repo.ui.flush()
898 909 # never return 0 here:
899 910 if dh < 0:
900 911 return dh - 1
901 912 else:
902 913 return dh + 1
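So a return of 4 means three heads were added and -3 means two were removed, per the encoding documented in the addchangegroup docstring; a tiny decoder for these codes (the function name is hypothetical):

    def describeheads(ret):
        if ret == 0:
            return 'no changes'
        if ret == 1:
            return 'head count unchanged'
        if ret > 1:
            return '%d new heads' % (ret - 1)
        return '%d heads removed' % (-ret - 1)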