##// END OF EJS Templates
bundle-ng: simplify bundle10.generate...
Sune Foldager -
r19206:6308896b default
parent child Browse files
Show More
@@ -1,417 +1,408 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import nullrev, hex
10 10 import mdiff, util, dagutil
11 11 import struct, os, bz2, zlib, tempfile
12 12
13 13 _BUNDLE10_DELTA_HEADER = "20s20s20s20s"
14 14
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) < n:
        # a short read means the peer hung up or the file is truncated
        raise util.Abort(_("stream ended unexpectedly"
                           " (got %d bytes, expected %d)")
                         % (len(data), n))
    return data
23 23
def getchunk(stream):
    """return the next chunk from stream as a string"""
    lengthfield = readexactly(stream, 4)
    length = struct.unpack(">l", lengthfield)[0]
    if length > 4:
        # the stored length includes the 4-byte length field itself
        return readexactly(stream, length - 4)
    if length:
        # 1..4 (or negative) cannot encode a valid chunk
        raise util.Abort(_("invalid chunk length %d") % length)
    # zero length marks the end of a chunkgroup
    return ""
33 33
def chunkheader(length):
    """return a changegroup chunk header (string)

    The on-wire length field counts the 4-byte header itself, hence +4.
    """
    total = length + 4
    return struct.pack(">l", total)
37 37
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk

    A zero-length chunk terminates the current chunkgroup.
    """
    return struct.pack(">l", 0)
41 41
class nocompress(object):
    """Identity 'compressor': hands data back unchanged.

    Mimics the zlib/bz2 compressor object interface for the
    uncompressed bundle types.
    """
    def compress(self, x):
        # nothing to do; return the input untouched
        return x

    def flush(self):
        # nothing is ever buffered, so there is nothing to emit
        return ""
47 47
# Map of bundle type name -> (on-disk header string, compressor factory).
# The header is written verbatim at the start of the bundle file; the
# factory returns an object with compress()/flush() methods.
bundletypes = {
    "": ("", nocompress), # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG10UN": ("HG10UN", nocompress),  # uncompressed
    "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),  # bz2; note: header is plain "HG10"
    "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),  # zlib
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
59 59
def writebundle(cg, filename, bundletype):
    """Write a bundle file and return its filename.

    cg: changegroup stream (object with a read() method, e.g. unbundle10)
    filename: target path, or falsy to create a temporary file
    bundletype: key into the bundletypes map (selects header + compression)

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    fh = None
    cleanup = None
    try:
        if filename:
            fh = open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # from here on, delete the (possibly partial) file on error
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        fh.write(header)
        z = compressor()

        # parse the changegroup data, otherwise we will block
        # in case of sshrepo because we don't know the end of the stream

        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(cg)
                if not chunk:
                    break
                empty = False
                fh.write(z.compress(chunkheader(len(chunk))))
                pos = 0
                # feed the compressor in 1M slices to bound memory use
                while pos < len(chunk):
                    next = pos + 2**20
                    fh.write(z.compress(chunk[pos:next]))
                    pos = next
            fh.write(z.compress(closechunk()))
        fh.write(z.flush())
        # success: disarm the error-cleanup path and keep the file
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            os.unlink(cleanup)
114 114
def decompressor(fh, alg):
    """Return a file-like object yielding decompressed data from fh.

    alg is the two-letter compression tag from the bundle header:
    'UN' (none), 'GZ' (zlib) or 'BZ' (bz2). For 'UN' the stream is
    returned as-is; otherwise reads go through util.chunkbuffer over a
    decompressing generator. Raises util.Abort on an unknown tag.
    """
    if alg == 'UN':
        return fh
    elif alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            for chunk in util.filechunkiter(f):
                yield zd.decompress(chunk)
            # flush any bytes still buffered inside the decompressor;
            # without this, data held back at end-of-stream is lost
            yield zd.flush()
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            # re-feed the magic consumed while sniffing the header
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort("unknown bundle compression '%s'" % alg)
    return util.chunkbuffer(generator(fh))
132 132
class unbundle10(object):
    """Reader for a version-10 (HG10) changegroup stream."""
    deltaheader = _BUNDLE10_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)

    def __init__(self, fh, alg):
        # wrap the raw stream with the appropriate decompressor
        self._stream = decompressor(fh, alg)
        self._type = alg
        self.callback = None

    def compressed(self):
        return self._type != 'UN'

    # thin delegation to the underlying (decompressed) stream
    def read(self, l):
        return self._stream.read(l)

    def seek(self, pos):
        return self._stream.seek(pos)

    def tell(self):
        return self._stream.tell()

    def close(self):
        return self._stream.close()

    def chunklength(self):
        """Return the payload length of the next chunk (0 ends a group)."""
        lengthfield = readexactly(self._stream, 4)
        length = struct.unpack(">l", lengthfield)[0]
        if length <= 4:
            if length:
                raise util.Abort(_("invalid chunk length %d") % length)
            return 0
        if self.callback:
            self.callback()
        # the stored length counts the 4-byte length field itself
        return length - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        length = self.chunklength()
        if not length:
            return {}
        return dict(filename=readexactly(self._stream, length))

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        # HG10 carries no explicit delta base: it is the previous chunk's
        # node, or p1 for the first chunk of a chain
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """Read one delta chunk; return {} at the end of the group."""
        length = self.chunklength()
        if not length:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, length - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return dict(node=node, p1=p1, p2=p2, cs=cs,
                    deltabase=deltabase, delta=delta)
