##// END OF EJS Templates
wireproto: use safehasattr or getattr instead of hasattr
Augie Fackler -
r14970:592e45b7 default
parent child Browse files
Show More
@@ -1,602 +1,603 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import repo, error, encoding, util, store
13 13 import pushkey as pushkeymod
14 14
15 15 # abstract batching support
16 16
17 17 class future(object):
18 18 '''placeholder for a value to be set later'''
19 19 def set(self, value):
20 if hasattr(self, 'value'):
20 if util.safehasattr(self, 'value'):
21 21 raise error.RepoError("future is already set")
22 22 self.value = value
23 23
24 24 class batcher(object):
25 25 '''base class for batches of commands submittable in a single request
26 26
27 27 All methods invoked on instances of this class are simply queued and return a
28 28 a future for the result. Once you call submit(), all the queued calls are
29 29 performed and the results set in their respective futures.
30 30 '''
31 31 def __init__(self):
32 32 self.calls = []
33 33 def __getattr__(self, name):
34 34 def call(*args, **opts):
35 35 resref = future()
36 36 self.calls.append((name, args, opts, resref,))
37 37 return resref
38 38 return call
39 39 def submit(self):
40 40 pass
41 41
42 42 class localbatch(batcher):
43 43 '''performs the queued calls directly'''
44 44 def __init__(self, local):
45 45 batcher.__init__(self)
46 46 self.local = local
47 47 def submit(self):
48 48 for name, args, opts, resref in self.calls:
49 49 resref.set(getattr(self.local, name)(*args, **opts))
50 50
51 51 class remotebatch(batcher):
52 52 '''batches the queued calls; uses as few roundtrips as possible'''
53 53 def __init__(self, remote):
54 54 '''remote must support _submitbatch(encbatch) and _submitone(op, encargs)'''
55 55 batcher.__init__(self)
56 56 self.remote = remote
57 57 def submit(self):
58 58 req, rsp = [], []
59 59 for name, args, opts, resref in self.calls:
60 60 mtd = getattr(self.remote, name)
61 if hasattr(mtd, 'batchable'):
62 batchable = getattr(mtd, 'batchable')(mtd.im_self, *args, **opts)
61 batchablefn = getattr(mtd, 'batchable', None)
62 if batchablefn is not None:
63 batchable = batchablefn(mtd.im_self, *args, **opts)
63 64 encargsorres, encresref = batchable.next()
64 65 if encresref:
65 66 req.append((name, encargsorres,))
66 67 rsp.append((batchable, encresref, resref,))
67 68 else:
68 69 resref.set(encargsorres)
69 70 else:
70 71 if req:
71 72 self._submitreq(req, rsp)
72 73 req, rsp = [], []
73 74 resref.set(mtd(*args, **opts))
74 75 if req:
75 76 self._submitreq(req, rsp)
76 77 def _submitreq(self, req, rsp):
77 78 encresults = self.remote._submitbatch(req)
78 79 for encres, r in zip(encresults, rsp):
79 80 batchable, encresref, resref = r
80 81 encresref.set(encres)
81 82 resref.set(batchable.next())
82 83
83 84 def batchable(f):
84 85 '''annotation for batchable methods
85 86
86 87 Such methods must implement a coroutine as follows:
87 88
88 89 @batchable
89 90 def sample(self, one, two=None):
90 91 # Handle locally computable results first:
91 92 if not one:
92 93 yield "a local result", None
93 94 # Build list of encoded arguments suitable for your wire protocol:
94 95 encargs = [('one', encode(one),), ('two', encode(two),)]
95 96 # Create future for injection of encoded result:
96 97 encresref = future()
97 98 # Return encoded arguments and future:
98 99 yield encargs, encresref
99 100 # Assuming the future to be filled with the result from the batched request
100 101 # now. Decode it:
101 102 yield decode(encresref.value)
102 103
103 104 The decorator returns a function which wraps this coroutine as a plain method,
104 105 but adds the original method as an attribute called "batchable", which is
105 106 used by remotebatch to split the call into separate encoding and decoding
106 107 phases.
107 108 '''
108 109 def plain(*args, **opts):
109 110 batchable = f(*args, **opts)
110 111 encargsorres, encresref = batchable.next()
111 112 if not encresref:
112 113 return encargsorres # a local result in this case
113 114 self = args[0]
114 115 encresref.set(self._submitone(f.func_name, encargsorres))
115 116 return batchable.next()
116 117 setattr(plain, 'batchable', f)
117 118 return plain
118 119
119 120 # list of nodes encoding / decoding
120 121
121 122 def decodelist(l, sep=' '):
122 123 if l:
123 124 return map(bin, l.split(sep))
124 125 return []
125 126
126 127 def encodelist(l, sep=' '):
127 128 return sep.join(map(hex, l))
128 129
129 130 # batched call argument encoding
130 131
131 132 def escapearg(plain):
132 133 return (plain
133 134 .replace(':', '::')
134 135 .replace(',', ':,')
135 136 .replace(';', ':;')
136 137 .replace('=', ':='))
137 138
138 139 def unescapearg(escaped):
139 140 return (escaped
140 141 .replace(':=', '=')
141 142 .replace(':;', ';')
142 143 .replace(':,', ',')
143 144 .replace('::', ':'))
144 145
145 146 # client side
146 147
147 148 def todict(**args):
148 149 return args
149 150
150 151 class wirerepository(repo.repository):
151 152
152 153 def batch(self):
153 154 return remotebatch(self)
154 155 def _submitbatch(self, req):
155 156 cmds = []
156 157 for op, argsdict in req:
157 158 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
158 159 cmds.append('%s %s' % (op, args))
159 160 rsp = self._call("batch", cmds=';'.join(cmds))
160 161 return rsp.split(';')
161 162 def _submitone(self, op, args):
162 163 return self._call(op, **args)
163 164
164 165 @batchable
165 166 def lookup(self, key):
166 167 self.requirecap('lookup', _('look up remote revision'))
167 168 f = future()
168 169 yield todict(key=encoding.fromlocal(key)), f
169 170 d = f.value
170 171 success, data = d[:-1].split(" ", 1)
171 172 if int(success):
172 173 yield bin(data)
173 174 self._abort(error.RepoError(data))
174 175
175 176 @batchable
176 177 def heads(self):
177 178 f = future()
178 179 yield {}, f
179 180 d = f.value
180 181 try:
181 182 yield decodelist(d[:-1])
182 183 except ValueError:
183 184 self._abort(error.ResponseError(_("unexpected response:"), d))
184 185
185 186 @batchable
186 187 def known(self, nodes):
187 188 f = future()
188 189 yield todict(nodes=encodelist(nodes)), f
189 190 d = f.value
190 191 try:
191 192 yield [bool(int(f)) for f in d]
192 193 except ValueError:
193 194 self._abort(error.ResponseError(_("unexpected response:"), d))
194 195
195 196 @batchable
196 197 def branchmap(self):
197 198 f = future()
198 199 yield {}, f
199 200 d = f.value
200 201 try:
201 202 branchmap = {}
202 203 for branchpart in d.splitlines():
203 204 branchname, branchheads = branchpart.split(' ', 1)
204 205 branchname = encoding.tolocal(urllib.unquote(branchname))
205 206 branchheads = decodelist(branchheads)
206 207 branchmap[branchname] = branchheads
207 208 yield branchmap
208 209 except TypeError:
209 210 self._abort(error.ResponseError(_("unexpected response:"), d))
210 211
211 212 def branches(self, nodes):
212 213 n = encodelist(nodes)
213 214 d = self._call("branches", nodes=n)
214 215 try:
215 216 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 217 return br
217 218 except ValueError:
218 219 self._abort(error.ResponseError(_("unexpected response:"), d))
219 220
220 221 def between(self, pairs):
221 222 batch = 8 # avoid giant requests
222 223 r = []
223 224 for i in xrange(0, len(pairs), batch):
224 225 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 226 d = self._call("between", pairs=n)
226 227 try:
227 228 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 229 except ValueError:
229 230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 231 return r
231 232
232 233 @batchable
233 234 def pushkey(self, namespace, key, old, new):
234 235 if not self.capable('pushkey'):
235 236 yield False, None
236 237 f = future()
237 238 yield todict(namespace=encoding.fromlocal(namespace),
238 239 key=encoding.fromlocal(key),
239 240 old=encoding.fromlocal(old),
240 241 new=encoding.fromlocal(new)), f
241 242 d = f.value
242 243 try:
243 244 d = bool(int(d))
244 245 except ValueError:
245 246 raise error.ResponseError(
246 247 _('push failed (unexpected response):'), d)
247 248 yield d
248 249
249 250 @batchable
250 251 def listkeys(self, namespace):
251 252 if not self.capable('pushkey'):
252 253 yield {}, None
253 254 f = future()
254 255 yield todict(namespace=encoding.fromlocal(namespace)), f
255 256 d = f.value
256 257 r = {}
257 258 for l in d.splitlines():
258 259 k, v = l.split('\t')
259 260 r[encoding.tolocal(k)] = encoding.tolocal(v)
260 261 yield r
261 262
262 263 def stream_out(self):
263 264 return self._callstream('stream_out')
264 265
265 266 def changegroup(self, nodes, kind):
266 267 n = encodelist(nodes)
267 268 f = self._callstream("changegroup", roots=n)
268 269 return changegroupmod.unbundle10(self._decompress(f), 'UN')
269 270
270 271 def changegroupsubset(self, bases, heads, kind):
271 272 self.requirecap('changegroupsubset', _('look up remote changes'))
272 273 bases = encodelist(bases)
273 274 heads = encodelist(heads)
274 275 f = self._callstream("changegroupsubset",
275 276 bases=bases, heads=heads)
276 277 return changegroupmod.unbundle10(self._decompress(f), 'UN')
277 278
278 279 def getbundle(self, source, heads=None, common=None):
279 280 self.requirecap('getbundle', _('look up remote changes'))
280 281 opts = {}
281 282 if heads is not None:
282 283 opts['heads'] = encodelist(heads)
283 284 if common is not None:
284 285 opts['common'] = encodelist(common)
285 286 f = self._callstream("getbundle", **opts)
286 287 return changegroupmod.unbundle10(self._decompress(f), 'UN')
287 288
288 289 def unbundle(self, cg, heads, source):
289 290 '''Send cg (a readable file-like object representing the
290 291 changegroup to push, typically a chunkbuffer object) to the
291 292 remote server as a bundle. Return an integer indicating the
292 293 result of the push (see localrepository.addchangegroup()).'''
293 294
294 295 if heads != ['force'] and self.capable('unbundlehash'):
295 296 heads = encodelist(['hashed',
296 297 util.sha1(''.join(sorted(heads))).digest()])
297 298 else:
298 299 heads = encodelist(heads)
299 300
300 301 ret, output = self._callpush("unbundle", cg, heads=heads)
301 302 if ret == "":
302 303 raise error.ResponseError(
303 304 _('push failed:'), output)
304 305 try:
305 306 ret = int(ret)
306 307 except ValueError:
307 308 raise error.ResponseError(
308 309 _('push failed (unexpected response):'), ret)
309 310
310 311 for l in output.splitlines(True):
311 312 self.ui.status(_('remote: '), l)
312 313 return ret
313 314
314 315 def debugwireargs(self, one, two, three=None, four=None, five=None):
315 316 # don't pass optional arguments left at their default value
316 317 opts = {}
317 318 if three is not None:
318 319 opts['three'] = three
319 320 if four is not None:
320 321 opts['four'] = four
321 322 return self._call('debugwireargs', one=one, two=two, **opts)
322 323
323 324 # server side
324 325
325 326 class streamres(object):
326 327 def __init__(self, gen):
327 328 self.gen = gen
328 329
329 330 class pushres(object):
330 331 def __init__(self, res):
331 332 self.res = res
332 333
333 334 class pusherr(object):
334 335 def __init__(self, res):
335 336 self.res = res
336 337
337 338 def dispatch(repo, proto, command):
338 339 func, spec = commands[command]
339 340 args = proto.getargs(spec)
340 341 return func(repo, proto, *args)
341 342
342 343 def options(cmd, keys, others):
343 344 opts = {}
344 345 for k in keys:
345 346 if k in others:
346 347 opts[k] = others[k]
347 348 del others[k]
348 349 if others:
349 350 sys.stderr.write("abort: %s got unexpected arguments %s\n"
350 351 % (cmd, ",".join(others)))
351 352 return opts
352 353
353 354 def batch(repo, proto, cmds, others):
354 355 res = []
355 356 for pair in cmds.split(';'):
356 357 op, args = pair.split(' ', 1)
357 358 vals = {}
358 359 for a in args.split(','):
359 360 if a:
360 361 n, v = a.split('=')
361 362 vals[n] = unescapearg(v)
362 363 func, spec = commands[op]
363 364 if spec:
364 365 keys = spec.split()
365 366 data = {}
366 367 for k in keys:
367 368 if k == '*':
368 369 star = {}
369 370 for key in vals.keys():
370 371 if key not in keys:
371 372 star[key] = vals[key]
372 373 data['*'] = star
373 374 else:
374 375 data[k] = vals[k]
375 376 result = func(repo, proto, *[data[k] for k in keys])
376 377 else:
377 378 result = func(repo, proto)
378 379 res.append(escapearg(result))
379 380 return ';'.join(res)
380 381
381 382 def between(repo, proto, pairs):
382 383 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
383 384 r = []
384 385 for b in repo.between(pairs):
385 386 r.append(encodelist(b) + "\n")
386 387 return "".join(r)
387 388
388 389 def branchmap(repo, proto):
389 390 branchmap = repo.branchmap()
390 391 heads = []
391 392 for branch, nodes in branchmap.iteritems():
392 393 branchname = urllib.quote(encoding.fromlocal(branch))
393 394 branchnodes = encodelist(nodes)
394 395 heads.append('%s %s' % (branchname, branchnodes))
395 396 return '\n'.join(heads)
396 397
397 398 def branches(repo, proto, nodes):
398 399 nodes = decodelist(nodes)
399 400 r = []
400 401 for b in repo.branches(nodes):
401 402 r.append(encodelist(b) + "\n")
402 403 return "".join(r)
403 404
404 405 def capabilities(repo, proto):
405 406 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
406 407 'unbundlehash batch').split()
407 408 if _allowstream(repo.ui):
408 409 requiredformats = repo.requirements & repo.supportedformats
409 410 # if our local revlogs are just revlogv1, add 'stream' cap
410 411 if not requiredformats - set(('revlogv1',)):
411 412 caps.append('stream')
412 413 # otherwise, add 'streamreqs' detailing our local revlog format
413 414 else:
414 415 caps.append('streamreqs=%s' % ','.join(requiredformats))
415 416 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
416 417 caps.append('httpheader=1024')
417 418 return ' '.join(caps)
418 419
419 420 def changegroup(repo, proto, roots):
420 421 nodes = decodelist(roots)
421 422 cg = repo.changegroup(nodes, 'serve')
422 423 return streamres(proto.groupchunks(cg))
423 424
424 425 def changegroupsubset(repo, proto, bases, heads):
425 426 bases = decodelist(bases)
426 427 heads = decodelist(heads)
427 428 cg = repo.changegroupsubset(bases, heads, 'serve')
428 429 return streamres(proto.groupchunks(cg))
429 430
430 431 def debugwireargs(repo, proto, one, two, others):
431 432 # only accept optional args from the known set
432 433 opts = options('debugwireargs', ['three', 'four'], others)
433 434 return repo.debugwireargs(one, two, **opts)
434 435
435 436 def getbundle(repo, proto, others):
436 437 opts = options('getbundle', ['heads', 'common'], others)
437 438 for k, v in opts.iteritems():
438 439 opts[k] = decodelist(v)
439 440 cg = repo.getbundle('serve', **opts)
440 441 return streamres(proto.groupchunks(cg))
441 442
442 443 def heads(repo, proto):
443 444 h = repo.heads()
444 445 return encodelist(h) + "\n"
445 446
446 447 def hello(repo, proto):
447 448 '''the hello command returns a set of lines describing various
448 449 interesting things about the server, in an RFC822-like format.
449 450 Currently the only one defined is "capabilities", which
450 451 consists of a line in the form:
451 452
452 453 capabilities: space separated list of tokens
453 454 '''
454 455 return "capabilities: %s\n" % (capabilities(repo, proto))
455 456
456 457 def listkeys(repo, proto, namespace):
457 458 d = pushkeymod.list(repo, encoding.tolocal(namespace)).items()
458 459 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
459 460 for k, v in d])
460 461 return t
461 462
462 463 def lookup(repo, proto, key):
463 464 try:
464 465 r = hex(repo.lookup(encoding.tolocal(key)))
465 466 success = 1
466 467 except Exception, inst:
467 468 r = str(inst)
468 469 success = 0
469 470 return "%s %s\n" % (success, r)
470 471
471 472 def known(repo, proto, nodes, others):
472 473 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
473 474
474 475 def pushkey(repo, proto, namespace, key, old, new):
475 476 # compatibility with pre-1.8 clients which were accidentally
476 477 # sending raw binary nodes rather than utf-8-encoded hex
477 478 if len(new) == 20 and new.encode('string-escape') != new:
478 479 # looks like it could be a binary node
479 480 try:
480 481 new.decode('utf-8')
481 482 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
482 483 except UnicodeDecodeError:
483 484 pass # binary, leave unmodified
484 485 else:
485 486 new = encoding.tolocal(new) # normal path
486 487
487 488 r = pushkeymod.push(repo,
488 489 encoding.tolocal(namespace), encoding.tolocal(key),
489 490 encoding.tolocal(old), new)
490 491 return '%s\n' % int(r)
491 492
492 493 def _allowstream(ui):
493 494 return ui.configbool('server', 'uncompressed', True, untrusted=True)
494 495
495 496 def stream(repo, proto):
496 497 '''If the server supports streaming clone, it advertises the "stream"
497 498 capability with a value representing the version and flags of the repo
498 499 it is serving. Client checks to see if it understands the format.
499 500
500 501 The format is simple: the server writes out a line with the amount
501 502 of files, then the total amount of bytes to be transfered (separated
502 503 by a space). Then, for each file, the server first writes the filename
503 504 and filesize (separated by the null character), then the file contents.
504 505 '''
505 506
506 507 if not _allowstream(repo.ui):
507 508 return '1\n'
508 509
509 510 entries = []
510 511 total_bytes = 0
511 512 try:
512 513 # get consistent snapshot of repo, lock during scan
513 514 lock = repo.lock()
514 515 try:
515 516 repo.ui.debug('scanning\n')
516 517 for name, ename, size in repo.store.walk():
517 518 entries.append((name, size))
518 519 total_bytes += size
519 520 finally:
520 521 lock.release()
521 522 except error.LockError:
522 523 return '2\n' # error: 2
523 524
524 525 def streamer(repo, entries, total):
525 526 '''stream out all metadata files in repository.'''
526 527 yield '0\n' # success
527 528 repo.ui.debug('%d files, %d bytes to transfer\n' %
528 529 (len(entries), total_bytes))
529 530 yield '%d %d\n' % (len(entries), total_bytes)
530 531 for name, size in entries:
531 532 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
532 533 # partially encode name over the wire for backwards compat
533 534 yield '%s\0%d\n' % (store.encodedir(name), size)
534 535 for chunk in util.filechunkiter(repo.sopener(name), limit=size):
535 536 yield chunk
536 537
537 538 return streamres(streamer(repo, entries, total_bytes))
538 539
539 540 def unbundle(repo, proto, heads):
540 541 their_heads = decodelist(heads)
541 542
542 543 def check_heads():
543 544 heads = repo.heads()
544 545 heads_hash = util.sha1(''.join(sorted(heads))).digest()
545 546 return (their_heads == ['force'] or their_heads == heads or
546 547 their_heads == ['hashed', heads_hash])
547 548
548 549 proto.redirect()
549 550
550 551 # fail early if possible
551 552 if not check_heads():
552 553 return pusherr('unsynced changes')
553 554
554 555 # write bundle data to temporary file because it can be big
555 556 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
556 557 fp = os.fdopen(fd, 'wb+')
557 558 r = 0
558 559 try:
559 560 proto.getfile(fp)
560 561 lock = repo.lock()
561 562 try:
562 563 if not check_heads():
563 564 # someone else committed/pushed/unbundled while we
564 565 # were transferring data
565 566 return pusherr('unsynced changes')
566 567
567 568 # push can proceed
568 569 fp.seek(0)
569 570 gen = changegroupmod.readbundle(fp, None)
570 571
571 572 try:
572 573 r = repo.addchangegroup(gen, 'serve', proto._client(),
573 574 lock=lock)
574 575 except util.Abort, inst:
575 576 sys.stderr.write("abort: %s\n" % inst)
576 577 finally:
577 578 lock.release()
578 579 return pushres(r)
579 580
580 581 finally:
581 582 fp.close()
582 583 os.unlink(tempname)
583 584
584 585 commands = {
585 586 'batch': (batch, 'cmds *'),
586 587 'between': (between, 'pairs'),
587 588 'branchmap': (branchmap, ''),
588 589 'branches': (branches, 'nodes'),
589 590 'capabilities': (capabilities, ''),
590 591 'changegroup': (changegroup, 'roots'),
591 592 'changegroupsubset': (changegroupsubset, 'bases heads'),
592 593 'debugwireargs': (debugwireargs, 'one two *'),
593 594 'getbundle': (getbundle, '*'),
594 595 'heads': (heads, ''),
595 596 'hello': (hello, ''),
596 597 'known': (known, 'nodes *'),
597 598 'listkeys': (listkeys, 'namespace'),
598 599 'lookup': (lookup, 'key'),
599 600 'pushkey': (pushkey, 'namespace key old new'),
600 601 'stream_out': (stream, ''),
601 602 'unbundle': (unbundle, 'heads'),
602 603 }
General Comments 0
You need to be logged in to leave comments. Login now