##// END OF EJS Templates
improve changegroup.readbundle(), use it in hgweb
Dirkjan Ochtman -
r6154:ef1c5a3b default
parent child Browse files
Show More
@@ -1,129 +1,141 b''
1 """
1 """
2 changegroup.py - Mercurial changegroup manipulation functions
2 changegroup.py - Mercurial changegroup manipulation functions
3
3
4 Copyright 2006 Matt Mackall <mpm@selenic.com>
4 Copyright 2006 Matt Mackall <mpm@selenic.com>
5
5
6 This software may be used and distributed according to the terms
6 This software may be used and distributed according to the terms
7 of the GNU General Public License, incorporated herein by reference.
7 of the GNU General Public License, incorporated herein by reference.
8 """
8 """
9
9
10 from i18n import _
10 from i18n import _
11 import struct, os, bz2, zlib, util, tempfile
11 import struct, os, bz2, zlib, util, tempfile
12
12
13 def getchunk(source):
13 def getchunk(source):
14 """get a chunk from a changegroup"""
14 """get a chunk from a changegroup"""
15 d = source.read(4)
15 d = source.read(4)
16 if not d:
16 if not d:
17 return ""
17 return ""
18 l = struct.unpack(">l", d)[0]
18 l = struct.unpack(">l", d)[0]
19 if l <= 4:
19 if l <= 4:
20 return ""
20 return ""
21 d = source.read(l - 4)
21 d = source.read(l - 4)
22 if len(d) < l - 4:
22 if len(d) < l - 4:
23 raise util.Abort(_("premature EOF reading chunk"
23 raise util.Abort(_("premature EOF reading chunk"
24 " (got %d bytes, expected %d)")
24 " (got %d bytes, expected %d)")
25 % (len(d), l - 4))
25 % (len(d), l - 4))
26 return d
26 return d
27
27
28 def chunkiter(source):
28 def chunkiter(source):
29 """iterate through the chunks in source"""
29 """iterate through the chunks in source"""
30 while 1:
30 while 1:
31 c = getchunk(source)
31 c = getchunk(source)
32 if not c:
32 if not c:
33 break
33 break
34 yield c
34 yield c
35
35
36 def chunkheader(length):
36 def chunkheader(length):
37 """build a changegroup chunk header"""
37 """build a changegroup chunk header"""
38 return struct.pack(">l", length + 4)
38 return struct.pack(">l", length + 4)
39
39
40 def closechunk():
40 def closechunk():
41 return struct.pack(">l", 0)
41 return struct.pack(">l", 0)
42
42
43 class nocompress(object):
43 class nocompress(object):
44 def compress(self, x):
44 def compress(self, x):
45 return x
45 return x
46 def flush(self):
46 def flush(self):
47 return ""
47 return ""
48
48
49 bundletypes = {
49 bundletypes = {
50 "": ("", nocompress),
50 "": ("", nocompress),
51 "HG10UN": ("HG10UN", nocompress),
51 "HG10UN": ("HG10UN", nocompress),
52 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
52 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
53 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
53 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
54 }
54 }
55
55
56 # hgweb uses this list to communicate its preferred type
56 # hgweb uses this list to communicate its preferred type
57 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
57 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
58
58
59 def writebundle(cg, filename, bundletype):
59 def writebundle(cg, filename, bundletype):
60 """Write a bundle file and return its filename.
60 """Write a bundle file and return its filename.
61
61
62 Existing files will not be overwritten.
62 Existing files will not be overwritten.
63 If no filename is specified, a temporary file is created.
63 If no filename is specified, a temporary file is created.
64 bz2 compression can be turned off.
64 bz2 compression can be turned off.
65 The bundle file will be deleted in case of errors.
65 The bundle file will be deleted in case of errors.
66 """
66 """
67
67
68 fh = None
68 fh = None
69 cleanup = None
69 cleanup = None
70 try:
70 try:
71 if filename:
71 if filename:
72 fh = open(filename, "wb")
72 fh = open(filename, "wb")
73 else:
73 else:
74 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
74 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
75 fh = os.fdopen(fd, "wb")
75 fh = os.fdopen(fd, "wb")
76 cleanup = filename
76 cleanup = filename
77
77
78 header, compressor = bundletypes[bundletype]
78 header, compressor = bundletypes[bundletype]
79 fh.write(header)
79 fh.write(header)
80 z = compressor()
80 z = compressor()
81
81
82 # parse the changegroup data, otherwise we will block
82 # parse the changegroup data, otherwise we will block
83 # in case of sshrepo because we don't know the end of the stream
83 # in case of sshrepo because we don't know the end of the stream
84
84
85 # an empty chunkiter normally marks the end of the changegroup, but
85 # an empty chunkiter normally marks the end of the changegroup, but
86 # a changegroup has at least 2 chunkiters (changelog and manifest);
86 # a changegroup has at least 2 chunkiters (changelog and manifest);
87 # only after those does an empty chunkiter end the changegroup
87 # only after those does an empty chunkiter end the changegroup
88 empty = False
88 empty = False
89 count = 0
89 count = 0
90 while not empty or count <= 2:
90 while not empty or count <= 2:
91 empty = True
91 empty = True
92 count += 1
92 count += 1
93 for chunk in chunkiter(cg):
93 for chunk in chunkiter(cg):
94 empty = False
94 empty = False
95 fh.write(z.compress(chunkheader(len(chunk))))
95 fh.write(z.compress(chunkheader(len(chunk))))
96 pos = 0
96 pos = 0
97 while pos < len(chunk):
97 while pos < len(chunk):
98 next = pos + 2**20
98 next = pos + 2**20
99 fh.write(z.compress(chunk[pos:next]))
99 fh.write(z.compress(chunk[pos:next]))
100 pos = next
100 pos = next
101 fh.write(z.compress(closechunk()))
101 fh.write(z.compress(closechunk()))
102 fh.write(z.flush())
102 fh.write(z.flush())
103 cleanup = None
103 cleanup = None
104 return filename
104 return filename
105 finally:
105 finally:
106 if fh is not None:
106 if fh is not None:
107 fh.close()
107 fh.close()
108 if cleanup is not None:
108 if cleanup is not None:
109 os.unlink(cleanup)
109 os.unlink(cleanup)
110
110
111 def readbundle(fh, fname):
111 def unbundle(header, fh):
112 header = fh.read(6)
112 if header == 'HG10UN':
113 if not header.startswith("HG"):
113 return fh
114 raise util.Abort(_("%s: not a Mercurial bundle file") % fname)
114 elif not header.startswith('HG'):
115 elif not header.startswith("HG10"):
115 # old client with uncompressed bundle
116 raise util.Abort(_("%s: unknown bundle version") % fname)
116 def generator(f):
117
117 yield header
118 if header == "HG10BZ":
118 for chunk in f:
119 yield chunk
120 elif header == 'HG10GZ':
121 def generator(f):
122 zd = zlib.decompressobj()
123 for chunk in f:
124 yield zd.decompress(chunk)
125 elif header == 'HG10BZ':
119 def generator(f):
126 def generator(f):
120 zd = bz2.BZ2Decompressor()
127 zd = bz2.BZ2Decompressor()
121 zd.decompress("BZ")
128 zd.decompress("BZ")
122 for chunk in util.filechunkiter(f, 4096):
129 for chunk in util.filechunkiter(f, 4096):
123 yield zd.decompress(chunk)
130 yield zd.decompress(chunk)
124 return util.chunkbuffer(generator(fh))
131 return util.chunkbuffer(generator(fh))
125 elif header == "HG10UN":
126 return fh
127
132
128 raise util.Abort(_("%s: unknown bundle compression type")
133 def readbundle(fh, fname):
129 % fname)
134 header = fh.read(6)
135 if not header.startswith('HG'):
136 raise util.Abort(_('%s: not a Mercurial bundle file') % fname)
137 if not header.startswith('HG10'):
138 raise util.Abort(_('%s: unknown bundle version') % fname)
139 elif header not in bundletypes:
140 raise util.Abort(_('%s: unknown bundle compression type') % fname)
141 return unbundle(header, fh)
@@ -1,243 +1,221 b''
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms
5 # This software may be used and distributed according to the terms
6 # of the GNU General Public License, incorporated herein by reference.
6 # of the GNU General Public License, incorporated herein by reference.
7
7
8 import cStringIO, zlib, bz2, tempfile, errno, os, sys
8 import cStringIO, zlib, bz2, tempfile, errno, os, sys
9 from mercurial import util, streamclone
9 from mercurial import util, streamclone
10 from mercurial.i18n import gettext as _
10 from mercurial.i18n import gettext as _
11 from mercurial.node import *
11 from mercurial.node import *
12 from mercurial import changegroup as changegroupmod
12 from common import HTTP_OK, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
13 from common import HTTP_OK, HTTP_NOT_FOUND, HTTP_SERVER_ERROR
13
14
14 # __all__ is populated with the allowed commands. Be sure to add to it if
15 # __all__ is populated with the allowed commands. Be sure to add to it if
15 # you're adding a new command, or the new command won't work.
16 # you're adding a new command, or the new command won't work.
16
17
17 __all__ = [
18 __all__ = [
18 'lookup', 'heads', 'branches', 'between', 'changegroup',
19 'lookup', 'heads', 'branches', 'between', 'changegroup',
19 'changegroupsubset', 'capabilities', 'unbundle', 'stream_out',
20 'changegroupsubset', 'capabilities', 'unbundle', 'stream_out',
20 ]
21 ]
21
22
22 HGTYPE = 'application/mercurial-0.1'
23 HGTYPE = 'application/mercurial-0.1'
23
24
24 def lookup(web, req):
25 def lookup(web, req):
25 try:
26 try:
26 r = hex(web.repo.lookup(req.form['key'][0]))
27 r = hex(web.repo.lookup(req.form['key'][0]))
27 success = 1
28 success = 1
28 except Exception,inst:
29 except Exception,inst:
29 r = str(inst)
30 r = str(inst)
30 success = 0
31 success = 0
31 resp = "%s %s\n" % (success, r)
32 resp = "%s %s\n" % (success, r)
32 req.respond(HTTP_OK, HGTYPE, length=len(resp))
33 req.respond(HTTP_OK, HGTYPE, length=len(resp))
33 req.write(resp)
34 req.write(resp)
34
35
35 def heads(web, req):
36 def heads(web, req):
36 resp = " ".join(map(hex, web.repo.heads())) + "\n"
37 resp = " ".join(map(hex, web.repo.heads())) + "\n"
37 req.respond(HTTP_OK, HGTYPE, length=len(resp))
38 req.respond(HTTP_OK, HGTYPE, length=len(resp))
38 req.write(resp)
39 req.write(resp)
39
40
40 def branches(web, req):
41 def branches(web, req):
41 nodes = []
42 nodes = []
42 if 'nodes' in req.form:
43 if 'nodes' in req.form:
43 nodes = map(bin, req.form['nodes'][0].split(" "))
44 nodes = map(bin, req.form['nodes'][0].split(" "))
44 resp = cStringIO.StringIO()
45 resp = cStringIO.StringIO()
45 for b in web.repo.branches(nodes):
46 for b in web.repo.branches(nodes):
46 resp.write(" ".join(map(hex, b)) + "\n")
47 resp.write(" ".join(map(hex, b)) + "\n")
47 resp = resp.getvalue()
48 resp = resp.getvalue()
48 req.respond(HTTP_OK, HGTYPE, length=len(resp))
49 req.respond(HTTP_OK, HGTYPE, length=len(resp))
49 req.write(resp)
50 req.write(resp)
50
51
51 def between(web, req):
52 def between(web, req):
52 if 'pairs' in req.form:
53 if 'pairs' in req.form:
53 pairs = [map(bin, p.split("-"))
54 pairs = [map(bin, p.split("-"))
54 for p in req.form['pairs'][0].split(" ")]
55 for p in req.form['pairs'][0].split(" ")]
55 resp = cStringIO.StringIO()
56 resp = cStringIO.StringIO()
56 for b in web.repo.between(pairs):
57 for b in web.repo.between(pairs):
57 resp.write(" ".join(map(hex, b)) + "\n")
58 resp.write(" ".join(map(hex, b)) + "\n")
58 resp = resp.getvalue()
59 resp = resp.getvalue()
59 req.respond(HTTP_OK, HGTYPE, length=len(resp))
60 req.respond(HTTP_OK, HGTYPE, length=len(resp))
60 req.write(resp)
61 req.write(resp)
61
62
62 def changegroup(web, req):
63 def changegroup(web, req):
63 req.respond(HTTP_OK, HGTYPE)
64 req.respond(HTTP_OK, HGTYPE)
64 nodes = []
65 nodes = []
65 if not web.allowpull:
66 if not web.allowpull:
66 return
67 return
67
68
68 if 'roots' in req.form:
69 if 'roots' in req.form:
69 nodes = map(bin, req.form['roots'][0].split(" "))
70 nodes = map(bin, req.form['roots'][0].split(" "))
70
71
71 z = zlib.compressobj()
72 z = zlib.compressobj()
72 f = web.repo.changegroup(nodes, 'serve')
73 f = web.repo.changegroup(nodes, 'serve')
73 while 1:
74 while 1:
74 chunk = f.read(4096)
75 chunk = f.read(4096)
75 if not chunk:
76 if not chunk:
76 break
77 break
77 req.write(z.compress(chunk))
78 req.write(z.compress(chunk))
78
79
79 req.write(z.flush())
80 req.write(z.flush())
80
81
81 def changegroupsubset(web, req):
82 def changegroupsubset(web, req):
82 req.respond(HTTP_OK, HGTYPE)
83 req.respond(HTTP_OK, HGTYPE)
83 bases = []
84 bases = []
84 heads = []
85 heads = []
85 if not web.allowpull:
86 if not web.allowpull:
86 return
87 return
87
88
88 if 'bases' in req.form:
89 if 'bases' in req.form:
89 bases = [bin(x) for x in req.form['bases'][0].split(' ')]
90 bases = [bin(x) for x in req.form['bases'][0].split(' ')]
90 if 'heads' in req.form:
91 if 'heads' in req.form:
91 heads = [bin(x) for x in req.form['heads'][0].split(' ')]
92 heads = [bin(x) for x in req.form['heads'][0].split(' ')]
92
93
93 z = zlib.compressobj()
94 z = zlib.compressobj()
94 f = web.repo.changegroupsubset(bases, heads, 'serve')
95 f = web.repo.changegroupsubset(bases, heads, 'serve')
95 while 1:
96 while 1:
96 chunk = f.read(4096)
97 chunk = f.read(4096)
97 if not chunk:
98 if not chunk:
98 break
99 break
99 req.write(z.compress(chunk))
100 req.write(z.compress(chunk))
100
101
101 req.write(z.flush())
102 req.write(z.flush())
102
103
103 def capabilities(web, req):
104 def capabilities(web, req):
104 resp = ' '.join(web.capabilities())
105 resp = ' '.join(web.capabilities())
105 req.respond(HTTP_OK, HGTYPE, length=len(resp))
106 req.respond(HTTP_OK, HGTYPE, length=len(resp))
106 req.write(resp)
107 req.write(resp)
107
108
108 def unbundle(web, req):
109 def unbundle(web, req):
109 def bail(response, headers={}):
110 def bail(response, headers={}):
110 length = int(req.env['CONTENT_LENGTH'])
111 length = int(req.env['CONTENT_LENGTH'])
111 for s in util.filechunkiter(req, limit=length):
112 for s in util.filechunkiter(req, limit=length):
112 # drain incoming bundle, else client will not see
113 # drain incoming bundle, else client will not see
113 # response when run outside cgi script
114 # response when run outside cgi script
114 pass
115 pass
115 req.header(headers.items())
116 req.header(headers.items())
116 req.respond(HTTP_OK, HGTYPE)
117 req.respond(HTTP_OK, HGTYPE)
117 req.write('0\n')
118 req.write('0\n')
118 req.write(response)
119 req.write(response)
119
120
120 # require ssl by default, so that auth info cannot be sniffed
121 # require ssl by default, so that auth info cannot be sniffed
121 # and replayed
122 # and replayed
122 ssl_req = web.configbool('web', 'push_ssl', True)
123 ssl_req = web.configbool('web', 'push_ssl', True)
123 if ssl_req:
124 if ssl_req:
124 if req.env.get('wsgi.url_scheme') != 'https':
125 if req.env.get('wsgi.url_scheme') != 'https':
125 bail(_('ssl required\n'))
126 bail(_('ssl required\n'))
126 return
127 return
127 proto = 'https'
128 proto = 'https'
128 else:
129 else:
129 proto = 'http'
130 proto = 'http'
130
131
131 # do not allow push unless explicitly allowed
132 # do not allow push unless explicitly allowed
132 if not web.check_perm(req, 'push', False):
133 if not web.check_perm(req, 'push', False):
133 bail(_('push not authorized\n'),
134 bail(_('push not authorized\n'),
134 headers={'status': '401 Unauthorized'})
135 headers={'status': '401 Unauthorized'})
135 return
136 return
136
137
137 their_heads = req.form['heads'][0].split(' ')
138 their_heads = req.form['heads'][0].split(' ')
138
139
139 def check_heads():
140 def check_heads():
140 heads = map(hex, web.repo.heads())
141 heads = map(hex, web.repo.heads())
141 return their_heads == [hex('force')] or their_heads == heads
142 return their_heads == [hex('force')] or their_heads == heads
142
143
143 # fail early if possible
144 # fail early if possible
144 if not check_heads():
145 if not check_heads():
145 bail(_('unsynced changes\n'))
146 bail(_('unsynced changes\n'))
146 return
147 return
147
148
148 req.respond(HTTP_OK, HGTYPE)
149 req.respond(HTTP_OK, HGTYPE)
149
150
150 # do not lock repo until all changegroup data is
151 # do not lock repo until all changegroup data is
151 # streamed. save to temporary file.
152 # streamed. save to temporary file.
152
153
153 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
154 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
154 fp = os.fdopen(fd, 'wb+')
155 fp = os.fdopen(fd, 'wb+')
155 try:
156 try:
156 length = int(req.env['CONTENT_LENGTH'])
157 length = int(req.env['CONTENT_LENGTH'])
157 for s in util.filechunkiter(req, limit=length):
158 for s in util.filechunkiter(req, limit=length):
158 fp.write(s)
159 fp.write(s)
159
160
160 try:
161 try:
161 lock = web.repo.lock()
162 lock = web.repo.lock()
162 try:
163 try:
163 if not check_heads():
164 if not check_heads():
164 req.write('0\n')
165 req.write('0\n')
165 req.write(_('unsynced changes\n'))
166 req.write(_('unsynced changes\n'))
166 return
167 return
167
168
168 fp.seek(0)
169 fp.seek(0)
169 header = fp.read(6)
170 header = fp.read(6)
170 if not header.startswith("HG"):
171 if header.startswith('HG') and not header.startswith('HG10'):
171 # old client with uncompressed bundle
172 raise ValueError('unknown bundle version')
172 def generator(f):
173 elif header not in changegroupmod.bundletypes:
173 yield header
174 raise ValueError('unknown bundle compression type')
174 for chunk in f:
175 gen = changegroupmod.unbundle(header, fp)
175 yield chunk
176 elif not header.startswith("HG10"):
177 req.write("0\n")
178 req.write(_("unknown bundle version\n"))
179 return
180 elif header == "HG10GZ":
181 def generator(f):
182 zd = zlib.decompressobj()
183 for chunk in f:
184 yield zd.decompress(chunk)
185 elif header == "HG10BZ":
186 def generator(f):
187 zd = bz2.BZ2Decompressor()
188 zd.decompress("BZ")
189 for chunk in f:
190 yield zd.decompress(chunk)
191 elif header == "HG10UN":
192 def generator(f):
193 for chunk in f:
194 yield chunk
195 else:
196 req.write("0\n")
197 req.write(_("unknown bundle compression type\n"))
198 return
199 gen = generator(util.filechunkiter(fp, 4096))
200
176
201 # send addchangegroup output to client
177 # send addchangegroup output to client
202
178
203 old_stdout = sys.stdout
179 old_stdout = sys.stdout
204 sys.stdout = cStringIO.StringIO()
180 sys.stdout = cStringIO.StringIO()
205
181
206 try:
182 try:
207 url = 'remote:%s:%s' % (proto,
183 url = 'remote:%s:%s' % (proto,
208 req.env.get('REMOTE_HOST', ''))
184 req.env.get('REMOTE_HOST', ''))
209 try:
185 try:
210 ret = web.repo.addchangegroup(
186 ret = web.repo.addchangegroup(gen, 'serve', url)
211 util.chunkbuffer(gen), 'serve', url)
212 except util.Abort, inst:
187 except util.Abort, inst:
213 sys.stdout.write("abort: %s\n" % inst)
188 sys.stdout.write("abort: %s\n" % inst)
214 ret = 0
189 ret = 0
215 finally:
190 finally:
216 val = sys.stdout.getvalue()
191 val = sys.stdout.getvalue()
217 sys.stdout = old_stdout
192 sys.stdout = old_stdout
218 req.write('%d\n' % ret)
193 req.write('%d\n' % ret)
219 req.write(val)
194 req.write(val)
220 finally:
195 finally:
221 del lock
196 del lock
197 except ValueError, inst:
198 req.write('0\n')
199 req.write(str(inst) + '\n')
222 except (OSError, IOError), inst:
200 except (OSError, IOError), inst:
223 req.write('0\n')
201 req.write('0\n')
224 filename = getattr(inst, 'filename', '')
202 filename = getattr(inst, 'filename', '')
225 # Don't send our filesystem layout to the client
203 # Don't send our filesystem layout to the client
226 if filename.startswith(web.repo.root):
204 if filename.startswith(web.repo.root):
227 filename = filename[len(web.repo.root)+1:]
205 filename = filename[len(web.repo.root)+1:]
228 else:
206 else:
229 filename = ''
207 filename = ''
230 error = getattr(inst, 'strerror', 'Unknown error')
208 error = getattr(inst, 'strerror', 'Unknown error')
231 if inst.errno == errno.ENOENT:
209 if inst.errno == errno.ENOENT:
232 code = HTTP_NOT_FOUND
210 code = HTTP_NOT_FOUND
233 else:
211 else:
234 code = HTTP_SERVER_ERROR
212 code = HTTP_SERVER_ERROR
235 req.respond(code)
213 req.respond(code)
236 req.write('%s: %s\n' % (error, filename))
214 req.write('%s: %s\n' % (error, filename))
237 finally:
215 finally:
238 fp.close()
216 fp.close()
239 os.unlink(tempname)
217 os.unlink(tempname)
240
218
241 def stream_out(web, req):
219 def stream_out(web, req):
242 req.respond(HTTP_OK, HGTYPE)
220 req.respond(HTTP_OK, HGTYPE)
243 streamclone.stream_out(web.repo, req, untrusted=True)
221 streamclone.stream_out(web.repo, req, untrusted=True)
General Comments 0
You need to be logged in to leave comments. Login now