##// END OF EJS Templates
bundle: make unbundle object seekable...
Matt Mackall -
r12330:e527b863 default
parent child Browse files
Show More
@@ -1,177 +1,181 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 import util
10 10 import struct, os, bz2, zlib, tempfile
11 11
def getchunk(source):
    """Return the next chunk from changegroup 'source' as a string.

    A chunk is a 4-byte big-endian signed length followed by the
    payload; the length value includes the 4 bytes of the length field
    itself.  An empty string is returned when the stream is exhausted
    or when the stored length is <= 4 (a chunk without payload).

    Raises util.Abort if the stream ends in the middle of the length
    field or in the middle of the payload.
    """
    header = source.read(4)
    if not header:
        return ""
    if len(header) < 4:
        # Without this check a truncated length field would surface as
        # a raw struct.error instead of the usual Abort.
        raise util.Abort(_("premature EOF reading chunk header"
                           " (got %d bytes, expected %d)")
                         % (len(header), 4))
    length = struct.unpack(">l", header)[0]
    if length <= 4:
        return ""
    payload = source.read(length - 4)
    if len(payload) < length - 4:
        raise util.Abort(_("premature EOF reading chunk"
                           " (got %d bytes, expected %d)")
                         % (len(payload), length - 4))
    return payload
26 26
def chunkiter(source, progress=None):
    """Generate the chunks (strings) of one chunk group read from source.

    Chunks are pulled with getchunk() until it returns an empty chunk,
    which ends the iteration.  If a progress callable is given, it is
    called without arguments before each chunk is yielded.
    """
    notify = progress is not None
    chunk = getchunk(source)
    while chunk:
        if notify:
            progress()
        yield chunk
        chunk = getchunk(source)
37 37
def chunkheader(length):
    """Build the header (string) that precedes a chunk of 'length' bytes.

    The header is a 4-byte big-endian signed integer holding the
    payload length plus 4.
    """
    stored = length + 4
    return struct.pack(">l", stored)
41 41
def closechunk():
    """Build the header (string) of a zero-length chunk.

    The result is a 4-byte big-endian zero.
    """
    terminator = struct.pack(">l", 0)
    return terminator
45 45
class nocompress(object):
    """Pass-through object exposing compress() and flush().

    compress() returns its input unchanged and flush() returns an empty
    string, so it can be used where no compression is wanted.
    """

    def compress(self, x):
        # identity: the data is passed through as-is
        return x

    def flush(self):
        # nothing is ever buffered, so there is nothing left to emit
        return ""
51 51
# Maps a bundle type name to a pair:
#   (header string written at the start of the bundle file,
#    zero-argument callable returning an object with compress()/flush())
bundletypes = {}
bundletypes[""] = ("", nocompress)
bundletypes["HG10UN"] = ("HG10UN", nocompress)
# Only "HG10" is written for bz2 bundles.
# NOTE(review): presumably the bz2 stream's own leading "BZ" completes the
# six-byte "HG10BZ" magic (decompressor() feeds "BZ" back in) -- confirm.
bundletypes["HG10BZ"] = ("HG10", lambda: bz2.BZ2Compressor())
bundletypes["HG10GZ"] = ("HG10GZ", lambda: zlib.compressobj())
58 58
def collector(cl, mmfs, files):
    """Return a callback that records what a changeset node brings along.

    The returned function takes a changeset node, reads it with
    cl.read(node), adds item 3 of the result to 'files' (the filelogs
    affected) and stores the node in 'mmfs' under item 0 of the result
    (the manifest needed) unless that key is already present.
    """
    def collect(node):
        entry = cl.read(node)
        manifest, touched = entry[0], entry[3]
        files.update(touched)
        # the first changeset seen for a manifest wins
        mmfs.setdefault(manifest, node)
    return collect
