##// END OF EJS Templates
wireproto: avoid naked excepts
Matt Mackall -
r13726:378522bd default
parent child Browse files
Show More
@@ -1,393 +1,393 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkeymod
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 if l:
19 19 return map(bin, l.split(sep))
20 20 return []
21 21
22 22 def encodelist(l, sep=' '):
23 23 return sep.join(map(hex, l))
24 24
25 25 # client side
26 26
27 27 class wirerepository(repo.repository):
28 28 def lookup(self, key):
29 29 self.requirecap('lookup', _('look up remote revision'))
30 30 d = self._call("lookup", key=encoding.fromlocal(key))
31 31 success, data = d[:-1].split(" ", 1)
32 32 if int(success):
33 33 return bin(data)
34 34 self._abort(error.RepoError(data))
35 35
36 36 def heads(self):
37 37 d = self._call("heads")
38 38 try:
39 39 return decodelist(d[:-1])
40 except:
40 except ValueError:
41 41 self._abort(error.ResponseError(_("unexpected response:"), d))
42 42
43 43 def known(self, nodes):
44 44 n = encodelist(nodes)
45 45 d = self._call("known", nodes=n)
46 46 try:
47 47 return [bool(int(f)) for f in d]
48 except:
48 except ValueError:
49 49 self._abort(error.ResponseError(_("unexpected response:"), d))
50 50
51 51 def branchmap(self):
52 52 d = self._call("branchmap")
53 53 try:
54 54 branchmap = {}
55 55 for branchpart in d.splitlines():
56 56 branchname, branchheads = branchpart.split(' ', 1)
57 57 branchname = encoding.tolocal(urllib.unquote(branchname))
58 58 branchheads = decodelist(branchheads)
59 59 branchmap[branchname] = branchheads
60 60 return branchmap
61 61 except TypeError:
62 62 self._abort(error.ResponseError(_("unexpected response:"), d))
63 63
64 64 def branches(self, nodes):
65 65 n = encodelist(nodes)
66 66 d = self._call("branches", nodes=n)
67 67 try:
68 68 br = [tuple(decodelist(b)) for b in d.splitlines()]
69 69 return br
70 except:
70 except ValueError:
71 71 self._abort(error.ResponseError(_("unexpected response:"), d))
72 72
73 73 def between(self, pairs):
74 74 batch = 8 # avoid giant requests
75 75 r = []
76 76 for i in xrange(0, len(pairs), batch):
77 77 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
78 78 d = self._call("between", pairs=n)
79 79 try:
80 80 r.extend(l and decodelist(l) or [] for l in d.splitlines())
81 except:
81 except ValueError:
82 82 self._abort(error.ResponseError(_("unexpected response:"), d))
83 83 return r
84 84
85 85 def pushkey(self, namespace, key, old, new):
86 86 if not self.capable('pushkey'):
87 87 return False
88 88 d = self._call("pushkey",
89 89 namespace=encoding.fromlocal(namespace),
90 90 key=encoding.fromlocal(key),
91 91 old=encoding.fromlocal(old),
92 92 new=encoding.fromlocal(new))
93 93 try:
94 94 d = bool(int(d))
95 95 except ValueError:
96 96 raise error.ResponseError(
97 97 _('push failed (unexpected response):'), d)
98 98 return d
99 99
100 100 def listkeys(self, namespace):
101 101 if not self.capable('pushkey'):
102 102 return {}
103 103 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
104 104 r = {}
105 105 for l in d.splitlines():
106 106 k, v = l.split('\t')
107 107 r[encoding.tolocal(k)] = encoding.tolocal(v)
108 108 return r
109 109
110 110 def stream_out(self):
111 111 return self._callstream('stream_out')
112 112
113 113 def changegroup(self, nodes, kind):
114 114 n = encodelist(nodes)
115 115 f = self._callstream("changegroup", roots=n)
116 116 return changegroupmod.unbundle10(self._decompress(f), 'UN')
117 117
118 118 def changegroupsubset(self, bases, heads, kind):
119 119 self.requirecap('changegroupsubset', _('look up remote changes'))
120 120 bases = encodelist(bases)
121 121 heads = encodelist(heads)
122 122 f = self._callstream("changegroupsubset",
123 123 bases=bases, heads=heads)
124 124 return changegroupmod.unbundle10(self._decompress(f), 'UN')
125 125
126 126 def unbundle(self, cg, heads, source):
127 127 '''Send cg (a readable file-like object representing the
128 128 changegroup to push, typically a chunkbuffer object) to the
129 129 remote server as a bundle. Return an integer indicating the
130 130 result of the push (see localrepository.addchangegroup()).'''
131 131
132 132 ret, output = self._callpush("unbundle", cg, heads=encodelist(heads))
133 133 if ret == "":
134 134 raise error.ResponseError(
135 135 _('push failed:'), output)
136 136 try:
137 137 ret = int(ret)
138 138 except ValueError:
139 139 raise error.ResponseError(
140 140 _('push failed (unexpected response):'), ret)
141 141
142 142 for l in output.splitlines(True):
143 143 self.ui.status(_('remote: '), l)
144 144 return ret
145 145
146 146 def debugwireargs(self, one, two, three=None, four=None):
147 147 # don't pass optional arguments left at their default value
148 148 opts = {}
149 149 if three is not None:
150 150 opts['three'] = three
151 151 if four is not None:
152 152 opts['four'] = four
153 153 return self._call('debugwireargs', one=one, two=two, **opts)
154 154
155 155 # server side
156 156
157 157 class streamres(object):
158 158 def __init__(self, gen):
159 159 self.gen = gen
160 160
161 161 class pushres(object):
162 162 def __init__(self, res):
163 163 self.res = res
164 164
165 165 class pusherr(object):
166 166 def __init__(self, res):
167 167 self.res = res
168 168
169 169 def dispatch(repo, proto, command):
170 170 func, spec = commands[command]
171 171 args = proto.getargs(spec)
172 172 return func(repo, proto, *args)
173 173
174 174 def options(cmd, keys, others):
175 175 opts = {}
176 176 for k in keys:
177 177 if k in others:
178 178 opts[k] = others[k]
179 179 del others[k]
180 180 if others:
181 181 sys.stderr.write("abort: %s got unexpected arguments %s\n"
182 182 % (cmd, ",".join(others)))
183 183 return opts
184 184
185 185 def between(repo, proto, pairs):
186 186 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
187 187 r = []
188 188 for b in repo.between(pairs):
189 189 r.append(encodelist(b) + "\n")
190 190 return "".join(r)
191 191
192 192 def branchmap(repo, proto):
193 193 branchmap = repo.branchmap()
194 194 heads = []
195 195 for branch, nodes in branchmap.iteritems():
196 196 branchname = urllib.quote(encoding.fromlocal(branch))
197 197 branchnodes = encodelist(nodes)
198 198 heads.append('%s %s' % (branchname, branchnodes))
199 199 return '\n'.join(heads)
200 200
201 201 def branches(repo, proto, nodes):
202 202 nodes = decodelist(nodes)
203 203 r = []
204 204 for b in repo.branches(nodes):
205 205 r.append(encodelist(b) + "\n")
206 206 return "".join(r)
207 207
208 208 def capabilities(repo, proto):
209 209 caps = 'lookup changegroupsubset branchmap pushkey known'.split()
210 210 if _allowstream(repo.ui):
211 211 requiredformats = repo.requirements & repo.supportedformats
212 212 # if our local revlogs are just revlogv1, add 'stream' cap
213 213 if not requiredformats - set(('revlogv1',)):
214 214 caps.append('stream')
215 215 # otherwise, add 'streamreqs' detailing our local revlog format
216 216 else:
217 217 caps.append('streamreqs=%s' % ','.join(requiredformats))
218 218 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
219 219 return ' '.join(caps)
220 220
221 221 def changegroup(repo, proto, roots):
222 222 nodes = decodelist(roots)
223 223 cg = repo.changegroup(nodes, 'serve')
224 224 return streamres(proto.groupchunks(cg))
225 225
226 226 def changegroupsubset(repo, proto, bases, heads):
227 227 bases = decodelist(bases)
228 228 heads = decodelist(heads)
229 229 cg = repo.changegroupsubset(bases, heads, 'serve')
230 230 return streamres(proto.groupchunks(cg))
231 231
232 232 def debugwireargs(repo, proto, one, two, others):
233 233 # only accept optional args from the known set
234 234 opts = options('debugwireargs', ['three', 'four'], others)
235 235 return repo.debugwireargs(one, two, **opts)
236 236
237 237 def heads(repo, proto):
238 238 h = repo.heads()
239 239 return encodelist(h) + "\n"
240 240
241 241 def hello(repo, proto):
242 242 '''the hello command returns a set of lines describing various
243 243 interesting things about the server, in an RFC822-like format.
244 244 Currently the only one defined is "capabilities", which
245 245 consists of a line in the form:
246 246
247 247 capabilities: space separated list of tokens
248 248 '''
249 249 return "capabilities: %s\n" % (capabilities(repo, proto))
250 250
251 251 def listkeys(repo, proto, namespace):
252 252 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
253 253 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
254 254 for k, v in d])
255 255 return t
256 256
257 257 def lookup(repo, proto, key):
258 258 try:
259 259 r = hex(repo.lookup(encoding.tolocal(key)))
260 260 success = 1
261 261 except Exception, inst:
262 262 r = str(inst)
263 263 success = 0
264 264 return "%s %s\n" % (success, r)
265 265
266 266 def known(repo, proto, nodes):
267 267 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
268 268
269 269 def pushkey(repo, proto, namespace, key, old, new):
270 270 # compatibility with pre-1.8 clients which were accidentally
271 271 # sending raw binary nodes rather than utf-8-encoded hex
272 272 if len(new) == 20 and new.encode('string-escape') != new:
273 273 # looks like it could be a binary node
274 274 try:
275 275 u = new.decode('utf-8')
276 276 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
277 277 except UnicodeDecodeError:
278 278 pass # binary, leave unmodified
279 279 else:
280 280 new = encoding.tolocal(new) # normal path
281 281
282 282 r = pushkeymod.push(repo,
283 283 encoding.tolocal(namespace), encoding.tolocal(key),
284 284 encoding.tolocal(old), new)
285 285 return '%s\n' % int(r)
286 286
287 287 def _allowstream(ui):
288 288 return ui.configbool('server', 'uncompressed', True, untrusted=True)
289 289
290 290 def stream(repo, proto):
291 291 '''If the server supports streaming clone, it advertises the "stream"
292 292 capability with a value representing the version and flags of the repo
293 293 it is serving. Client checks to see if it understands the format.
294 294
295 295 The format is simple: the server writes out a line with the amount
296 296 of files, then the total amount of bytes to be transfered (separated
297 297 by a space). Then, for each file, the server first writes the filename
298 298 and filesize (separated by the null character), then the file contents.
299 299 '''
300 300
301 301 if not _allowstream(repo.ui):
302 302 return '1\n'
303 303
304 304 entries = []
305 305 total_bytes = 0
306 306 try:
307 307 # get consistent snapshot of repo, lock during scan
308 308 lock = repo.lock()
309 309 try:
310 310 repo.ui.debug('scanning\n')
311 311 for name, ename, size in repo.store.walk():
312 312 entries.append((name, size))
313 313 total_bytes += size
314 314 finally:
315 315 lock.release()
316 316 except error.LockError:
317 317 return '2\n' # error: 2
318 318
319 319 def streamer(repo, entries, total):
320 320 '''stream out all metadata files in repository.'''
321 321 yield '0\n' # success
322 322 repo.ui.debug('%d files, %d bytes to transfer\n' %
323 323 (len(entries), total_bytes))
324 324 yield '%d %d\n' % (len(entries), total_bytes)
325 325 for name, size in entries:
326 326 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
327 327 # partially encode name over the wire for backwards compat
328 328 yield '%s\0%d\n' % (store.encodedir(name), size)
329 329 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
330 330 yield chunk
331 331
332 332 return streamres(streamer(repo, entries, total_bytes))
333 333
334 334 def unbundle(repo, proto, heads):
335 335 their_heads = decodelist(heads)
336 336
337 337 def check_heads():
338 338 heads = repo.heads()
339 339 return their_heads == ['force'] or their_heads == heads
340 340
341 341 proto.redirect()
342 342
343 343 # fail early if possible
344 344 if not check_heads():
345 345 return pusherr('unsynced changes')
346 346
347 347 # write bundle data to temporary file because it can be big
348 348 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
349 349 fp = os.fdopen(fd, 'wb+')
350 350 r = 0
351 351 try:
352 352 proto.getfile(fp)
353 353 lock = repo.lock()
354 354 try:
355 355 if not check_heads():
356 356 # someone else committed/pushed/unbundled while we
357 357 # were transferring data
358 358 return pusherr('unsynced changes')
359 359
360 360 # push can proceed
361 361 fp.seek(0)
362 362 gen = changegroupmod.readbundle(fp, None)
363 363
364 364 try:
365 365 r = repo.addchangegroup(gen, 'serve', proto._client(),
366 366 lock=lock)
367 367 except util.Abort, inst:
368 368 sys.stderr.write("abort: %s\n" % inst)
369 369 finally:
370 370 lock.release()
371 371 return pushres(r)
372 372
373 373 finally:
374 374 fp.close()
375 375 os.unlink(tempname)
376 376
377 377 commands = {
378 378 'between': (between, 'pairs'),
379 379 'branchmap': (branchmap, ''),
380 380 'branches': (branches, 'nodes'),
381 381 'capabilities': (capabilities, ''),
382 382 'changegroup': (changegroup, 'roots'),
383 383 'changegroupsubset': (changegroupsubset, 'bases heads'),
384 384 'debugwireargs': (debugwireargs, 'one two *'),
385 385 'heads': (heads, ''),
386 386 'hello': (hello, ''),
387 387 'known': (known, 'nodes'),
388 388 'listkeys': (listkeys, 'namespace'),
389 389 'lookup': (lookup, 'key'),
390 390 'pushkey': (pushkey, 'namespace key old new'),
391 391 'stream_out': (stream, ''),
392 392 'unbundle': (unbundle, 'heads'),
393 393 }
General Comments 0
You need to be logged in to leave comments. Login now