##// END OF EJS Templates
wireproto: enable optional args for known() for future extensibility...
Peter Arrenbrecht -
r14436:5adb5252 default
parent child Browse files
Show More
@@ -1,421 +1,421 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkeymod
14 14
15 15 # list of nodes encoding / decoding
16 16
17 17 def decodelist(l, sep=' '):
18 18 if l:
19 19 return map(bin, l.split(sep))
20 20 return []
21 21
22 22 def encodelist(l, sep=' '):
23 23 return sep.join(map(hex, l))
24 24
25 25 # client side
26 26
27 27 class wirerepository(repo.repository):
28 28 def lookup(self, key):
29 29 self.requirecap('lookup', _('look up remote revision'))
30 30 d = self._call("lookup", key=encoding.fromlocal(key))
31 31 success, data = d[:-1].split(" ", 1)
32 32 if int(success):
33 33 return bin(data)
34 34 self._abort(error.RepoError(data))
35 35
36 36 def heads(self):
37 37 d = self._call("heads")
38 38 try:
39 39 return decodelist(d[:-1])
40 40 except ValueError:
41 41 self._abort(error.ResponseError(_("unexpected response:"), d))
42 42
43 43 def known(self, nodes):
44 44 n = encodelist(nodes)
45 45 d = self._call("known", nodes=n)
46 46 try:
47 47 return [bool(int(f)) for f in d]
48 48 except ValueError:
49 49 self._abort(error.ResponseError(_("unexpected response:"), d))
50 50
51 51 def branchmap(self):
52 52 d = self._call("branchmap")
53 53 try:
54 54 branchmap = {}
55 55 for branchpart in d.splitlines():
56 56 branchname, branchheads = branchpart.split(' ', 1)
57 57 branchname = encoding.tolocal(urllib.unquote(branchname))
58 58 branchheads = decodelist(branchheads)
59 59 branchmap[branchname] = branchheads
60 60 return branchmap
61 61 except TypeError:
62 62 self._abort(error.ResponseError(_("unexpected response:"), d))
63 63
64 64 def branches(self, nodes):
65 65 n = encodelist(nodes)
66 66 d = self._call("branches", nodes=n)
67 67 try:
68 68 br = [tuple(decodelist(b)) for b in d.splitlines()]
69 69 return br
70 70 except ValueError:
71 71 self._abort(error.ResponseError(_("unexpected response:"), d))
72 72
73 73 def between(self, pairs):
74 74 batch = 8 # avoid giant requests
75 75 r = []
76 76 for i in xrange(0, len(pairs), batch):
77 77 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
78 78 d = self._call("between", pairs=n)
79 79 try:
80 80 r.extend(l and decodelist(l) or [] for l in d.splitlines())
81 81 except ValueError:
82 82 self._abort(error.ResponseError(_("unexpected response:"), d))
83 83 return r
84 84
85 85 def pushkey(self, namespace, key, old, new):
86 86 if not self.capable('pushkey'):
87 87 return False
88 88 d = self._call("pushkey",
89 89 namespace=encoding.fromlocal(namespace),
90 90 key=encoding.fromlocal(key),
91 91 old=encoding.fromlocal(old),
92 92 new=encoding.fromlocal(new))
93 93 try:
94 94 d = bool(int(d))
95 95 except ValueError:
96 96 raise error.ResponseError(
97 97 _('push failed (unexpected response):'), d)
98 98 return d
99 99
100 100 def listkeys(self, namespace):
101 101 if not self.capable('pushkey'):
102 102 return {}
103 103 d = self._call("listkeys", namespace=encoding.fromlocal(namespace))
104 104 r = {}
105 105 for l in d.splitlines():
106 106 k, v = l.split('\t')
107 107 r[encoding.tolocal(k)] = encoding.tolocal(v)
108 108 return r
109 109
110 110 def stream_out(self):
111 111 return self._callstream('stream_out')
112 112
113 113 def changegroup(self, nodes, kind):
114 114 n = encodelist(nodes)
115 115 f = self._callstream("changegroup", roots=n)
116 116 return changegroupmod.unbundle10(self._decompress(f), 'UN')
117 117
118 118 def changegroupsubset(self, bases, heads, kind):
119 119 self.requirecap('changegroupsubset', _('look up remote changes'))
120 120 bases = encodelist(bases)
121 121 heads = encodelist(heads)
122 122 f = self._callstream("changegroupsubset",
123 123 bases=bases, heads=heads)
124 124 return changegroupmod.unbundle10(self._decompress(f), 'UN')
125 125
126 126 def getbundle(self, source, heads=None, common=None):
127 127 self.requirecap('getbundle', _('look up remote changes'))
128 128 opts = {}
129 129 if heads is not None:
130 130 opts['heads'] = encodelist(heads)
131 131 if common is not None:
132 132 opts['common'] = encodelist(common)
133 133 f = self._callstream("getbundle", **opts)
134 134 return changegroupmod.unbundle10(self._decompress(f), 'UN')
135 135
136 136 def unbundle(self, cg, heads, source):
137 137 '''Send cg (a readable file-like object representing the
138 138 changegroup to push, typically a chunkbuffer object) to the
139 139 remote server as a bundle. Return an integer indicating the
140 140 result of the push (see localrepository.addchangegroup()).'''
141 141
142 142 if heads != ['force'] and self.capable('unbundlehash'):
143 143 heads = encodelist(['hashed',
144 144 util.sha1(''.join(sorted(heads))).digest()])
145 145 else:
146 146 heads = encodelist(heads)
147 147
148 148 ret, output = self._callpush("unbundle", cg, heads=heads)
149 149 if ret == "":
150 150 raise error.ResponseError(
151 151 _('push failed:'), output)
152 152 try:
153 153 ret = int(ret)
154 154 except ValueError:
155 155 raise error.ResponseError(
156 156 _('push failed (unexpected response):'), ret)
157 157
158 158 for l in output.splitlines(True):
159 159 self.ui.status(_('remote: '), l)
160 160 return ret
161 161
162 162 def debugwireargs(self, one, two, three=None, four=None, five=None):
163 163 # don't pass optional arguments left at their default value
164 164 opts = {}
165 165 if three is not None:
166 166 opts['three'] = three
167 167 if four is not None:
168 168 opts['four'] = four
169 169 return self._call('debugwireargs', one=one, two=two, **opts)
170 170
171 171 # server side
172 172
173 173 class streamres(object):
174 174 def __init__(self, gen):
175 175 self.gen = gen
176 176
177 177 class pushres(object):
178 178 def __init__(self, res):
179 179 self.res = res
180 180
181 181 class pusherr(object):
182 182 def __init__(self, res):
183 183 self.res = res
184 184
185 185 def dispatch(repo, proto, command):
186 186 func, spec = commands[command]
187 187 args = proto.getargs(spec)
188 188 return func(repo, proto, *args)
189 189
190 190 def options(cmd, keys, others):
191 191 opts = {}
192 192 for k in keys:
193 193 if k in others:
194 194 opts[k] = others[k]
195 195 del others[k]
196 196 if others:
197 197 sys.stderr.write("abort: %s got unexpected arguments %s\n"
198 198 % (cmd, ",".join(others)))
199 199 return opts
200 200
201 201 def between(repo, proto, pairs):
202 202 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
203 203 r = []
204 204 for b in repo.between(pairs):
205 205 r.append(encodelist(b) + "\n")
206 206 return "".join(r)
207 207
208 208 def branchmap(repo, proto):
209 209 branchmap = repo.branchmap()
210 210 heads = []
211 211 for branch, nodes in branchmap.iteritems():
212 212 branchname = urllib.quote(encoding.fromlocal(branch))
213 213 branchnodes = encodelist(nodes)
214 214 heads.append('%s %s' % (branchname, branchnodes))
215 215 return '\n'.join(heads)
216 216
217 217 def branches(repo, proto, nodes):
218 218 nodes = decodelist(nodes)
219 219 r = []
220 220 for b in repo.branches(nodes):
221 221 r.append(encodelist(b) + "\n")
222 222 return "".join(r)
223 223
224 224 def capabilities(repo, proto):
225 225 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
226 226 'unbundlehash').split()
227 227 if _allowstream(repo.ui):
228 228 requiredformats = repo.requirements & repo.supportedformats
229 229 # if our local revlogs are just revlogv1, add 'stream' cap
230 230 if not requiredformats - set(('revlogv1',)):
231 231 caps.append('stream')
232 232 # otherwise, add 'streamreqs' detailing our local revlog format
233 233 else:
234 234 caps.append('streamreqs=%s' % ','.join(requiredformats))
235 235 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
236 236 caps.append('httpheader=1024')
237 237 return ' '.join(caps)
238 238
239 239 def changegroup(repo, proto, roots):
240 240 nodes = decodelist(roots)
241 241 cg = repo.changegroup(nodes, 'serve')
242 242 return streamres(proto.groupchunks(cg))
243 243
244 244 def changegroupsubset(repo, proto, bases, heads):
245 245 bases = decodelist(bases)
246 246 heads = decodelist(heads)
247 247 cg = repo.changegroupsubset(bases, heads, 'serve')
248 248 return streamres(proto.groupchunks(cg))
249 249
250 250 def debugwireargs(repo, proto, one, two, others):
251 251 # only accept optional args from the known set
252 252 opts = options('debugwireargs', ['three', 'four'], others)
253 253 return repo.debugwireargs(one, two, **opts)
254 254
255 255 def getbundle(repo, proto, others):
256 256 opts = options('getbundle', ['heads', 'common'], others)
257 257 for k, v in opts.iteritems():
258 258 opts[k] = decodelist(v)
259 259 cg = repo.getbundle('serve', **opts)
260 260 return streamres(proto.groupchunks(cg))
261 261
262 262 def heads(repo, proto):
263 263 h = repo.heads()
264 264 return encodelist(h) + "\n"
265 265
266 266 def hello(repo, proto):
267 267 '''the hello command returns a set of lines describing various
268 268 interesting things about the server, in an RFC822-like format.
269 269 Currently the only one defined is "capabilities", which
270 270 consists of a line in the form:
271 271
272 272 capabilities: space separated list of tokens
273 273 '''
274 274 return "capabilities: %s\n" % (capabilities(repo, proto))
275 275
276 276 def listkeys(repo, proto, namespace):
277 277 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
278 278 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
279 279 for k, v in d])
280 280 return t
281 281
282 282 def lookup(repo, proto, key):
283 283 try:
284 284 r = hex(repo.lookup(encoding.tolocal(key)))
285 285 success = 1
286 286 except Exception, inst:
287 287 r = str(inst)
288 288 success = 0
289 289 return "%s %s\n" % (success, r)
290 290
291 def known(repo, proto, nodes):
291 def known(repo, proto, nodes, others):
292 292 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
293 293
294 294 def pushkey(repo, proto, namespace, key, old, new):
295 295 # compatibility with pre-1.8 clients which were accidentally
296 296 # sending raw binary nodes rather than utf-8-encoded hex
297 297 if len(new) == 20 and new.encode('string-escape') != new:
298 298 # looks like it could be a binary node
299 299 try:
300 300 new.decode('utf-8')
301 301 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
302 302 except UnicodeDecodeError:
303 303 pass # binary, leave unmodified
304 304 else:
305 305 new = encoding.tolocal(new) # normal path
306 306
307 307 r = pushkeymod.push(repo,
308 308 encoding.tolocal(namespace), encoding.tolocal(key),
309 309 encoding.tolocal(old), new)
310 310 return '%s\n' % int(r)
311 311
312 312 def _allowstream(ui):
313 313 return ui.configbool('server', 'uncompressed', True, untrusted=True)
314 314
315 315 def stream(repo, proto):
316 316 '''If the server supports streaming clone, it advertises the "stream"
317 317 capability with a value representing the version and flags of the repo
318 318 it is serving. Client checks to see if it understands the format.
319 319
320 320 The format is simple: the server writes out a line with the amount
321 321 of files, then the total amount of bytes to be transfered (separated
322 322 by a space). Then, for each file, the server first writes the filename
323 323 and filesize (separated by the null character), then the file contents.
324 324 '''
325 325
326 326 if not _allowstream(repo.ui):
327 327 return '1\n'
328 328
329 329 entries = []
330 330 total_bytes = 0
331 331 try:
332 332 # get consistent snapshot of repo, lock during scan
333 333 lock = repo.lock()
334 334 try:
335 335 repo.ui.debug('scanning\n')
336 336 for name, ename, size in repo.store.walk():
337 337 entries.append((name, size))
338 338 total_bytes += size
339 339 finally:
340 340 lock.release()
341 341 except error.LockError:
342 342 return '2\n' # error: 2
343 343
344 344 def streamer(repo, entries, total):
345 345 '''stream out all metadata files in repository.'''
346 346 yield '0\n' # success
347 347 repo.ui.debug('%d files, %d bytes to transfer\n' %
348 348 (len(entries), total_bytes))
349 349 yield '%d %d\n' % (len(entries), total_bytes)
350 350 for name, size in entries:
351 351 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
352 352 # partially encode name over the wire for backwards compat
353 353 yield '%s\0%d\n' % (store.encodedir(name), size)
354 354 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
355 355 yield chunk
356 356
357 357 return streamres(streamer(repo, entries, total_bytes))
358 358
359 359 def unbundle(repo, proto, heads):
360 360 their_heads = decodelist(heads)
361 361
362 362 def check_heads():
363 363 heads = repo.heads()
364 364 heads_hash = util.sha1(''.join(sorted(heads))).digest()
365 365 return (their_heads == ['force'] or their_heads == heads or
366 366 their_heads == ['hashed', heads_hash])
367 367
368 368 proto.redirect()
369 369
370 370 # fail early if possible
371 371 if not check_heads():
372 372 return pusherr('unsynced changes')
373 373
374 374 # write bundle data to temporary file because it can be big
375 375 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
376 376 fp = os.fdopen(fd, 'wb+')
377 377 r = 0
378 378 try:
379 379 proto.getfile(fp)
380 380 lock = repo.lock()
381 381 try:
382 382 if not check_heads():
383 383 # someone else committed/pushed/unbundled while we
384 384 # were transferring data
385 385 return pusherr('unsynced changes')
386 386
387 387 # push can proceed
388 388 fp.seek(0)
389 389 gen = changegroupmod.readbundle(fp, None)
390 390
391 391 try:
392 392 r = repo.addchangegroup(gen, 'serve', proto._client(),
393 393 lock=lock)
394 394 except util.Abort, inst:
395 395 sys.stderr.write("abort: %s\n" % inst)
396 396 finally:
397 397 lock.release()
398 398 return pushres(r)
399 399
400 400 finally:
401 401 fp.close()
402 402 os.unlink(tempname)
403 403
404 404 commands = {
405 405 'between': (between, 'pairs'),
406 406 'branchmap': (branchmap, ''),
407 407 'branches': (branches, 'nodes'),
408 408 'capabilities': (capabilities, ''),
409 409 'changegroup': (changegroup, 'roots'),
410 410 'changegroupsubset': (changegroupsubset, 'bases heads'),
411 411 'debugwireargs': (debugwireargs, 'one two *'),
412 412 'getbundle': (getbundle, '*'),
413 413 'heads': (heads, ''),
414 414 'hello': (hello, ''),
415 'known': (known, 'nodes'),
415 'known': (known, 'nodes *'),
416 416 'listkeys': (listkeys, 'namespace'),
417 417 'lookup': (lookup, 'key'),
418 418 'pushkey': (pushkey, 'namespace key old new'),
419 419 'stream_out': (stream, ''),
420 420 'unbundle': (unbundle, 'heads'),
421 421 }
General Comments 0
You need to be logged in to leave comments. Login now