bundle: unify/refactor unbundle/readbundle
Matt Mackall
r12042:210049a8 default
changegroup.py
@@ -1,161 +1,161 @@
 # changegroup.py - Mercurial changegroup manipulation functions
 #
 # Copyright 2006 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
 import util
 import struct, os, bz2, zlib, tempfile
 
 def getchunk(source):
     """return the next chunk from changegroup 'source' as a string"""
     d = source.read(4)
     if not d:
         return ""
     l = struct.unpack(">l", d)[0]
     if l <= 4:
         return ""
     d = source.read(l - 4)
     if len(d) < l - 4:
         raise util.Abort(_("premature EOF reading chunk"
                            " (got %d bytes, expected %d)")
                          % (len(d), l - 4))
     return d
 
 def chunkiter(source, progress=None):
     """iterate through the chunks in source, yielding a sequence of chunks
     (strings)"""
     while 1:
         c = getchunk(source)
         if not c:
             break
         elif progress is not None:
             progress()
         yield c
 
 def chunkheader(length):
     """return a changegroup chunk header (string)"""
     return struct.pack(">l", length + 4)
 
 def closechunk():
     """return a changegroup chunk header (string) for a zero-length chunk"""
     return struct.pack(">l", 0)
 
 class nocompress(object):
     def compress(self, x):
         return x
     def flush(self):
         return ""
 
 bundletypes = {
     "": ("", nocompress),
     "HG10UN": ("HG10UN", nocompress),
     "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
     "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
 }
 
 def collector(cl, mmfs, files):
     # Gather information about changeset nodes going out in a bundle.
     # We want to gather manifests needed and filelogs affected.
     def collect(node):
         c = cl.read(node)
         files.update(c[3])
         mmfs.setdefault(c[0], node)
     return collect
 
 # hgweb uses this list to communicate its preferred type
 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
 
 def writebundle(cg, filename, bundletype):
     """Write a bundle file and return its filename.
 
     Existing files will not be overwritten.
     If no filename is specified, a temporary file is created.
     bz2 compression can be turned off.
     The bundle file will be deleted in case of errors.
     """
 
     fh = None
     cleanup = None
     try:
         if filename:
             fh = open(filename, "wb")
         else:
             fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
             fh = os.fdopen(fd, "wb")
         cleanup = filename
 
         header, compressor = bundletypes[bundletype]
         fh.write(header)
         z = compressor()
 
         # parse the changegroup data, otherwise we will block
         # in case of sshrepo because we don't know the end of the stream
 
         # an empty chunkiter is the end of the changegroup
         # a changegroup has at least 2 chunkiters (changelog and manifest).
         # after that, an empty chunkiter is the end of the changegroup
         empty = False
         count = 0
         while not empty or count <= 2:
             empty = True
             count += 1
             for chunk in chunkiter(cg):
                 empty = False
                 fh.write(z.compress(chunkheader(len(chunk))))
                 pos = 0
                 while pos < len(chunk):
                     next = pos + 2**20
                     fh.write(z.compress(chunk[pos:next]))
                     pos = next
             fh.write(z.compress(closechunk()))
         fh.write(z.flush())
         cleanup = None
         return filename
     finally:
         if fh is not None:
             fh.close()
         if cleanup is not None:
             os.unlink(cleanup)
 
 def decompressor(fh, alg):
     if alg == 'UN':
         return fh
     elif alg == 'GZ':
         def generator(f):
             zd = zlib.decompressobj()
             for chunk in f:
                 yield zd.decompress(chunk)
     elif alg == 'BZ':
         def generator(f):
             zd = bz2.BZ2Decompressor()
             zd.decompress("BZ")
             for chunk in util.filechunkiter(f, 4096):
                 yield zd.decompress(chunk)
     else:
         raise util.Abort("unknown bundle compression '%s'" % alg)
     return generator(fh)
 
-def unbundle(header, fh):
-    if not header.startswith('HG'):
-        def fixup(f, h):
-            yield h
-            for x in f:
-                yield x
-        fh = fixup(f, h)
-        header = "HG10UN"
-
-    alg = header[4:6]
-    return util.chunkbuffer(decompressor(fh, alg))
-
-def readbundle(fh, fname):
-    header = fh.read(6)
-    if not header.startswith('HG'):
-        raise util.Abort(_('%s: not a Mercurial bundle file') % fname)
-    if not header.startswith('HG10'):
-        raise util.Abort(_('%s: unknown bundle version') % fname)
-    elif header not in bundletypes:
-        raise util.Abort(_('%s: unknown bundle compression type') % fname)
-    return unbundle(header, fh)
+def readbundle(fh, fname):
+    header = fh.read(6)
+
+    if not fname:
+        fname = "stream"
+        if not header.startswith('HG') and header.startswith('\0'):
+            # headerless bundle, clean things up
+            def fixup(f, h):
+                yield h
+                for x in f:
+                    yield x
+            fh = fixup(fh, header)
+            header = "HG10UN"
+
+    magic, version, alg = header[0:2], header[2:4], header[4:6]
+
+    if magic != 'HG':
+        raise util.Abort(_('%s: not a Mercurial bundle') % fname)
+    if version != '10':
+        raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
+    return util.chunkbuffer(decompressor(fh, alg))
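With this refactor, readbundle() becomes the single entry point for both named bundle files and headerless wire streams, and the separate changegroup.unbundle() helper goes away. Below is a minimal usage sketch, not part of the patch: it assumes the Mercurial package of that era is importable and that 'changes.hg' is a hypothetical bundle produced earlier with hg bundle.

# Illustrative only; readbundle() and chunkiter() are the APIs shown in the
# diff above, while the file name and surrounding script are assumptions.
from mercurial import changegroup

fh = open('changes.hg', 'rb')
try:
    # Named file: the 6-byte HG10xx header must be present; the filename is
    # used in error messages if the magic or version check fails.
    cg = changegroup.readbundle(fh, 'changes.hg')  # returns a util.chunkbuffer
    for chunk in changegroup.chunkiter(cg):        # changelog group chunks
        pass
finally:
    fh.close()

# For a wire stream, callers pass fname=None; data that lacks the header but
# starts with '\0' is then treated as headerless HG10UN changegroup data.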
wireproto.py
@@ -1,332 +1,326 @@
 # wireproto.py - generic wire protocol support functions
 #
 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 import urllib, tempfile, os, sys
 from i18n import _
 from node import bin, hex
 import changegroup as changegroupmod
 import repo, error, encoding, util, store
 import pushkey as pushkey_
 
 # list of nodes encoding / decoding
 
 def decodelist(l, sep=' '):
     return map(bin, l.split(sep))
 
 def encodelist(l, sep=' '):
     return sep.join(map(hex, l))
 
 # client side
 
 class wirerepository(repo.repository):
     def lookup(self, key):
         self.requirecap('lookup', _('look up remote revision'))
         d = self._call("lookup", key=key)
         success, data = d[:-1].split(" ", 1)
         if int(success):
             return bin(data)
         self._abort(error.RepoError(data))
 
     def heads(self):
         d = self._call("heads")
         try:
             return decodelist(d[:-1])
         except:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     def branchmap(self):
         d = self._call("branchmap")
         try:
             branchmap = {}
             for branchpart in d.splitlines():
                 branchname, branchheads = branchpart.split(' ', 1)
                 branchname = urllib.unquote(branchname)
                 # Earlier servers (1.3.x) send branch names in (their) local
                 # charset. The best we can do is assume it's identical to our
                 # own local charset, in case it's not utf-8.
                 try:
                     branchname.decode('utf-8')
                 except UnicodeDecodeError:
                     branchname = encoding.fromlocal(branchname)
                 branchheads = decodelist(branchheads)
                 branchmap[branchname] = branchheads
             return branchmap
         except TypeError:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     def branches(self, nodes):
         n = encodelist(nodes)
         d = self._call("branches", nodes=n)
         try:
             br = [tuple(decodelist(b)) for b in d.splitlines()]
             return br
         except:
             self._abort(error.ResponseError(_("unexpected response:"), d))
 
     def between(self, pairs):
         batch = 8 # avoid giant requests
         r = []
         for i in xrange(0, len(pairs), batch):
             n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
             d = self._call("between", pairs=n)
             try:
                 r.extend(l and decodelist(l) or [] for l in d.splitlines())
             except:
                 self._abort(error.ResponseError(_("unexpected response:"), d))
         return r
 
     def pushkey(self, namespace, key, old, new):
         if not self.capable('pushkey'):
             return False
         d = self._call("pushkey",
                        namespace=namespace, key=key, old=old, new=new)
         return bool(int(d))
 
     def listkeys(self, namespace):
         if not self.capable('pushkey'):
             return {}
         d = self._call("listkeys", namespace=namespace)
         r = {}
         for l in d.splitlines():
             k, v = l.split('\t')
             r[k.decode('string-escape')] = v.decode('string-escape')
         return r
 
     def stream_out(self):
         return self._callstream('stream_out')
 
     def changegroup(self, nodes, kind):
         n = encodelist(nodes)
         f = self._callstream("changegroup", roots=n)
         return self._decompress(f)
 
     def changegroupsubset(self, bases, heads, kind):
         self.requirecap('changegroupsubset', _('look up remote changes'))
         bases = encodelist(bases)
         heads = encodelist(heads)
         return self._decompress(self._callstream("changegroupsubset",
                                                  bases=bases, heads=heads))
 
     def unbundle(self, cg, heads, source):
         '''Send cg (a readable file-like object representing the
         changegroup to push, typically a chunkbuffer object) to the
         remote server as a bundle. Return an integer indicating the
         result of the push (see localrepository.addchangegroup()).'''
 
         ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
         if ret == "":
             raise error.ResponseError(
                 _('push failed:'), output)
         try:
             ret = int(ret)
         except ValueError, err:
             raise error.ResponseError(
                 _('push failed (unexpected response):'), ret)
 
         for l in output.splitlines(True):
             self.ui.status(_('remote: '), l)
         return ret
 
 # server side
 
 class streamres(object):
     def __init__(self, gen):
         self.gen = gen
 
 class pushres(object):
     def __init__(self, res):
         self.res = res
 
 def dispatch(repo, proto, command):
     func, spec = commands[command]
     args = proto.getargs(spec)
     return func(repo, proto, *args)
 
 def between(repo, proto, pairs):
     pairs = [decodelist(p, '-') for p in pairs.split(" ")]
     r = []
     for b in repo.between(pairs):
         r.append(encodelist(b) + "\n")
     return "".join(r)
 
 def branchmap(repo, proto):
     branchmap = repo.branchmap()
     heads = []
     for branch, nodes in branchmap.iteritems():
         branchname = urllib.quote(branch)
         branchnodes = encodelist(nodes)
         heads.append('%s %s' % (branchname, branchnodes))
     return '\n'.join(heads)
 
 def branches(repo, proto, nodes):
     nodes = decodelist(nodes)
     r = []
     for b in repo.branches(nodes):
         r.append(encodelist(b) + "\n")
     return "".join(r)
 
 def capabilities(repo, proto):
     caps = 'lookup changegroupsubset branchmap pushkey'.split()
     if _allowstream(repo.ui):
         caps.append('stream=%d' % repo.changelog.version)
     caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
     return ' '.join(caps)
 
 def changegroup(repo, proto, roots):
     nodes = decodelist(roots)
     cg = repo.changegroup(nodes, 'serve')
     return streamres(proto.groupchunks(cg))
 
 def changegroupsubset(repo, proto, bases, heads):
     bases = decodelist(bases)
     heads = decodelist(heads)
     cg = repo.changegroupsubset(bases, heads, 'serve')
     return streamres(proto.groupchunks(cg))
 
 def heads(repo, proto):
     h = repo.heads()
     return encodelist(h) + "\n"
 
 def hello(repo, proto):
     '''the hello command returns a set of lines describing various
     interesting things about the server, in an RFC822-like format.
     Currently the only one defined is "capabilities", which
     consists of a line in the form:
 
     capabilities: space separated list of tokens
     '''
     return "capabilities: %s\n" % (capabilities(repo, proto))
 
 def listkeys(repo, proto, namespace):
     d = pushkey_.list(repo, namespace).items()
     t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
                                v.encode('string-escape')) for k, v in d])
     return t
 
 def lookup(repo, proto, key):
     try:
         r = hex(repo.lookup(key))
         success = 1
     except Exception, inst:
         r = str(inst)
         success = 0
     return "%s %s\n" % (success, r)
 
 def pushkey(repo, proto, namespace, key, old, new):
     r = pushkey_.push(repo, namespace, key, old, new)
     return '%s\n' % int(r)
 
 def _allowstream(ui):
     return ui.configbool('server', 'uncompressed', True, untrusted=True)
 
 def stream(repo, proto):
     '''If the server supports streaming clone, it advertises the "stream"
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.
 
     The format is simple: the server writes out a line with the amount
     of files, then the total amount of bytes to be transfered (separated
     by a space). Then, for each file, the server first writes the filename
     and filesize (separated by the null character), then the file contents.
     '''
 
     if not _allowstream(repo.ui):
         return '1\n'
 
     entries = []
     total_bytes = 0
     try:
         # get consistent snapshot of repo, lock during scan
         lock = repo.lock()
         try:
             repo.ui.debug('scanning\n')
             for name, ename, size in repo.store.walk():
                 entries.append((name, size))
                 total_bytes += size
         finally:
             lock.release()
     except error.LockError:
         return '2\n' # error: 2
 
     def streamer(repo, entries, total):
         '''stream out all metadata files in repository.'''
         yield '0\n' # success
         repo.ui.debug('%d files, %d bytes to transfer\n' %
                       (len(entries), total_bytes))
         yield '%d %d\n' % (len(entries), total_bytes)
         for name, size in entries:
             repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield '%s\0%d\n' % (store.encodedir(name), size)
             for chunk in util.filechunkiter(repo.sopener(name), limit=size):
                 yield chunk
 
     return streamres(streamer(repo, entries, total_bytes))
 
 def unbundle(repo, proto, heads):
     their_heads = decodelist(heads)
 
     def check_heads():
         heads = repo.heads()
         return their_heads == ['force'] or their_heads == heads
 
     # fail early if possible
     if not check_heads():
         return 'unsynced changes'
 
     # write bundle data to temporary file because it can be big
     fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
     fp = os.fdopen(fd, 'wb+')
     r = 0
     proto.redirect()
     try:
         proto.getfile(fp)
         lock = repo.lock()
         try:
             if not check_heads():
                 # someone else committed/pushed/unbundled while we
                 # were transferring data
                 return 'unsynced changes'
 
             # push can proceed
             fp.seek(0)
-            header = fp.read(6)
-            if header.startswith('HG'):
-                if not header.startswith('HG10'):
-                    raise ValueError('unknown bundle version')
-                elif header not in changegroupmod.bundletypes:
-                    raise ValueError('unknown bundle compression type')
-            gen = changegroupmod.unbundle(header, fp)
+            gen = changegroupmod.readbundle(fp, None)
 
             try:
                 r = repo.addchangegroup(gen, 'serve', proto._client(),
                                         lock=lock)
             except util.Abort, inst:
                 sys.stderr.write("abort: %s\n" % inst)
         finally:
             lock.release()
         return pushres(r)
 
     finally:
         fp.close()
         os.unlink(tempname)
 
 commands = {
     'between': (between, 'pairs'),
     'branchmap': (branchmap, ''),
     'branches': (branches, 'nodes'),
     'capabilities': (capabilities, ''),
     'changegroup': (changegroup, 'roots'),
     'changegroupsubset': (changegroupsubset, 'bases heads'),
     'heads': (heads, ''),
     'hello': (hello, ''),
     'listkeys': (listkeys, 'namespace'),
     'lookup': (lookup, 'key'),
     'pushkey': (pushkey, 'namespace key old new'),
     'stream_out': (stream, ''),
     'unbundle': (unbundle, 'heads'),
 }
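On the server side, the wire-protocol unbundle command now hands its spooled temporary file straight to changegroupmod.readbundle(fp, None) instead of sniffing and validating the header itself; the fname=None path in readbundle() covers headerless data that older peers may send. The standalone snippet below merely restates that decision logic for illustration: classify() is a made-up name, not a Mercurial API.

def classify(header, fname):
    # Mirrors readbundle()'s checks: stream fixup first, then magic/version.
    if not fname:
        fname = "stream"
        if not header.startswith('HG') and header.startswith('\0'):
            return 'headerless stream, assume HG10UN'
    magic, version = header[0:2], header[2:4]
    if magic != 'HG':
        return '%s: not a Mercurial bundle' % fname
    if version != '10':
        return '%s: unknown bundle version %s' % (fname, version)
    return 'HG10 bundle, compression %s' % header[4:6]

print classify('HG10BZ', 'bundle.hg')        # HG10 bundle, compression BZ
print classify('\x00\x00\x00\x55HG', None)   # headerless stream, assume HG10UN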