67 67
# Bundle type names; hgweb uses this list to communicate its preferred type.
bundlepriority = [
    'HG10GZ',
    'HG10BZ',
    'HG10UN',
]
70 70
def writebundle(cg, filename, bundletype):
    """Write the changegroup stream cg to a bundle file; return its name.

    If filename is empty, a temporary file (prefix "hg-bundle-", suffix
    ".hg") is created; otherwise filename is opened with mode "wb".
    bundletype is a key of bundletypes and selects the header written
    first and the compressor applied to everything after it.
    If an error occurs once the file has been opened, the file is
    removed before the exception propagates.
    """

    out = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            out = os.fdopen(fd, "wb")
        else:
            out = open(filename, "wb")
        # from here on, a failure must remove the partially written file
        cleanup = filename

        header, compressor = bundletypes[bundletype]
        out.write(header)
        z = compressor()

        # The changegroup data has to be parsed rather than copied
        # blindly: with an sshrepo source we would otherwise block,
        # because we don't know where the stream ends.
        #
        # A changegroup has at least two chunk groups (changelog and
        # manifest); after those, an empty group is the end of the
        # changegroup.
        groups = 0
        while True:
            groups += 1
            sawchunk = False
            for chunk in chunkiter(cg):
                sawchunk = True
                out.write(z.compress(chunkheader(len(chunk))))
                # hand the payload to the compressor in 1 MiB slices
                for start in range(0, len(chunk), 2**20):
                    out.write(z.compress(chunk[start:start + 2**20]))
            # terminate the group just copied (even if it was empty)
            out.write(z.compress(closechunk()))
            if groups > 2 and not sawchunk:
                break
        out.write(z.flush())
        # success: keep the file
        cleanup = None
        return filename
    finally:
        if out is not None:
            out.close()
        if cleanup is not None:
            os.unlink(cleanup)
122 122
def decompressor(fh, alg):
    """Return a readable object producing the decompressed bundle data.

    fh is a file-like object positioned after the bundle header (as
    readbundle() leaves it) and alg is the two-character compression
    code taken from that header:

      'UN'  no compression; fh itself is returned
      'GZ'  zlib stream, wrapped in a util.chunkbuffer
      'BZ'  bz2 stream, wrapped in a util.chunkbuffer

    Raises util.Abort for any other code.
    """
    if alg == 'UN':
        return fh
    elif alg == 'GZ':
        def generator(f):
            zd = zlib.decompressobj()
            # Read fixed-size blocks like the bz2 branch does; iterating
            # the file object would split the binary stream on newline
            # bytes and needs fh to be iterable.
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    elif alg == 'BZ':
        def generator(f):
            zd = bz2.BZ2Decompressor()
            # Feed "BZ" first.
            # NOTE(review): presumably because the bz2 magic was consumed
            # as the last two bytes of the "HG10BZ" header -- confirm.
            zd.decompress("BZ")
            for chunk in util.filechunkiter(f, 4096):
                yield zd.decompress(chunk)
    else:
        raise util.Abort(_("unknown bundle compression '%s'") % alg)
    return util.chunkbuffer(generator(fh))
140 140
class unbundle10(object):
    """File-like reader over the payload of a version '10' bundle.

    The underlying stream is whatever decompressor(fh, alg) returns;
    read(), seek() and tell() are forwarded to it.
    """

    def __init__(self, fh, alg):
        """Wrap fh, decompressing according to the two-character alg."""
        self._stream = decompressor(fh, alg)
        self._type = alg

    def compressed(self):
        """Return True unless the bundle type is 'UN' (uncompressed)."""
        return self._type != 'UN'

    def read(self, l):
        """Read up to l bytes from the underlying stream."""
        return self._stream.read(l)

    def seek(self, pos):
        """Forward seek(pos) to the underlying stream."""
        return self._stream.seek(pos)

    def tell(self):
        """Return the underlying stream's current position."""
        # tell() takes no argument; passing the undefined name 'pos'
        # made every call fail with NameError.
        return self._stream.tell()
149 153
class headerlessfixup(object):
    """Reader that serves the string h before the contents of fh.

    readbundle() uses it to put back bytes it already consumed while
    looking for a bundle header.
    """

    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        """Read n bytes, draining the saved string before touching fh."""
        pending = self._h
        if not pending:
            return self._fh.read(n)
        data = pending[:n]
        self._h = pending[n:]
        shortfall = n - len(data)
        if shortfall > 0:
            # the saved bytes ran out; take the remainder from the file
            data += self._fh.read(shortfall)
        return data
161 165
def readbundle(fh, fname):
    """Read the bundle header from fh and return an unbundle10 reader.

    fname is used only in error messages; "stream" is substituted when
    it is empty.  If the first six bytes start with a NUL byte rather
    than 'HG', they are pushed back through headerlessfixup and the
    data is handled as if it had an "HG10UN" header.

    Raises util.Abort if the magic is not 'HG' or the version is not
    '10'.
    """
    header = fh.read(6)
    fname = fname or "stream"

    headerless = header.startswith('\0') and not header.startswith('HG')
    if headerless:
        # the six bytes already read belong to the data: put them back
        fh = headerlessfixup(fh, header)
        header = "HG10UN"

    magic = header[:2]
    version = header[2:4]
    alg = header[4:6]

    if magic != 'HG':
        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
    if version != '10':
        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
    return unbundle10(fh, alg)
General Comments 0
You need to be logged in to leave comments. Login now