196 196
class headerlessfixup(object):
    """Re-attach already-consumed header bytes in front of a stream.

    Used when the bundle header was sniffed off fh but the data must be
    re-read as part of the stream.
    """
    def __init__(self, fh, h):
        self._h = h    # buffered header bytes not yet handed back out
        self._fh = fh  # the underlying stream

    def read(self, n):
        if not self._h:
            # buffer exhausted: read straight from the stream
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # buffer ran out mid-request; top up from the stream
            d += readexactly(self._fh, n - len(d))
        return d
208 208
def readbundle(fh, fname):
    """Sniff the 6-byte bundle header of fh and return an unbundle10.

    fname is used only in error messages; falsy means an anonymous stream.
    Raises util.Abort on a non-Mercurial or unknown-version bundle.
    """
    header = readexactly(fh, 6)

    if not fname:
        fname = "stream"
    # a stream starting with a NUL byte is taken to be a raw, headerless
    # uncompressed changegroup; push the consumed bytes back in front
    if not header.startswith('HG') and header.startswith('\0'):
        fh = headerlessfixup(fh, header)
        header = "HG10UN"

    magic = header[0:2]
    version = header[2:4]
    alg = header[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)
225 225
class bundle10(object):
    """Changegroup bundler for the version-10 (HG10) wire format."""
    deltaheader = _BUNDLE10_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        self._changelog = repo.changelog
        self._manifest = repo.manifest
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        # [current, total] shared with progress reporting in generate()
        self.count = [0, 0]

    def close(self):
        """Return the chunk that terminates a chunkgroup."""
        return closechunk()

    def fileheader(self, fname):
        """Return the header chunk announcing the filelog for fname."""
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, reorder=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.
        """

        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and reorder is not False) or reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        for r in xrange(len(revs) - 1):
            prev, curr = revs[r], revs[r + 1]
            for c in self.revchunk(revlog, curr, prev):
                yield c

        yield self.close()

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = self._changelog
        mf = self._manifest
        reorder = self._reorder
        progress = repo.ui.progress
        count = self.count
        _bundling = _('bundling')
        _changesets = _('changesets')
        _manifests = _('manifests')
        _files = _('files')

        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()
        fstate = ['', {}]  # [current filename, {filenode: linked clnode}]

        # filter any nodes that claim to be part of the known set
        def prune(revlog, missing):
            rr, rl = revlog.rev, revlog.linkrev
            return [n for n in missing
                    if rl(rr(n)) not in commonrevs]

        # Map a node of the revlog being bundled to the changelog node that
        # introduced it, collecting needed manifests and file nodes as a
        # side effect while the changelog/manifest groups stream out.
        def lookup(revlog, x):
            if revlog == cl:
                c = cl.read(x)
                changedfiles.update(c[3])
                mfs.setdefault(c[0], x)
                count[0] += 1
                progress(_bundling, count[0],
                         unit=_changesets, total=count[1])
                return x
            elif revlog == mf:
                clnode = mfs[x]
                if not fastpathlinkrev:
                    mdata = mf.readfast(x)
                    for f, n in mdata.iteritems():
                        if f in changedfiles:
                            fnodes[f].setdefault(n, clnode)
                count[0] += 1
                progress(_bundling, count[0],
                         unit=_manifests, total=count[1])
                return clnode
            else:
                progress(_bundling, count[0], item=fstate[0],
                         unit=_files, total=count[1])
                return fstate[1][x]

        self._lookup = lookup

        # changelog group; lookup() fills mfs/changedfiles as it streams
        count[:] = [0, len(clnodes)]
        for chunk in self.group(clnodes, cl, reorder=reorder):
            yield chunk
        progress(_bundling, None)

        # manifest group
        for f in changedfiles:
            fnodes[f] = {}
        count[:] = [0, len(mfs)]
        mfnodes = prune(mf, mfs)
        for chunk in self.group(mfnodes, mf, reorder=reorder):
            yield chunk
        progress(_bundling, None)

        # filelog groups, one per changed file in sorted order
        mfs.clear()
        count[:] = [0, len(changedfiles)]
        for fname in sorted(changedfiles):
            filerevlog = repo.file(fname)
            if not len(filerevlog):
                raise util.Abort(_("empty or missing revlog for %s")
                                 % fname)

            if fastpathlinkrev:
                # linkrevs are trusted: derive the needed file nodes
                # directly from the filelog instead of the manifests
                llr = filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev not in commonrevs:
                            yield filerevlog.node(r), cl.node(linkrev)
                fnodes[fname] = dict(genfilenodes())
            fstate[0] = fname
            fstate[1] = fnodes.pop(fname, {})
            filenodes = prune(filerevlog, fstate[1])
            if filenodes:
                count[0] += 1
                yield self.fileheader(fname)
                for chunk in self.group(filenodes, filerevlog, reorder):
                    yield chunk
        # an empty chunkgroup terminates the list of filelogs
        yield self.close()
        progress(_bundling, None)

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def revchunk(self, revlog, rev, prev):
        """Yield the chunks encoding the delta prev -> rev."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = prev

        prefix = ''
        if base == nullrev:
            # no usable base: emit a full revision with a trivial diff header
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        linknode = self._lookup(revlog, node)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        """Pack the per-delta header for one revision chunk."""
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
General Comments 0
You need to be logged in to leave comments. Login now