##// END OF EJS Templates
bundle: push chunkbuffer down into decompress...
Matt Mackall -
r12329:7458de93 default
parent child Browse files
Show More
@@ -1,170 +1,177 b''
1 # changegroup.py - Mercurial changegroup manipulation functions
1 # changegroup.py - Mercurial changegroup manipulation functions
2 #
2 #
3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 import util
9 import util
10 import struct, os, bz2, zlib, tempfile
10 import struct, os, bz2, zlib, tempfile
11
11
12 def getchunk(source):
12 def getchunk(source):
13 """return the next chunk from changegroup 'source' as a string"""
13 """return the next chunk from changegroup 'source' as a string"""
14 d = source.read(4)
14 d = source.read(4)
15 if not d:
15 if not d:
16 return ""
16 return ""
17 l = struct.unpack(">l", d)[0]
17 l = struct.unpack(">l", d)[0]
18 if l <= 4:
18 if l <= 4:
19 return ""
19 return ""
20 d = source.read(l - 4)
20 d = source.read(l - 4)
21 if len(d) < l - 4:
21 if len(d) < l - 4:
22 raise util.Abort(_("premature EOF reading chunk"
22 raise util.Abort(_("premature EOF reading chunk"
23 " (got %d bytes, expected %d)")
23 " (got %d bytes, expected %d)")
24 % (len(d), l - 4))
24 % (len(d), l - 4))
25 return d
25 return d
26
26
27 def chunkiter(source, progress=None):
27 def chunkiter(source, progress=None):
28 """iterate through the chunks in source, yielding a sequence of chunks
28 """iterate through the chunks in source, yielding a sequence of chunks
29 (strings)"""
29 (strings)"""
30 while 1:
30 while 1:
31 c = getchunk(source)
31 c = getchunk(source)
32 if not c:
32 if not c:
33 break
33 break
34 elif progress is not None:
34 elif progress is not None:
35 progress()
35 progress()
36 yield c
36 yield c
37
37
38 def chunkheader(length):
38 def chunkheader(length):
39 """return a changegroup chunk header (string)"""
39 """return a changegroup chunk header (string)"""
40 return struct.pack(">l", length + 4)
40 return struct.pack(">l", length + 4)
41
41
42 def closechunk():
42 def closechunk():
43 """return a changegroup chunk header (string) for a zero-length chunk"""
43 """return a changegroup chunk header (string) for a zero-length chunk"""
44 return struct.pack(">l", 0)
44 return struct.pack(">l", 0)
45
45
46 class nocompress(object):
46 class nocompress(object):
47 def compress(self, x):
47 def compress(self, x):
48 return x
48 return x
49 def flush(self):
49 def flush(self):
50 return ""
50 return ""
51
51
52 bundletypes = {
52 bundletypes = {
53 "": ("", nocompress),
53 "": ("", nocompress),
54 "HG10UN": ("HG10UN", nocompress),
54 "HG10UN": ("HG10UN", nocompress),
55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 }
57 }
58
58
59 def collector(cl, mmfs, files):
59 def collector(cl, mmfs, files):
60 # Gather information about changeset nodes going out in a bundle.
60 # Gather information about changeset nodes going out in a bundle.
61 # We want to gather manifests needed and filelogs affected.
61 # We want to gather manifests needed and filelogs affected.
62 def collect(node):
62 def collect(node):
63 c = cl.read(node)
63 c = cl.read(node)
64 files.update(c[3])
64 files.update(c[3])
65 mmfs.setdefault(c[0], node)
65 mmfs.setdefault(c[0], node)
66 return collect
66 return collect
67
67
68 # hgweb uses this list to communicate its preferred type
68 # hgweb uses this list to communicate its preferred type
69 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
69 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
70
70
71 def writebundle(cg, filename, bundletype):
71 def writebundle(cg, filename, bundletype):
72 """Write a bundle file and return its filename.
72 """Write a bundle file and return its filename.
73
73
74 Existing files will not be overwritten.
74 Existing files will not be overwritten.
75 If no filename is specified, a temporary file is created.
75 If no filename is specified, a temporary file is created.
76 bz2 compression can be turned off.
76 bz2 compression can be turned off.
77 The bundle file will be deleted in case of errors.
77 The bundle file will be deleted in case of errors.
78 """
78 """
79
79
80 fh = None
80 fh = None
81 cleanup = None
81 cleanup = None
82 try:
82 try:
83 if filename:
83 if filename:
84 fh = open(filename, "wb")
84 fh = open(filename, "wb")
85 else:
85 else:
86 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
86 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
87 fh = os.fdopen(fd, "wb")
87 fh = os.fdopen(fd, "wb")
88 cleanup = filename
88 cleanup = filename
89
89
90 header, compressor = bundletypes[bundletype]
90 header, compressor = bundletypes[bundletype]
91 fh.write(header)
91 fh.write(header)
92 z = compressor()
92 z = compressor()
93
93
94 # parse the changegroup data, otherwise we will block
94 # parse the changegroup data, otherwise we will block
95 # in case of sshrepo because we don't know the end of the stream
95 # in case of sshrepo because we don't know the end of the stream
96
96
97 # an empty chunkiter is the end of the changegroup
97 # an empty chunkiter is the end of the changegroup
98 # a changegroup has at least 2 chunkiters (changelog and manifest).
98 # a changegroup has at least 2 chunkiters (changelog and manifest).
99 # after that, an empty chunkiter is the end of the changegroup
99 # after that, an empty chunkiter is the end of the changegroup
100 empty = False
100 empty = False
101 count = 0
101 count = 0
102 while not empty or count <= 2:
102 while not empty or count <= 2:
103 empty = True
103 empty = True
104 count += 1
104 count += 1
105 for chunk in chunkiter(cg):
105 for chunk in chunkiter(cg):
106 empty = False
106 empty = False
107 fh.write(z.compress(chunkheader(len(chunk))))
107 fh.write(z.compress(chunkheader(len(chunk))))
108 pos = 0
108 pos = 0
109 while pos < len(chunk):
109 while pos < len(chunk):
110 next = pos + 2**20
110 next = pos + 2**20
111 fh.write(z.compress(chunk[pos:next]))
111 fh.write(z.compress(chunk[pos:next]))
112 pos = next
112 pos = next
113 fh.write(z.compress(closechunk()))
113 fh.write(z.compress(closechunk()))
114 fh.write(z.flush())
114 fh.write(z.flush())
115 cleanup = None
115 cleanup = None
116 return filename
116 return filename
117 finally:
117 finally:
118 if fh is not None:
118 if fh is not None:
119 fh.close()
119 fh.close()
120 if cleanup is not None:
120 if cleanup is not None:
121 os.unlink(cleanup)
121 os.unlink(cleanup)
122
122
123 def decompressor(fh, alg):
123 def decompressor(fh, alg):
124 if alg == 'UN':
124 if alg == 'UN':
125 return fh
125 return fh
126 elif alg == 'GZ':
126 elif alg == 'GZ':
127 def generator(f):
127 def generator(f):
128 zd = zlib.decompressobj()
128 zd = zlib.decompressobj()
129 for chunk in f:
129 for chunk in f:
130 yield zd.decompress(chunk)
130 yield zd.decompress(chunk)
131 elif alg == 'BZ':
131 elif alg == 'BZ':
132 def generator(f):
132 def generator(f):
133 zd = bz2.BZ2Decompressor()
133 zd = bz2.BZ2Decompressor()
134 zd.decompress("BZ")
134 zd.decompress("BZ")
135 for chunk in util.filechunkiter(f, 4096):
135 for chunk in util.filechunkiter(f, 4096):
136 yield zd.decompress(chunk)
136 yield zd.decompress(chunk)
137 else:
137 else:
138 raise util.Abort("unknown bundle compression '%s'" % alg)
138 raise util.Abort("unknown bundle compression '%s'" % alg)
139 return generator(fh)
139 return util.chunkbuffer(generator(fh))
140
140
141 class unbundle10(object):
141 class unbundle10(object):
142 def __init__(self, fh, alg):
142 def __init__(self, fh, alg):
143 self._stream = util.chunkbuffer(decompressor(fh, alg))
143 self._stream = decompressor(fh, alg)
144 self._type = alg
144 self._type = alg
145 def compressed(self):
145 def compressed(self):
146 return self._type != 'UN'
146 return self._type != 'UN'
147 def read(self, l):
147 def read(self, l):
148 return self._stream.read(l)
148 return self._stream.read(l)
149
149
150 class headerlessfixup(object):
151 def __init__(self, fh, h):
152 self._h = h
153 self._fh = fh
154 def read(self, n):
155 if self._h:
156 d, self._h = self._h[:n], self._h[n:]
157 if len(d) < n:
158 d += self._fh.read(n - len(d))
159 return d
160 return self._fh.read(n)
161
150 def readbundle(fh, fname):
162 def readbundle(fh, fname):
151 header = fh.read(6)
163 header = fh.read(6)
152
164
153 if not fname:
165 if not fname:
154 fname = "stream"
166 fname = "stream"
155 if not header.startswith('HG') and header.startswith('\0'):
167 if not header.startswith('HG') and header.startswith('\0'):
156 # headerless bundle, clean things up
168 fh = headerlessfixup(fh, header)
157 def fixup(f, h):
158 yield h
159 for x in f:
160 yield x
161 fh = fixup(fh, header)
162 header = "HG10UN"
169 header = "HG10UN"
163
170
164 magic, version, alg = header[0:2], header[2:4], header[4:6]
171 magic, version, alg = header[0:2], header[2:4], header[4:6]
165
172
166 if magic != 'HG':
173 if magic != 'HG':
167 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
174 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
168 if version != '10':
175 if version != '10':
169 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
176 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
170 return unbundle10(fh, alg)
177 return unbundle10(fh, alg)
General Comments 0
You need to be logged in to leave comments. Login now