##// END OF EJS Templates
changegroup: verify all stream reads...
Mads Kiilerich -
r13457:e74fe15d stable
parent child Browse files
Show More
@@ -1,200 +1,199 b''
1 # changegroup.py - Mercurial changegroup manipulation functions
1 # changegroup.py - Mercurial changegroup manipulation functions
2 #
2 #
3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 import util
9 import util
10 import struct, os, bz2, zlib, tempfile
10 import struct, os, bz2, zlib, tempfile
11
11
12 def getchunk(source):
12 def readexactly(stream, n):
13 """return the next chunk from changegroup 'source' as a string"""
13 '''read n bytes from stream.read and abort if less was available'''
14 d = source.read(4)
14 s = stream.read(n)
15 if len(s) < n:
16 raise util.Abort(_("stream ended unexpectedly"
17 " (got %d bytes, expected %d)")
18 % (len(s), n))
19 return s
20
21 def getchunk(stream):
22 """return the next chunk from stream as a string"""
23 d = readexactly(stream, 4)
15 l = struct.unpack(">l", d)[0]
24 l = struct.unpack(">l", d)[0]
16 if l <= 4:
25 if l <= 4:
17 return ""
26 return ""
18 d = source.read(l - 4)
27 return readexactly(stream, l - 4)
19 if len(d) < l - 4:
20 raise util.Abort(_("premature EOF reading chunk"
21 " (got %d bytes, expected %d)")
22 % (len(d), l - 4))
23 return d
24
28
25 def chunkheader(length):
29 def chunkheader(length):
26 """return a changegroup chunk header (string)"""
30 """return a changegroup chunk header (string)"""
27 return struct.pack(">l", length + 4)
31 return struct.pack(">l", length + 4)
28
32
29 def closechunk():
33 def closechunk():
30 """return a changegroup chunk header (string) for a zero-length chunk"""
34 """return a changegroup chunk header (string) for a zero-length chunk"""
31 return struct.pack(">l", 0)
35 return struct.pack(">l", 0)
32
36
33 class nocompress(object):
37 class nocompress(object):
34 def compress(self, x):
38 def compress(self, x):
35 return x
39 return x
36 def flush(self):
40 def flush(self):
37 return ""
41 return ""
38
42
39 bundletypes = {
43 bundletypes = {
40 "": ("", nocompress),
44 "": ("", nocompress),
41 "HG10UN": ("HG10UN", nocompress),
45 "HG10UN": ("HG10UN", nocompress),
42 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
46 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
43 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
47 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
44 }
48 }
45
49
46 def collector(cl, mmfs, files):
50 def collector(cl, mmfs, files):
47 # Gather information about changeset nodes going out in a bundle.
51 # Gather information about changeset nodes going out in a bundle.
48 # We want to gather manifests needed and filelogs affected.
52 # We want to gather manifests needed and filelogs affected.
49 def collect(node):
53 def collect(node):
50 c = cl.read(node)
54 c = cl.read(node)
51 files.update(c[3])
55 files.update(c[3])
52 mmfs.setdefault(c[0], node)
56 mmfs.setdefault(c[0], node)
53 return collect
57 return collect
54
58
55 # hgweb uses this list to communicate its preferred type
59 # hgweb uses this list to communicate its preferred type
56 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
60 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
57
61
58 def writebundle(cg, filename, bundletype):
62 def writebundle(cg, filename, bundletype):
59 """Write a bundle file and return its filename.
63 """Write a bundle file and return its filename.
60
64
61 Existing files will not be overwritten.
65 Existing files will not be overwritten.
62 If no filename is specified, a temporary file is created.
66 If no filename is specified, a temporary file is created.
63 bz2 compression can be turned off.
67 bz2 compression can be turned off.
64 The bundle file will be deleted in case of errors.
68 The bundle file will be deleted in case of errors.
65 """
69 """
66
70
67 fh = None
71 fh = None
68 cleanup = None
72 cleanup = None
69 try:
73 try:
70 if filename:
74 if filename:
71 fh = open(filename, "wb")
75 fh = open(filename, "wb")
72 else:
76 else:
73 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
77 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
74 fh = os.fdopen(fd, "wb")
78 fh = os.fdopen(fd, "wb")
75 cleanup = filename
79 cleanup = filename
76
80
77 header, compressor = bundletypes[bundletype]
81 header, compressor = bundletypes[bundletype]
78 fh.write(header)
82 fh.write(header)
79 z = compressor()
83 z = compressor()
80
84
81 # parse the changegroup data, otherwise we will block
85 # parse the changegroup data, otherwise we will block
82 # in case of sshrepo because we don't know the end of the stream
86 # in case of sshrepo because we don't know the end of the stream
83
87
84 # an empty chunkgroup is the end of the changegroup
88 # an empty chunkgroup is the end of the changegroup
85 # a changegroup has at least 2 chunkgroups (changelog and manifest).
89 # a changegroup has at least 2 chunkgroups (changelog and manifest).
86 # after that, an empty chunkgroup is the end of the changegroup
90 # after that, an empty chunkgroup is the end of the changegroup
87 empty = False
91 empty = False
88 count = 0
92 count = 0
89 while not empty or count <= 2:
93 while not empty or count <= 2:
90 empty = True
94 empty = True
91 count += 1
95 count += 1
92 while 1:
96 while 1:
93 chunk = getchunk(cg)
97 chunk = getchunk(cg)
94 if not chunk:
98 if not chunk:
95 break
99 break
96 empty = False
100 empty = False
97 fh.write(z.compress(chunkheader(len(chunk))))
101 fh.write(z.compress(chunkheader(len(chunk))))
98 pos = 0
102 pos = 0
99 while pos < len(chunk):
103 while pos < len(chunk):
100 next = pos + 2**20
104 next = pos + 2**20
101 fh.write(z.compress(chunk[pos:next]))
105 fh.write(z.compress(chunk[pos:next]))
102 pos = next
106 pos = next
103 fh.write(z.compress(closechunk()))
107 fh.write(z.compress(closechunk()))
104 fh.write(z.flush())
108 fh.write(z.flush())
105 cleanup = None
109 cleanup = None
106 return filename
110 return filename
107 finally:
111 finally:
108 if fh is not None:
112 if fh is not None:
109 fh.close()
113 fh.close()
110 if cleanup is not None:
114 if cleanup is not None:
111 os.unlink(cleanup)
115 os.unlink(cleanup)
112
116
113 def decompressor(fh, alg):
117 def decompressor(fh, alg):
114 if alg == 'UN':
118 if alg == 'UN':
115 return fh
119 return fh
116 elif alg == 'GZ':
120 elif alg == 'GZ':
117 def generator(f):
121 def generator(f):
118 zd = zlib.decompressobj()
122 zd = zlib.decompressobj()
119 for chunk in f:
123 for chunk in f:
120 yield zd.decompress(chunk)
124 yield zd.decompress(chunk)
121 elif alg == 'BZ':
125 elif alg == 'BZ':
122 def generator(f):
126 def generator(f):
123 zd = bz2.BZ2Decompressor()
127 zd = bz2.BZ2Decompressor()
124 zd.decompress("BZ")
128 zd.decompress("BZ")
125 for chunk in util.filechunkiter(f, 4096):
129 for chunk in util.filechunkiter(f, 4096):
126 yield zd.decompress(chunk)
130 yield zd.decompress(chunk)
127 else:
131 else:
128 raise util.Abort("unknown bundle compression '%s'" % alg)
132 raise util.Abort("unknown bundle compression '%s'" % alg)
129 return util.chunkbuffer(generator(fh))
133 return util.chunkbuffer(generator(fh))
130
134
131 class unbundle10(object):
135 class unbundle10(object):
132 def __init__(self, fh, alg):
136 def __init__(self, fh, alg):
133 self._stream = decompressor(fh, alg)
137 self._stream = decompressor(fh, alg)
134 self._type = alg
138 self._type = alg
135 self.callback = None
139 self.callback = None
136 def compressed(self):
140 def compressed(self):
137 return self._type != 'UN'
141 return self._type != 'UN'
138 def read(self, l):
142 def read(self, l):
139 return self._stream.read(l)
143 return self._stream.read(l)
140 def seek(self, pos):
144 def seek(self, pos):
141 return self._stream.seek(pos)
145 return self._stream.seek(pos)
142 def tell(self):
146 def tell(self):
143 return self._stream.tell()
147 return self._stream.tell()
144 def close(self):
148 def close(self):
145 return self._stream.close()
149 return self._stream.close()
146
150
147 def chunklength(self):
151 def chunklength(self):
148 d = self.read(4)
152 d = readexactly(self._stream, 4)
149 l = max(0, struct.unpack(">l", d)[0] - 4)
153 l = max(0, struct.unpack(">l", d)[0] - 4)
150 if l and self.callback:
154 if l and self.callback:
151 self.callback()
155 self.callback()
152 return l
156 return l
153
157
154 def chunk(self):
158 def chunk(self):
155 """return the next chunk from changegroup 'source' as a string"""
159 """return the next chunk from changegroup 'source' as a string"""
156 l = self.chunklength()
160 l = self.chunklength()
157 d = self.read(l)
161 return readexactly(self._stream, l)
158 if len(d) < l:
159 raise util.Abort(_("premature EOF reading chunk"
160 " (got %d bytes, expected %d)")
161 % (len(d), l))
162 return d
163
162
164 def parsechunk(self):
163 def parsechunk(self):
165 l = self.chunklength()
164 l = self.chunklength()
166 if not l:
165 if not l:
167 return {}
166 return {}
168 h = self.read(80)
167 h = readexactly(self._stream, 80)
169 node, p1, p2, cs = struct.unpack("20s20s20s20s", h)
168 node, p1, p2, cs = struct.unpack("20s20s20s20s", h)
170 data = self.read(l - 80)
169 data = readexactly(self._stream, l - 80)
171 return dict(node=node, p1=p1, p2=p2, cs=cs, data=data)
170 return dict(node=node, p1=p1, p2=p2, cs=cs, data=data)
172
171
173 class headerlessfixup(object):
172 class headerlessfixup(object):
174 def __init__(self, fh, h):
173 def __init__(self, fh, h):
175 self._h = h
174 self._h = h
176 self._fh = fh
175 self._fh = fh
177 def read(self, n):
176 def read(self, n):
178 if self._h:
177 if self._h:
179 d, self._h = self._h[:n], self._h[n:]
178 d, self._h = self._h[:n], self._h[n:]
180 if len(d) < n:
179 if len(d) < n:
181 d += self._fh.read(n - len(d))
180 d += readexactly(self._fh, n - len(d))
182 return d
181 return d
183 return self._fh.read(n)
182 return readexactly(self._fh, n)
184
183
185 def readbundle(fh, fname):
184 def readbundle(fh, fname):
186 header = fh.read(6)
185 header = readexactly(fh, 6)
187
186
188 if not fname:
187 if not fname:
189 fname = "stream"
188 fname = "stream"
190 if not header.startswith('HG') and header.startswith('\0'):
189 if not header.startswith('HG') and header.startswith('\0'):
191 fh = headerlessfixup(fh, header)
190 fh = headerlessfixup(fh, header)
192 header = "HG10UN"
191 header = "HG10UN"
193
192
194 magic, version, alg = header[0:2], header[2:4], header[4:6]
193 magic, version, alg = header[0:2], header[2:4], header[4:6]
195
194
196 if magic != 'HG':
195 if magic != 'HG':
197 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
196 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
198 if version != '10':
197 if version != '10':
199 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
198 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
200 return unbundle10(fh, alg)
199 return unbundle10(fh, alg)
General Comments 0
You need to be logged in to leave comments. Login now