##// END OF EJS Templates
wireproto: catch possible cast error in pushkey...
David Soria Parra -
r13450:b3f9af7c stable
parent child Browse files
Show More
@@ -1,348 +1,353 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import repo, error, encoding, util, store
12 import repo, error, encoding, util, store
13 import pushkey as pushkeymod
13 import pushkey as pushkeymod
14
14
15 # list of nodes encoding / decoding
15 # list of nodes encoding / decoding
16
16
17 def decodelist(l, sep=' '):
17 def decodelist(l, sep=' '):
18 return map(bin, l.split(sep))
18 return map(bin, l.split(sep))
19
19
20 def encodelist(l, sep=' '):
20 def encodelist(l, sep=' '):
21 return sep.join(map(hex, l))
21 return sep.join(map(hex, l))
22
22
23 # client side
23 # client side
24
24
25 class wirerepository(repo.repository):
25 class wirerepository(repo.repository):
26 def lookup(self, key):
26 def lookup(self, key):
27 self.requirecap('lookup', _('look up remote revision'))
27 self.requirecap('lookup', _('look up remote revision'))
28 d = self._call("lookup", key=encoding.fromlocal(key))
28 d = self._call("lookup", key=encoding.fromlocal(key))
29 success, data = d[:-1].split(" ", 1)
29 success, data = d[:-1].split(" ", 1)
30 if int(success):
30 if int(success):
31 return bin(data)
31 return bin(data)
32 self._abort(error.RepoError(data))
32 self._abort(error.RepoError(data))
33
33
34 def heads(self):
34 def heads(self):
35 d = self._call("heads")
35 d = self._call("heads")
36 try:
36 try:
37 return decodelist(d[:-1])
37 return decodelist(d[:-1])
38 except:
38 except:
39 self._abort(error.ResponseError(_("unexpected response:"), d))
39 self._abort(error.ResponseError(_("unexpected response:"), d))
40
40
41 def branchmap(self):
41 def branchmap(self):
42 d = self._call("branchmap")
42 d = self._call("branchmap")
43 try:
43 try:
44 branchmap = {}
44 branchmap = {}
45 for branchpart in d.splitlines():
45 for branchpart in d.splitlines():
46 branchname, branchheads = branchpart.split(' ', 1)
46 branchname, branchheads = branchpart.split(' ', 1)
47 branchname = encoding.tolocal(urllib.unquote(branchname))
47 branchname = encoding.tolocal(urllib.unquote(branchname))
48 branchheads = decodelist(branchheads)
48 branchheads = decodelist(branchheads)
49 branchmap[branchname] = branchheads
49 branchmap[branchname] = branchheads
50 return branchmap
50 return branchmap
51 except TypeError:
51 except TypeError:
52 self._abort(error.ResponseError(_("unexpected response:"), d))
52 self._abort(error.ResponseError(_("unexpected response:"), d))
53
53
54 def branches(self, nodes):
54 def branches(self, nodes):
55 n = encodelist(nodes)
55 n = encodelist(nodes)
56 d = self._call("branches", nodes=n)
56 d = self._call("branches", nodes=n)
57 try:
57 try:
58 br = [tuple(decodelist(b)) for b in d.splitlines()]
58 br = [tuple(decodelist(b)) for b in d.splitlines()]
59 return br
59 return br
60 except:
60 except:
61 self._abort(error.ResponseError(_("unexpected response:"), d))
61 self._abort(error.ResponseError(_("unexpected response:"), d))
62
62
63 def between(self, pairs):
63 def between(self, pairs):
64 batch = 8 # avoid giant requests
64 batch = 8 # avoid giant requests
65 r = []
65 r = []
66 for i in xrange(0, len(pairs), batch):
66 for i in xrange(0, len(pairs), batch):
67 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
67 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
68 d = self._call("between", pairs=n)
68 d = self._call("between", pairs=n)
69 try:
69 try:
70 r.extend(l and decodelist(l) or [] for l in d.splitlines())
70 r.extend(l and decodelist(l) or [] for l in d.splitlines())
71 except:
71 except:
72 self._abort(error.ResponseError(_("unexpected response:"), d))
72 self._abort(error.ResponseError(_("unexpected response:"), d))
73 return r
73 return r
74
74
75 def pushkey(self, namespace, key, old, new):
75 def pushkey(self, namespace, key, old, new):
76 if not self.capable('pushkey'):
76 if not self.capable('pushkey'):
77 return False
77 return False
78 d = self._call("pushkey",
78 d = self._call("pushkey",
79 namespace=encoding.fromlocal(namespace),
79 namespace=encoding.fromlocal(namespace),
80 key=encoding.fromlocal(key),
80 key=encoding.fromlocal(key),
81 old=encoding.fromlocal(old),
81 old=encoding.fromlocal(old),
82 new=encoding.fromlocal(new))
82 new=encoding.fromlocal(new))
83 return bool(int(d))
83 try:
84 d = bool(int(d))
85 except ValueError:
86 raise error.ResponseError(
87 _('push failed (unexpected response):'), d)
88 return d
84
89
85 def listkeys(self, namespace):
90 def listkeys(self, namespace):
86 if not self.capable('pushkey'):
91 if not self.capable('pushkey'):
87 return {}
92 return {}
88 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
93 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
89 r = {}
94 r = {}
90 for l in d.splitlines():
95 for l in d.splitlines():
91 k, v = l.split('\t')
96 k, v = l.split('\t')
92 r[encoding.tolocal(k)] = encoding.tolocal(v)
97 r[encoding.tolocal(k)] = encoding.tolocal(v)
93 return r
98 return r
94
99
95 def stream_out(self):
100 def stream_out(self):
96 return self._callstream('stream_out')
101 return self._callstream('stream_out')
97
102
98 def changegroup(self, nodes, kind):
103 def changegroup(self, nodes, kind):
99 n = encodelist(nodes)
104 n = encodelist(nodes)
100 f = self._callstream("changegroup", roots=n)
105 f = self._callstream("changegroup", roots=n)
101 return changegroupmod.unbundle10(self._decompress(f), 'UN')
106 return changegroupmod.unbundle10(self._decompress(f), 'UN')
102
107
103 def changegroupsubset(self, bases, heads, kind):
108 def changegroupsubset(self, bases, heads, kind):
104 self.requirecap('changegroupsubset', _('look up remote changes'))
109 self.requirecap('changegroupsubset', _('look up remote changes'))
105 bases = encodelist(bases)
110 bases = encodelist(bases)
106 heads = encodelist(heads)
111 heads = encodelist(heads)
107 f = self._callstream("changegroupsubset",
112 f = self._callstream("changegroupsubset",
108 bases=bases, heads=heads)
113 bases=bases, heads=heads)
109 return changegroupmod.unbundle10(self._decompress(f), 'UN')
114 return changegroupmod.unbundle10(self._decompress(f), 'UN')
110
115
111 def unbundle(self, cg, heads, source):
116 def unbundle(self, cg, heads, source):
112 '''Send cg (a readable file-like object representing the
117 '''Send cg (a readable file-like object representing the
113 changegroup to push, typically a chunkbuffer object) to the
118 changegroup to push, typically a chunkbuffer object) to the
114 remote server as a bundle. Return an integer indicating the
119 remote server as a bundle. Return an integer indicating the
115 result of the push (see localrepository.addchangegroup()).'''
120 result of the push (see localrepository.addchangegroup()).'''
116
121
117 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
122 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
118 if ret == "":
123 if ret == "":
119 raise error.ResponseError(
124 raise error.ResponseError(
120 _('push failed:'), output)
125 _('push failed:'), output)
121 try:
126 try:
122 ret = int(ret)
127 ret = int(ret)
123 except ValueError:
128 except ValueError:
124 raise error.ResponseError(
129 raise error.ResponseError(
125 _('push failed (unexpected response):'), ret)
130 _('push failed (unexpected response):'), ret)
126
131
127 for l in output.splitlines(True):
132 for l in output.splitlines(True):
128 self.ui.status(_('remote: '), l)
133 self.ui.status(_('remote: '), l)
129 return ret
134 return ret
130
135
131 # server side
136 # server side
132
137
133 class streamres(object):
138 class streamres(object):
134 def __init__(self, gen):
139 def __init__(self, gen):
135 self.gen = gen
140 self.gen = gen
136
141
137 class pushres(object):
142 class pushres(object):
138 def __init__(self, res):
143 def __init__(self, res):
139 self.res = res
144 self.res = res
140
145
141 class pusherr(object):
146 class pusherr(object):
142 def __init__(self, res):
147 def __init__(self, res):
143 self.res = res
148 self.res = res
144
149
145 def dispatch(repo, proto, command):
150 def dispatch(repo, proto, command):
146 func, spec = commands[command]
151 func, spec = commands[command]
147 args = proto.getargs(spec)
152 args = proto.getargs(spec)
148 return func(repo, proto, *args)
153 return func(repo, proto, *args)
149
154
150 def between(repo, proto, pairs):
155 def between(repo, proto, pairs):
151 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
156 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
152 r = []
157 r = []
153 for b in repo.between(pairs):
158 for b in repo.between(pairs):
154 r.append(encodelist(b) + "\n")
159 r.append(encodelist(b) + "\n")
155 return "".join(r)
160 return "".join(r)
156
161
157 def branchmap(repo, proto):
162 def branchmap(repo, proto):
158 branchmap = repo.branchmap()
163 branchmap = repo.branchmap()
159 heads = []
164 heads = []
160 for branch, nodes in branchmap.iteritems():
165 for branch, nodes in branchmap.iteritems():
161 branchname = urllib.quote(encoding.fromlocal(branch))
166 branchname = urllib.quote(encoding.fromlocal(branch))
162 branchnodes = encodelist(nodes)
167 branchnodes = encodelist(nodes)
163 heads.append('%s %s' % (branchname, branchnodes))
168 heads.append('%s %s' % (branchname, branchnodes))
164 return '\n'.join(heads)
169 return '\n'.join(heads)
165
170
166 def branches(repo, proto, nodes):
171 def branches(repo, proto, nodes):
167 nodes = decodelist(nodes)
172 nodes = decodelist(nodes)
168 r = []
173 r = []
169 for b in repo.branches(nodes):
174 for b in repo.branches(nodes):
170 r.append(encodelist(b) + "\n")
175 r.append(encodelist(b) + "\n")
171 return "".join(r)
176 return "".join(r)
172
177
173 def capabilities(repo, proto):
178 def capabilities(repo, proto):
174 caps = 'lookup changegroupsubset branchmap pushkey'.split()
179 caps = 'lookup changegroupsubset branchmap pushkey'.split()
175 if _allowstream(repo.ui):
180 if _allowstream(repo.ui):
176 requiredformats = repo.requirements & repo.supportedformats
181 requiredformats = repo.requirements & repo.supportedformats
177 # if our local revlogs are just revlogv1, add 'stream' cap
182 # if our local revlogs are just revlogv1, add 'stream' cap
178 if not requiredformats - set(('revlogv1',)):
183 if not requiredformats - set(('revlogv1',)):
179 caps.append('stream')
184 caps.append('stream')
180 # otherwise, add 'streamreqs' detailing our local revlog format
185 # otherwise, add 'streamreqs' detailing our local revlog format
181 else:
186 else:
182 caps.append('streamreqs=%s' % ','.join(requiredformats))
187 caps.append('streamreqs=%s' % ','.join(requiredformats))
183 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
188 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
184 return ' '.join(caps)
189 return ' '.join(caps)
185
190
186 def changegroup(repo, proto, roots):
191 def changegroup(repo, proto, roots):
187 nodes = decodelist(roots)
192 nodes = decodelist(roots)
188 cg = repo.changegroup(nodes, 'serve')
193 cg = repo.changegroup(nodes, 'serve')
189 return streamres(proto.groupchunks(cg))
194 return streamres(proto.groupchunks(cg))
190
195
191 def changegroupsubset(repo, proto, bases, heads):
196 def changegroupsubset(repo, proto, bases, heads):
192 bases = decodelist(bases)
197 bases = decodelist(bases)
193 heads = decodelist(heads)
198 heads = decodelist(heads)
194 cg = repo.changegroupsubset(bases, heads, 'serve')
199 cg = repo.changegroupsubset(bases, heads, 'serve')
195 return streamres(proto.groupchunks(cg))
200 return streamres(proto.groupchunks(cg))
196
201
197 def heads(repo, proto):
202 def heads(repo, proto):
198 h = repo.heads()
203 h = repo.heads()
199 return encodelist(h) + "\n"
204 return encodelist(h) + "\n"
200
205
201 def hello(repo, proto):
206 def hello(repo, proto):
202 '''the hello command returns a set of lines describing various
207 '''the hello command returns a set of lines describing various
203 interesting things about the server, in an RFC822-like format.
208 interesting things about the server, in an RFC822-like format.
204 Currently the only one defined is "capabilities", which
209 Currently the only one defined is "capabilities", which
205 consists of a line in the form:
210 consists of a line in the form:
206
211
207 capabilities: space separated list of tokens
212 capabilities: space separated list of tokens
208 '''
213 '''
209 return "capabilities: %s\n" % (capabilities(repo, proto))
214 return "capabilities: %s\n" % (capabilities(repo, proto))
210
215
211 def listkeys(repo, proto, namespace):
216 def listkeys(repo, proto, namespace):
212 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
217 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
213 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
218 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
214 for k, v in d])
219 for k, v in d])
215 return t
220 return t
216
221
217 def lookup(repo, proto, key):
222 def lookup(repo, proto, key):
218 try:
223 try:
219 r = hex(repo.lookup(encoding.tolocal(key)))
224 r = hex(repo.lookup(encoding.tolocal(key)))
220 success = 1
225 success = 1
221 except Exception, inst:
226 except Exception, inst:
222 r = str(inst)
227 r = str(inst)
223 success = 0
228 success = 0
224 return "%s %s\n" % (success, r)
229 return "%s %s\n" % (success, r)
225
230
226 def pushkey(repo, proto, namespace, key, old, new):
231 def pushkey(repo, proto, namespace, key, old, new):
227 # compatibility with pre-1.8 clients which were accidentally
232 # compatibility with pre-1.8 clients which were accidentally
228 # sending raw binary nodes rather than utf-8-encoded hex
233 # sending raw binary nodes rather than utf-8-encoded hex
229 if len(new) == 20 and new.encode('string-escape') != new:
234 if len(new) == 20 and new.encode('string-escape') != new:
230 # looks like it could be a binary node
235 # looks like it could be a binary node
231 try:
236 try:
232 u = new.decode('utf-8')
237 u = new.decode('utf-8')
233 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
238 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
234 except UnicodeDecodeError:
239 except UnicodeDecodeError:
235 pass # binary, leave unmodified
240 pass # binary, leave unmodified
236 else:
241 else:
237 new = encoding.tolocal(new) # normal path
242 new = encoding.tolocal(new) # normal path
238
243
239 r = pushkeymod.push(repo,
244 r = pushkeymod.push(repo,
240 encoding.tolocal(namespace), encoding.tolocal(key),
245 encoding.tolocal(namespace), encoding.tolocal(key),
241 encoding.tolocal(old), new)
246 encoding.tolocal(old), new)
242 return '%s\n' % int(r)
247 return '%s\n' % int(r)
243
248
244 def _allowstream(ui):
249 def _allowstream(ui):
245 return ui.configbool('server', 'uncompressed', True, untrusted=True)
250 return ui.configbool('server', 'uncompressed', True, untrusted=True)
246
251
247 def stream(repo, proto):
252 def stream(repo, proto):
248 '''If the server supports streaming clone, it advertises the "stream"
253 '''If the server supports streaming clone, it advertises the "stream"
249 capability with a value representing the version and flags of the repo
254 capability with a value representing the version and flags of the repo
250 it is serving. Client checks to see if it understands the format.
255 it is serving. Client checks to see if it understands the format.
251
256
252 The format is simple: the server writes out a line with the amount
257 The format is simple: the server writes out a line with the amount
253 of files, then the total amount of bytes to be transfered (separated
258 of files, then the total amount of bytes to be transfered (separated
254 by a space). Then, for each file, the server first writes the filename
259 by a space). Then, for each file, the server first writes the filename
255 and filesize (separated by the null character), then the file contents.
260 and filesize (separated by the null character), then the file contents.
256 '''
261 '''
257
262
258 if not _allowstream(repo.ui):
263 if not _allowstream(repo.ui):
259 return '1\n'
264 return '1\n'
260
265
261 entries = []
266 entries = []
262 total_bytes = 0
267 total_bytes = 0
263 try:
268 try:
264 # get consistent snapshot of repo, lock during scan
269 # get consistent snapshot of repo, lock during scan
265 lock = repo.lock()
270 lock = repo.lock()
266 try:
271 try:
267 repo.ui.debug('scanning\n')
272 repo.ui.debug('scanning\n')
268 for name, ename, size in repo.store.walk():
273 for name, ename, size in repo.store.walk():
269 entries.append((name, size))
274 entries.append((name, size))
270 total_bytes += size
275 total_bytes += size
271 finally:
276 finally:
272 lock.release()
277 lock.release()
273 except error.LockError:
278 except error.LockError:
274 return '2\n' # error: 2
279 return '2\n' # error: 2
275
280
276 def streamer(repo, entries, total):
281 def streamer(repo, entries, total):
277 '''stream out all metadata files in repository.'''
282 '''stream out all metadata files in repository.'''
278 yield '0\n' # success
283 yield '0\n' # success
279 repo.ui.debug('%d files, %d bytes to transfer\n' %
284 repo.ui.debug('%d files, %d bytes to transfer\n' %
280 (len(entries), total_bytes))
285 (len(entries), total_bytes))
281 yield '%d %d\n' % (len(entries), total_bytes)
286 yield '%d %d\n' % (len(entries), total_bytes)
282 for name, size in entries:
287 for name, size in entries:
283 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
288 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
284 # partially encode name over the wire for backwards compat
289 # partially encode name over the wire for backwards compat
285 yield '%s\0%d\n' % (store.encodedir(name), size)
290 yield '%s\0%d\n' % (store.encodedir(name), size)
286 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
291 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
287 yield chunk
292 yield chunk
288
293
289 return streamres(streamer(repo, entries, total_bytes))
294 return streamres(streamer(repo, entries, total_bytes))
290
295
291 def unbundle(repo, proto, heads):
296 def unbundle(repo, proto, heads):
292 their_heads = decodelist(heads)
297 their_heads = decodelist(heads)
293
298
294 def check_heads():
299 def check_heads():
295 heads = repo.heads()
300 heads = repo.heads()
296 return their_heads == ['force'] or their_heads == heads
301 return their_heads == ['force'] or their_heads == heads
297
302
298 proto.redirect()
303 proto.redirect()
299
304
300 # fail early if possible
305 # fail early if possible
301 if not check_heads():
306 if not check_heads():
302 return pusherr('unsynced changes')
307 return pusherr('unsynced changes')
303
308
304 # write bundle data to temporary file because it can be big
309 # write bundle data to temporary file because it can be big
305 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
310 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
306 fp = os.fdopen(fd, 'wb+')
311 fp = os.fdopen(fd, 'wb+')
307 r = 0
312 r = 0
308 try:
313 try:
309 proto.getfile(fp)
314 proto.getfile(fp)
310 lock = repo.lock()
315 lock = repo.lock()
311 try:
316 try:
312 if not check_heads():
317 if not check_heads():
313 # someone else committed/pushed/unbundled while we
318 # someone else committed/pushed/unbundled while we
314 # were transferring data
319 # were transferring data
315 return pusherr('unsynced changes')
320 return pusherr('unsynced changes')
316
321
317 # push can proceed
322 # push can proceed
318 fp.seek(0)
323 fp.seek(0)
319 gen = changegroupmod.readbundle(fp, None)
324 gen = changegroupmod.readbundle(fp, None)
320
325
321 try:
326 try:
322 r = repo.addchangegroup(gen, 'serve', proto._client(),
327 r = repo.addchangegroup(gen, 'serve', proto._client(),
323 lock=lock)
328 lock=lock)
324 except util.Abort, inst:
329 except util.Abort, inst:
325 sys.stderr.write("abort: %s\n" % inst)
330 sys.stderr.write("abort: %s\n" % inst)
326 finally:
331 finally:
327 lock.release()
332 lock.release()
328 return pushres(r)
333 return pushres(r)
329
334
330 finally:
335 finally:
331 fp.close()
336 fp.close()
332 os.unlink(tempname)
337 os.unlink(tempname)
333
338
334 commands = {
339 commands = {
335 'between': (between, 'pairs'),
340 'between': (between, 'pairs'),
336 'branchmap': (branchmap, ''),
341 'branchmap': (branchmap, ''),
337 'branches': (branches, 'nodes'),
342 'branches': (branches, 'nodes'),
338 'capabilities': (capabilities, ''),
343 'capabilities': (capabilities, ''),
339 'changegroup': (changegroup, 'roots'),
344 'changegroup': (changegroup, 'roots'),
340 'changegroupsubset': (changegroupsubset, 'bases heads'),
345 'changegroupsubset': (changegroupsubset, 'bases heads'),
341 'heads': (heads, ''),
346 'heads': (heads, ''),
342 'hello': (hello, ''),
347 'hello': (hello, ''),
343 'listkeys': (listkeys, 'namespace'),
348 'listkeys': (listkeys, 'namespace'),
344 'lookup': (lookup, 'key'),
349 'lookup': (lookup, 'key'),
345 'pushkey': (pushkey, 'namespace key old new'),
350 'pushkey': (pushkey, 'namespace key old new'),
346 'stream_out': (stream, ''),
351 'stream_out': (stream, ''),
347 'unbundle': (unbundle, 'heads'),
352 'unbundle': (unbundle, 'heads'),
348 }
353 }
General Comments 0
You need to be logged in to leave comments. Login now