##// END OF EJS Templates
wireproto: catch possible cast error in pushkey...
David Soria Parra -
r13450:b3f9af7c stable
parent child Browse files
Show More
@@ -1,348 +1,353 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkeymod
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 return map(bin, l.split(sep))
19 19
20 20 def encodelist(l, sep=' '):
21 21 return sep.join(map(hex, l))
22 22
23 23 # client side
24 24
25 25 class wirerepository(repo.repository):
26 26 def lookup(self, key):
27 27 self.requirecap('lookup', _('look up remote revision'))
28 28 d = self._call("lookup", key=encoding.fromlocal(key))
29 29 success, data = d[:-1].split(" ", 1)
30 30 if int(success):
31 31 return bin(data)
32 32 self._abort(error.RepoError(data))
33 33
34 34 def heads(self):
35 35 d = self._call("heads")
36 36 try:
37 37 return decodelist(d[:-1])
38 38 except:
39 39 self._abort(error.ResponseError(_("unexpected response:"), d))
40 40
41 41 def branchmap(self):
42 42 d = self._call("branchmap")
43 43 try:
44 44 branchmap = {}
45 45 for branchpart in d.splitlines():
46 46 branchname, branchheads = branchpart.split(' ', 1)
47 47 branchname = encoding.tolocal(urllib.unquote(branchname))
48 48 branchheads = decodelist(branchheads)
49 49 branchmap[branchname] = branchheads
50 50 return branchmap
51 51 except TypeError:
52 52 self._abort(error.ResponseError(_("unexpected response:"), d))
53 53
54 54 def branches(self, nodes):
55 55 n = encodelist(nodes)
56 56 d = self._call("branches", nodes=n)
57 57 try:
58 58 br = [tuple(decodelist(b)) for b in d.splitlines()]
59 59 return br
60 60 except:
61 61 self._abort(error.ResponseError(_("unexpected response:"), d))
62 62
63 63 def between(self, pairs):
64 64 batch = 8 # avoid giant requests
65 65 r = []
66 66 for i in xrange(0, len(pairs), batch):
67 67 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
68 68 d = self._call("between", pairs=n)
69 69 try:
70 70 r.extend(l and decodelist(l) or [] for l in d.splitlines())
71 71 except:
72 72 self._abort(error.ResponseError(_("unexpected response:"), d))
73 73 return r
74 74
75 75 def pushkey(self, namespace, key, old, new):
76 76 if not self.capable('pushkey'):
77 77 return False
78 78 d = self._call("pushkey",
79 79 namespace=encoding.fromlocal(namespace),
80 80 key=encoding.fromlocal(key),
81 81 old=encoding.fromlocal(old),
82 82 new=encoding.fromlocal(new))
83 return bool(int(d))
83 try:
84 d = bool(int(d))
85 except ValueError:
86 raise error.ResponseError(
87 _('push failed (unexpected response):'), d)
88 return d
84 89
85 90 def listkeys(self, namespace):
86 91 if not self.capable('pushkey'):
87 92 return {}
88 93 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
89 94 r = {}
90 95 for l in d.splitlines():
91 96 k, v = l.split('\t')
92 97 r[encoding.tolocal(k)] = encoding.tolocal(v)
93 98 return r
94 99
95 100 def stream_out(self):
96 101 return self._callstream('stream_out')
97 102
98 103 def changegroup(self, nodes, kind):
99 104 n = encodelist(nodes)
100 105 f = self._callstream("changegroup", roots=n)
101 106 return changegroupmod.unbundle10(self._decompress(f), 'UN')
102 107
103 108 def changegroupsubset(self, bases, heads, kind):
104 109 self.requirecap('changegroupsubset', _('look up remote changes'))
105 110 bases = encodelist(bases)
106 111 heads = encodelist(heads)
107 112 f = self._callstream("changegroupsubset",
108 113 bases=bases, heads=heads)
109 114 return changegroupmod.unbundle10(self._decompress(f), 'UN')
110 115
111 116 def unbundle(self, cg, heads, source):
112 117 '''Send cg (a readable file-like object representing the
113 118 changegroup to push, typically a chunkbuffer object) to the
114 119 remote server as a bundle. Return an integer indicating the
115 120 result of the push (see localrepository.addchangegroup()).'''
116 121
117 122 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
118 123 if ret == "":
119 124 raise error.ResponseError(
120 125 _('push failed:'), output)
121 126 try:
122 127 ret = int(ret)
123 128 except ValueError:
124 129 raise error.ResponseError(
125 130 _('push failed (unexpected response):'), ret)
126 131
127 132 for l in output.splitlines(True):
128 133 self.ui.status(_('remote: '), l)
129 134 return ret
130 135
131 136 # server side
132 137
133 138 class streamres(object):
134 139 def __init__(self, gen):
135 140 self.gen = gen
136 141
137 142 class pushres(object):
138 143 def __init__(self, res):
139 144 self.res = res
140 145
141 146 class pusherr(object):
142 147 def __init__(self, res):
143 148 self.res = res
144 149
145 150 def dispatch(repo, proto, command):
146 151 func, spec = commands[command]
147 152 args = proto.getargs(spec)
148 153 return func(repo, proto, *args)
149 154
150 155 def between(repo, proto, pairs):
151 156 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
152 157 r = []
153 158 for b in repo.between(pairs):
154 159 r.append(encodelist(b) + "\n")
155 160 return "".join(r)
156 161
157 162 def branchmap(repo, proto):
158 163 branchmap = repo.branchmap()
159 164 heads = []
160 165 for branch, nodes in branchmap.iteritems():
161 166 branchname = urllib.quote(encoding.fromlocal(branch))
162 167 branchnodes = encodelist(nodes)
163 168 heads.append('%s %s' % (branchname, branchnodes))
164 169 return '\n'.join(heads)
165 170
166 171 def branches(repo, proto, nodes):
167 172 nodes = decodelist(nodes)
168 173 r = []
169 174 for b in repo.branches(nodes):
170 175 r.append(encodelist(b) + "\n")
171 176 return "".join(r)
172 177
173 178 def capabilities(repo, proto):
174 179 caps = 'lookup changegroupsubset branchmap pushkey'.split()
175 180 if _allowstream(repo.ui):
176 181 requiredformats = repo.requirements & repo.supportedformats
177 182 # if our local revlogs are just revlogv1, add 'stream' cap
178 183 if not requiredformats - set(('revlogv1',)):
179 184 caps.append('stream')
180 185 # otherwise, add 'streamreqs' detailing our local revlog format
181 186 else:
182 187 caps.append('streamreqs=%s' % ','.join(requiredformats))
183 188 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
184 189 return ' '.join(caps)
185 190
186 191 def changegroup(repo, proto, roots):
187 192 nodes = decodelist(roots)
188 193 cg = repo.changegroup(nodes, 'serve')
189 194 return streamres(proto.groupchunks(cg))
190 195
191 196 def changegroupsubset(repo, proto, bases, heads):
192 197 bases = decodelist(bases)
193 198 heads = decodelist(heads)
194 199 cg = repo.changegroupsubset(bases, heads, 'serve')
195 200 return streamres(proto.groupchunks(cg))
196 201
197 202 def heads(repo, proto):
198 203 h = repo.heads()
199 204 return encodelist(h) + "\n"
200 205
201 206 def hello(repo, proto):
202 207 '''the hello command returns a set of lines describing various
203 208 interesting things about the server, in an RFC822-like format.
204 209 Currently the only one defined is "capabilities", which
205 210 consists of a line in the form:
206 211
207 212 capabilities: space separated list of tokens
208 213 '''
209 214 return "capabilities: %s\n" % (capabilities(repo, proto))
210 215
211 216 def listkeys(repo, proto, namespace):
212 217 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
213 218 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
214 219 for k, v in d])
215 220 return t
216 221
217 222 def lookup(repo, proto, key):
218 223 try:
219 224 r = hex(repo.lookup(encoding.tolocal(key)))
220 225 success = 1
221 226 except Exception, inst:
222 227 r = str(inst)
223 228 success = 0
224 229 return "%s %s\n" % (success, r)
225 230
226 231 def pushkey(repo, proto, namespace, key, old, new):
227 232 # compatibility with pre-1.8 clients which were accidentally
228 233 # sending raw binary nodes rather than utf-8-encoded hex
229 234 if len(new) == 20 and new.encode('string-escape') != new:
230 235 # looks like it could be a binary node
231 236 try:
232 237 u = new.decode('utf-8')
233 238 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
234 239 except UnicodeDecodeError:
235 240 pass # binary, leave unmodified
236 241 else:
237 242 new = encoding.tolocal(new) # normal path
238 243
239 244 r = pushkeymod.push(repo,
240 245 encoding.tolocal(namespace), encoding.tolocal(key),
241 246 encoding.tolocal(old), new)
242 247 return '%s\n' % int(r)
243 248
244 249 def _allowstream(ui):
245 250 return ui.configbool('server', 'uncompressed', True, untrusted=True)
246 251
247 252 def stream(repo, proto):
248 253 '''If the server supports streaming clone, it advertises the "stream"
249 254 capability with a value representing the version and flags of the repo
250 255 it is serving. Client checks to see if it understands the format.
251 256
252 257 The format is simple: the server writes out a line with the amount
253 258 of files, then the total amount of bytes to be transfered (separated
254 259 by a space). Then, for each file, the server first writes the filename
255 260 and filesize (separated by the null character), then the file contents.
256 261 '''
257 262
258 263 if not _allowstream(repo.ui):
259 264 return '1\n'
260 265
261 266 entries = []
262 267 total_bytes = 0
263 268 try:
264 269 # get consistent snapshot of repo, lock during scan
265 270 lock = repo.lock()
266 271 try:
267 272 repo.ui.debug('scanning\n')
268 273 for name, ename, size in repo.store.walk():
269 274 entries.append((name, size))
270 275 total_bytes += size
271 276 finally:
272 277 lock.release()
273 278 except error.LockError:
274 279 return '2\n' # error: 2
275 280
276 281 def streamer(repo, entries, total):
277 282 '''stream out all metadata files in repository.'''
278 283 yield '0\n' # success
279 284 repo.ui.debug('%d files, %d bytes to transfer\n' %
280 285 (len(entries), total_bytes))
281 286 yield '%d %d\n' % (len(entries), total_bytes)
282 287 for name, size in entries:
283 288 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
284 289 # partially encode name over the wire for backwards compat
285 290 yield '%s\0%d\n' % (store.encodedir(name), size)
286 291 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
287 292 yield chunk
288 293
289 294 return streamres(streamer(repo, entries, total_bytes))
290 295
291 296 def unbundle(repo, proto, heads):
292 297 their_heads = decodelist(heads)
293 298
294 299 def check_heads():
295 300 heads = repo.heads()
296 301 return their_heads == ['force'] or their_heads == heads
297 302
298 303 proto.redirect()
299 304
300 305 # fail early if possible
301 306 if not check_heads():
302 307 return pusherr('unsynced changes')
303 308
304 309 # write bundle data to temporary file because it can be big
305 310 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
306 311 fp = os.fdopen(fd, 'wb+')
307 312 r = 0
308 313 try:
309 314 proto.getfile(fp)
310 315 lock = repo.lock()
311 316 try:
312 317 if not check_heads():
313 318 # someone else committed/pushed/unbundled while we
314 319 # were transferring data
315 320 return pusherr('unsynced changes')
316 321
317 322 # push can proceed
318 323 fp.seek(0)
319 324 gen = changegroupmod.readbundle(fp, None)
320 325
321 326 try:
322 327 r = repo.addchangegroup(gen, 'serve', proto._client(),
323 328 lock=lock)
324 329 except util.Abort, inst:
325 330 sys.stderr.write("abort: %s\n" % inst)
326 331 finally:
327 332 lock.release()
328 333 return pushres(r)
329 334
330 335 finally:
331 336 fp.close()
332 337 os.unlink(tempname)
333 338
334 339 commands = {
335 340 'between': (between, 'pairs'),
336 341 'branchmap': (branchmap, ''),
337 342 'branches': (branches, 'nodes'),
338 343 'capabilities': (capabilities, ''),
339 344 'changegroup': (changegroup, 'roots'),
340 345 'changegroupsubset': (changegroupsubset, 'bases heads'),
341 346 'heads': (heads, ''),
342 347 'hello': (hello, ''),
343 348 'listkeys': (listkeys, 'namespace'),
344 349 'lookup': (lookup, 'key'),
345 350 'pushkey': (pushkey, 'namespace key old new'),
346 351 'stream_out': (stream, ''),
347 352 'unbundle': (unbundle, 'heads'),
348 353 }
General Comments 0
You need to be logged in to leave comments. Login now