##// END OF EJS Templates
bundle: unify/refactor unbundle/readbundle
Matt Mackall -
r12042:210049a8 default
parent child Browse files
Show More
@@ -1,161 +1,161 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 import util
10 10 import struct, os, bz2, zlib, tempfile
11 11
12 12 def getchunk(source):
13 13 """return the next chunk from changegroup 'source' as a string"""
14 14 d = source.read(4)
15 15 if not d:
16 16 return ""
17 17 l = struct.unpack(">l", d)[0]
18 18 if l <= 4:
19 19 return ""
20 20 d = source.read(l - 4)
21 21 if len(d) < l - 4:
22 22 raise util.Abort(_("premature EOF reading chunk"
23 23 " (got %d bytes, expected %d)")
24 24 % (len(d), l - 4))
25 25 return d
26 26
27 27 def chunkiter(source, progress=None):
28 28 """iterate through the chunks in source, yielding a sequence of chunks
29 29 (strings)"""
30 30 while 1:
31 31 c = getchunk(source)
32 32 if not c:
33 33 break
34 34 elif progress is not None:
35 35 progress()
36 36 yield c
37 37
38 38 def chunkheader(length):
39 39 """return a changegroup chunk header (string)"""
40 40 return struct.pack(">l", length + 4)
41 41
42 42 def closechunk():
43 43 """return a changegroup chunk header (string) for a zero-length chunk"""
44 44 return struct.pack(">l", 0)
45 45
46 46 class nocompress(object):
47 47 def compress(self, x):
48 48 return x
49 49 def flush(self):
50 50 return ""
51 51
52 52 bundletypes = {
53 53 "": ("", nocompress),
54 54 "HG10UN": ("HG10UN", nocompress),
55 55 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
56 56 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
57 57 }
58 58
59 59 def collector(cl, mmfs, files):
60 60 # Gather information about changeset nodes going out in a bundle.
61 61 # We want to gather manifests needed and filelogs affected.
62 62 def collect(node):
63 63 c = cl.read(node)
64 64 files.update(c[3])
65 65 mmfs.setdefault(c[0], node)
66 66 return collect
67 67
68 68 # hgweb uses this list to communicate its preferred type
69 69 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
70 70
71 71 def writebundle(cg, filename, bundletype):
72 72 """Write a bundle file and return its filename.
73 73
74 74 Existing files will not be overwritten.
75 75 If no filename is specified, a temporary file is created.
76 76 bz2 compression can be turned off.
77 77 The bundle file will be deleted in case of errors.
78 78 """
79 79
80 80 fh = None
81 81 cleanup = None
82 82 try:
83 83 if filename:
84 84 fh = open(filename, "wb")
85 85 else:
86 86 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
87 87 fh = os.fdopen(fd, "wb")
88 88 cleanup = filename
89 89
90 90 header, compressor = bundletypes[bundletype]
91 91 fh.write(header)
92 92 z = compressor()
93 93
94 94 # parse the changegroup data, otherwise we will block
95 95 # in case of sshrepo because we don't know the end of the stream
96 96
97 97 # an empty chunkiter is the end of the changegroup
98 98 # a changegroup has at least 2 chunkiters (changelog and manifest).
99 99 # after that, an empty chunkiter is the end of the changegroup
100 100 empty = False
101 101 count = 0
102 102 while not empty or count <= 2:
103 103 empty = True
104 104 count += 1
105 105 for chunk in chunkiter(cg):
106 106 empty = False
107 107 fh.write(z.compress(chunkheader(len(chunk))))
108 108 pos = 0
109 109 while pos < len(chunk):
110 110 next = pos + 2**20
111 111 fh.write(z.compress(chunk[pos:next]))
112 112 pos = next
113 113 fh.write(z.compress(closechunk()))
114 114 fh.write(z.flush())
115 115 cleanup = None
116 116 return filename
117 117 finally:
118 118 if fh is not None:
119 119 fh.close()
120 120 if cleanup is not None:
121 121 os.unlink(cleanup)
122 122
123 123 def decompressor(fh, alg):
124 124 if alg == 'UN':
125 125 return fh
126 126 elif alg == 'GZ':
127 127 def generator(f):
128 128 zd = zlib.decompressobj()
129 129 for chunk in f:
130 130 yield zd.decompress(chunk)
131 131 elif alg == 'BZ':
132 132 def generator(f):
133 133 zd = bz2.BZ2Decompressor()
134 134 zd.decompress("BZ")
135 135 for chunk in util.filechunkiter(f, 4096):
136 136 yield zd.decompress(chunk)
137 137 else:
138 138 raise util.Abort("unknown bundle compression '%s'" % alg)
139 139 return generator(fh)
140 140
141 def unbundle(header, fh):
142 if not header.startswith('HG'):
143 def fixup(f, h):
144 yield h
145 for x in f:
146 yield x
147 fh = fixup(f, h)
148 header = "HG10UN"
149
150 alg = header[4:6]
151 return util.chunkbuffer(decompressor(fh, alg))
152
153 141 def readbundle(fh, fname):
154 142 header = fh.read(6)
155 if not header.startswith('HG'):
156 raise util.Abort(_('%s: not a Mercurial bundle file') % fname)
157 if not header.startswith('HG10'):
158 raise util.Abort(_('%s: unknown bundle version') % fname)
159 elif header not in bundletypes:
160 raise util.Abort(_('%s: unknown bundle compression type') % fname)
161 return unbundle(header, fh)
143
144 if not fname:
145 fname = "stream"
146 if not header.startswith('HG') and header.startswith('\0'):
147 # headerless bundle, clean things up
148 def fixup(f, h):
149 yield h
150 for x in f:
151 yield x
152 fh = fixup(fh, header)
153 header = "HG10UN"
154
155 magic, version, alg = header[0:2], header[2:4], header[4:6]
156
157 if magic != 'HG':
158 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
159 if version != '10':
160 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
161 return util.chunkbuffer(decompressor(fh, alg))
@@ -1,332 +1,326 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkey_
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 return map(bin, l.split(sep))
19 19
20 20 def encodelist(l, sep=' '):
21 21 return sep.join(map(hex, l))
22 22
23 23 # client side
24 24
25 25 class wirerepository(repo.repository):
26 26 def lookup(self, key):
27 27 self.requirecap('lookup', _('look up remote revision'))
28 28 d = self._call("lookup", key=key)
29 29 success, data = d[:-1].split(" ", 1)
30 30 if int(success):
31 31 return bin(data)
32 32 self._abort(error.RepoError(data))
33 33
34 34 def heads(self):
35 35 d = self._call("heads")
36 36 try:
37 37 return decodelist(d[:-1])
38 38 except:
39 39 self._abort(error.ResponseError(_("unexpected response:"), d))
40 40
41 41 def branchmap(self):
42 42 d = self._call("branchmap")
43 43 try:
44 44 branchmap = {}
45 45 for branchpart in d.splitlines():
46 46 branchname, branchheads = branchpart.split(' ', 1)
47 47 branchname = urllib.unquote(branchname)
48 48 # Earlier servers (1.3.x) send branch names in (their) local
49 49 # charset. The best we can do is assume it's identical to our
50 50 # own local charset, in case it's not utf-8.
51 51 try:
52 52 branchname.decode('utf-8')
53 53 except UnicodeDecodeError:
54 54 branchname = encoding.fromlocal(branchname)
55 55 branchheads = decodelist(branchheads)
56 56 branchmap[branchname] = branchheads
57 57 return branchmap
58 58 except TypeError:
59 59 self._abort(error.ResponseError(_("unexpected response:"), d))
60 60
61 61 def branches(self, nodes):
62 62 n = encodelist(nodes)
63 63 d = self._call("branches", nodes=n)
64 64 try:
65 65 br = [tuple(decodelist(b)) for b in d.splitlines()]
66 66 return br
67 67 except:
68 68 self._abort(error.ResponseError(_("unexpected response:"), d))
69 69
70 70 def between(self, pairs):
71 71 batch = 8 # avoid giant requests
72 72 r = []
73 73 for i in xrange(0, len(pairs), batch):
74 74 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
75 75 d = self._call("between", pairs=n)
76 76 try:
77 77 r.extend(l and decodelist(l) or [] for l in d.splitlines())
78 78 except:
79 79 self._abort(error.ResponseError(_("unexpected response:"), d))
80 80 return r
81 81
82 82 def pushkey(self, namespace, key, old, new):
83 83 if not self.capable('pushkey'):
84 84 return False
85 85 d = self._call("pushkey",
86 86 namespace=namespace, key=key, old=old, new=new)
87 87 return bool(int(d))
88 88
89 89 def listkeys(self, namespace):
90 90 if not self.capable('pushkey'):
91 91 return {}
92 92 d = self._call("listkeys", namespace=namespace)
93 93 r = {}
94 94 for l in d.splitlines():
95 95 k, v = l.split('\t')
96 96 r[k.decode('string-escape')] = v.decode('string-escape')
97 97 return r
98 98
99 99 def stream_out(self):
100 100 return self._callstream('stream_out')
101 101
102 102 def changegroup(self, nodes, kind):
103 103 n = encodelist(nodes)
104 104 f = self._callstream("changegroup", roots=n)
105 105 return self._decompress(f)
106 106
107 107 def changegroupsubset(self, bases, heads, kind):
108 108 self.requirecap('changegroupsubset', _('look up remote changes'))
109 109 bases = encodelist(bases)
110 110 heads = encodelist(heads)
111 111 return self._decompress(self._callstream("changegroupsubset",
112 112 bases=bases, heads=heads))
113 113
114 114 def unbundle(self, cg, heads, source):
115 115 '''Send cg (a readable file-like object representing the
116 116 changegroup to push, typically a chunkbuffer object) to the
117 117 remote server as a bundle. Return an integer indicating the
118 118 result of the push (see localrepository.addchangegroup()).'''
119 119
120 120 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
121 121 if ret == "":
122 122 raise error.ResponseError(
123 123 _('push failed:'), output)
124 124 try:
125 125 ret = int(ret)
126 126 except ValueError, err:
127 127 raise error.ResponseError(
128 128 _('push failed (unexpected response):'), ret)
129 129
130 130 for l in output.splitlines(True):
131 131 self.ui.status(_('remote: '), l)
132 132 return ret
133 133
134 134 # server side
135 135
136 136 class streamres(object):
137 137 def __init__(self, gen):
138 138 self.gen = gen
139 139
140 140 class pushres(object):
141 141 def __init__(self, res):
142 142 self.res = res
143 143
144 144 def dispatch(repo, proto, command):
145 145 func, spec = commands[command]
146 146 args = proto.getargs(spec)
147 147 return func(repo, proto, *args)
148 148
149 149 def between(repo, proto, pairs):
150 150 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
151 151 r = []
152 152 for b in repo.between(pairs):
153 153 r.append(encodelist(b) + "\n")
154 154 return "".join(r)
155 155
156 156 def branchmap(repo, proto):
157 157 branchmap = repo.branchmap()
158 158 heads = []
159 159 for branch, nodes in branchmap.iteritems():
160 160 branchname = urllib.quote(branch)
161 161 branchnodes = encodelist(nodes)
162 162 heads.append('%s %s' % (branchname, branchnodes))
163 163 return '\n'.join(heads)
164 164
165 165 def branches(repo, proto, nodes):
166 166 nodes = decodelist(nodes)
167 167 r = []
168 168 for b in repo.branches(nodes):
169 169 r.append(encodelist(b) + "\n")
170 170 return "".join(r)
171 171
172 172 def capabilities(repo, proto):
173 173 caps = 'lookup changegroupsubset branchmap pushkey'.split()
174 174 if _allowstream(repo.ui):
175 175 caps.append('stream=%d' % repo.changelog.version)
176 176 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
177 177 return ' '.join(caps)
178 178
179 179 def changegroup(repo, proto, roots):
180 180 nodes = decodelist(roots)
181 181 cg = repo.changegroup(nodes, 'serve')
182 182 return streamres(proto.groupchunks(cg))
183 183
184 184 def changegroupsubset(repo, proto, bases, heads):
185 185 bases = decodelist(bases)
186 186 heads = decodelist(heads)
187 187 cg = repo.changegroupsubset(bases, heads, 'serve')
188 188 return streamres(proto.groupchunks(cg))
189 189
190 190 def heads(repo, proto):
191 191 h = repo.heads()
192 192 return encodelist(h) + "\n"
193 193
194 194 def hello(repo, proto):
195 195 '''the hello command returns a set of lines describing various
196 196 interesting things about the server, in an RFC822-like format.
197 197 Currently the only one defined is "capabilities", which
198 198 consists of a line in the form:
199 199
200 200 capabilities: space separated list of tokens
201 201 '''
202 202 return "capabilities: %s\n" % (capabilities(repo, proto))
203 203
204 204 def listkeys(repo, proto, namespace):
205 205 d = pushkey_.list(repo, namespace).items()
206 206 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
207 207 v.encode('string-escape')) for k, v in d])
208 208 return t
209 209
210 210 def lookup(repo, proto, key):
211 211 try:
212 212 r = hex(repo.lookup(key))
213 213 success = 1
214 214 except Exception, inst:
215 215 r = str(inst)
216 216 success = 0
217 217 return "%s %s\n" % (success, r)
218 218
219 219 def pushkey(repo, proto, namespace, key, old, new):
220 220 r = pushkey_.push(repo, namespace, key, old, new)
221 221 return '%s\n' % int(r)
222 222
223 223 def _allowstream(ui):
224 224 return ui.configbool('server', 'uncompressed', True, untrusted=True)
225 225
226 226 def stream(repo, proto):
227 227 '''If the server supports streaming clone, it advertises the "stream"
228 228 capability with a value representing the version and flags of the repo
229 229 it is serving. Client checks to see if it understands the format.
230 230
231 231 The format is simple: the server writes out a line with the amount
232 232 of files, then the total amount of bytes to be transfered (separated
233 233 by a space). Then, for each file, the server first writes the filename
234 234 and filesize (separated by the null character), then the file contents.
235 235 '''
236 236
237 237 if not _allowstream(repo.ui):
238 238 return '1\n'
239 239
240 240 entries = []
241 241 total_bytes = 0
242 242 try:
243 243 # get consistent snapshot of repo, lock during scan
244 244 lock = repo.lock()
245 245 try:
246 246 repo.ui.debug('scanning\n')
247 247 for name, ename, size in repo.store.walk():
248 248 entries.append((name, size))
249 249 total_bytes += size
250 250 finally:
251 251 lock.release()
252 252 except error.LockError:
253 253 return '2\n' # error: 2
254 254
255 255 def streamer(repo, entries, total):
256 256 '''stream out all metadata files in repository.'''
257 257 yield '0\n' # success
258 258 repo.ui.debug('%d files, %d bytes to transfer\n' %
259 259 (len(entries), total_bytes))
260 260 yield '%d %d\n' % (len(entries), total_bytes)
261 261 for name, size in entries:
262 262 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
263 263 # partially encode name over the wire for backwards compat
264 264 yield '%s\0%d\n' % (store.encodedir(name), size)
265 265 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
266 266 yield chunk
267 267
268 268 return streamres(streamer(repo, entries, total_bytes))
269 269
270 270 def unbundle(repo, proto, heads):
271 271 their_heads = decodelist(heads)
272 272
273 273 def check_heads():
274 274 heads = repo.heads()
275 275 return their_heads == ['force'] or their_heads == heads
276 276
277 277 # fail early if possible
278 278 if not check_heads():
279 279 return 'unsynced changes'
280 280
281 281 # write bundle data to temporary file because it can be big
282 282 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
283 283 fp = os.fdopen(fd, 'wb+')
284 284 r = 0
285 285 proto.redirect()
286 286 try:
287 287 proto.getfile(fp)
288 288 lock = repo.lock()
289 289 try:
290 290 if not check_heads():
291 291 # someone else committed/pushed/unbundled while we
292 292 # were transferring data
293 293 return 'unsynced changes'
294 294
295 295 # push can proceed
296 296 fp.seek(0)
297 header = fp.read(6)
298 if header.startswith('HG'):
299 if not header.startswith('HG10'):
300 raise ValueError('unknown bundle version')
301 elif header not in changegroupmod.bundletypes:
302 raise ValueError('unknown bundle compression type')
303 gen = changegroupmod.unbundle(header, fp)
297 gen = changegroupmod.readbundle(fp, None)
304 298
305 299 try:
306 300 r = repo.addchangegroup(gen, 'serve', proto._client(),
307 301 lock=lock)
308 302 except util.Abort, inst:
309 303 sys.stderr.write("abort: %s\n" % inst)
310 304 finally:
311 305 lock.release()
312 306 return pushres(r)
313 307
314 308 finally:
315 309 fp.close()
316 310 os.unlink(tempname)
317 311
318 312 commands = {
319 313 'between': (between, 'pairs'),
320 314 'branchmap': (branchmap, ''),
321 315 'branches': (branches, 'nodes'),
322 316 'capabilities': (capabilities, ''),
323 317 'changegroup': (changegroup, 'roots'),
324 318 'changegroupsubset': (changegroupsubset, 'bases heads'),
325 319 'heads': (heads, ''),
326 320 'hello': (hello, ''),
327 321 'listkeys': (listkeys, 'namespace'),
328 322 'lookup': (lookup, 'key'),
329 323 'pushkey': (pushkey, 'namespace key old new'),
330 324 'stream_out': (stream, ''),
331 325 'unbundle': (unbundle, 'heads'),
332 326 }
General Comments 0
You need to be logged in to leave comments. Login now