##// END OF EJS Templates
changegroup: verify all stream reads...
Mads Kiilerich -
r13457:e74fe15d stable
parent child Browse files
Show More
@@ -1,200 +1,199 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 import util
10 10 import struct, os, bz2, zlib, tempfile
11 11
12 def getchunk(source):
13 """return the next chunk from changegroup 'source' as a string"""
14 d = source.read(4)
12 def readexactly(stream, n):
13 '''read n bytes from stream.read and abort if less was available'''
14 s = stream.read(n)
15 if len(s) < n:
16 raise util.Abort(_("stream ended unexpectedly"
17 " (got %d bytes, expected %d)")
18 % (len(s), n))
19 return s
20
21 def getchunk(stream):
22 """return the next chunk from stream as a string"""
23 d = readexactly(stream, 4)
15 24 l = struct.unpack(">l", d)[0]
16 25 if l <= 4:
17 26 return ""
18 d = source.read(l - 4)
19 if len(d) < l - 4:
20 raise util.Abort(_("premature EOF reading chunk"
21 " (got %d bytes, expected %d)")
22 % (len(d), l - 4))
23 return d
27 return readexactly(stream, l - 4)
24 28
25 29 def chunkheader(length):
26 30 """return a changegroup chunk header (string)"""
27 31 return struct.pack(">l", length + 4)
28 32
29 33 def closechunk():
30 34 """return a changegroup chunk header (string) for a zero-length chunk"""
31 35 return struct.pack(">l", 0)
32 36
33 37 class nocompress(object):
34 38 def compress(self, x):
35 39 return x
36 40 def flush(self):
37 41 return ""
38 42
39 43 bundletypes = {
40 44 "": ("", nocompress),
41 45 "HG10UN": ("HG10UN", nocompress),
42 46 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
43 47 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
44 48 }
45 49
46 50 def collector(cl, mmfs, files):
47 51 # Gather information about changeset nodes going out in a bundle.
48 52 # We want to gather manifests needed and filelogs affected.
49 53 def collect(node):
50 54 c = cl.read(node)
51 55 files.update(c[3])
52 56 mmfs.setdefault(c[0], node)
53 57 return collect
54 58
55 59 # hgweb uses this list to communicate its preferred type
56 60 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
57 61
58 62 def writebundle(cg, filename, bundletype):
59 63 """Write a bundle file and return its filename.
60 64
61 65 Existing files will not be overwritten.
62 66 If no filename is specified, a temporary file is created.
63 67 bz2 compression can be turned off.
64 68 The bundle file will be deleted in case of errors.
65 69 """
66 70
67 71 fh = None
68 72 cleanup = None
69 73 try:
70 74 if filename:
71 75 fh = open(filename, "wb")
72 76 else:
73 77 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
74 78 fh = os.fdopen(fd, "wb")
75 79 cleanup = filename
76 80
77 81 header, compressor = bundletypes[bundletype]
78 82 fh.write(header)
79 83 z = compressor()
80 84
81 85 # parse the changegroup data, otherwise we will block
82 86 # in case of sshrepo because we don't know the end of the stream
83 87
84 88 # an empty chunkgroup is the end of the changegroup
85 89 # a changegroup has at least 2 chunkgroups (changelog and manifest).
86 90 # after that, an empty chunkgroup is the end of the changegroup
87 91 empty = False
88 92 count = 0
89 93 while not empty or count <= 2:
90 94 empty = True
91 95 count += 1
92 96 while 1:
93 97 chunk = getchunk(cg)
94 98 if not chunk:
95 99 break
96 100 empty = False
97 101 fh.write(z.compress(chunkheader(len(chunk))))
98 102 pos = 0
99 103 while pos < len(chunk):
100 104 next = pos + 2**20
101 105 fh.write(z.compress(chunk[pos:next]))
102 106 pos = next
103 107 fh.write(z.compress(closechunk()))
104 108 fh.write(z.flush())
105 109 cleanup = None
106 110 return filename
107 111 finally:
108 112 if fh is not None:
109 113 fh.close()
110 114 if cleanup is not None:
111 115 os.unlink(cleanup)
112 116
113 117 def decompressor(fh, alg):
114 118 if alg == 'UN':
115 119 return fh
116 120 elif alg == 'GZ':
117 121 def generator(f):
118 122 zd = zlib.decompressobj()
119 123 for chunk in f:
120 124 yield zd.decompress(chunk)
121 125 elif alg == 'BZ':
122 126 def generator(f):
123 127 zd = bz2.BZ2Decompressor()
124 128 zd.decompress("BZ")
125 129 for chunk in util.filechunkiter(f, 4096):
126 130 yield zd.decompress(chunk)
127 131 else:
128 132 raise util.Abort("unknown bundle compression '%s'" % alg)
129 133 return util.chunkbuffer(generator(fh))
130 134
131 135 class unbundle10(object):
132 136 def __init__(self, fh, alg):
133 137 self._stream = decompressor(fh, alg)
134 138 self._type = alg
135 139 self.callback = None
136 140 def compressed(self):
137 141 return self._type != 'UN'
138 142 def read(self, l):
139 143 return self._stream.read(l)
140 144 def seek(self, pos):
141 145 return self._stream.seek(pos)
142 146 def tell(self):
143 147 return self._stream.tell()
144 148 def close(self):
145 149 return self._stream.close()
146 150
147 151 def chunklength(self):
148 d = self.read(4)
152 d = readexactly(self._stream, 4)
149 153 l = max(0, struct.unpack(">l", d)[0] - 4)
150 154 if l and self.callback:
151 155 self.callback()
152 156 return l
153 157
154 158 def chunk(self):
155 159 """return the next chunk from changegroup 'source' as a string"""
156 160 l = self.chunklength()
157 d = self.read(l)
158 if len(d) < l:
159 raise util.Abort(_("premature EOF reading chunk"
160 " (got %d bytes, expected %d)")
161 % (len(d), l))
162 return d
161 return readexactly(self._stream, l)
163 162
164 163 def parsechunk(self):
165 164 l = self.chunklength()
166 165 if not l:
167 166 return {}
168 h = self.read(80)
167 h = readexactly(self._stream, 80)
169 168 node, p1, p2, cs = struct.unpack("20s20s20s20s", h)
170 data = self.read(l - 80)
169 data = readexactly(self._stream, l - 80)
171 170 return dict(node=node, p1=p1, p2=p2, cs=cs, data=data)
172 171
173 172 class headerlessfixup(object):
174 173 def __init__(self, fh, h):
175 174 self._h = h
176 175 self._fh = fh
177 176 def read(self, n):
178 177 if self._h:
179 178 d, self._h = self._h[:n], self._h[n:]
180 179 if len(d) < n:
181 d += self._fh.read(n - len(d))
180 d += readexactly(self._fh, n - len(d))
182 181 return d
183 return self._fh.read(n)
182 return readexactly(self._fh, n)
184 183
185 184 def readbundle(fh, fname):
186 header = fh.read(6)
185 header = readexactly(fh, 6)
187 186
188 187 if not fname:
189 188 fname = "stream"
190 189 if not header.startswith('HG') and header.startswith('\0'):
191 190 fh = headerlessfixup(fh, header)
192 191 header = "HG10UN"
193 192
194 193 magic, version, alg = header[0:2], header[2:4], header[4:6]
195 194
196 195 if magic != 'HG':
197 196 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
198 197 if version != '10':
199 198 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
200 199 return unbundle10(fh, alg)
General Comments 0
You need to be logged in to leave comments. Login now