##// END OF EJS Templates
wireproto: add a ``boolean`` type for getbundle parameters...
Pierre-Yves David -
r21988:12cd3827 default
parent child Browse files
Show More
@@ -1,863 +1,867
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # mapping of options accepted by getbundle and their types
194 194 #
195 195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 196 # such options are properly processed in exchange.getbundle.
197 197 #
198 198 # supported types are:
199 199 #
200 200 # :nodes: list of binary nodes
201 201 # :csv: list of comma-separated values
202 202 # :plain: string with no transformation needed.
203 203 gboptsmap = {'heads': 'nodes',
204 204 'common': 'nodes',
205 205 'bundlecaps': 'csv',
206 206 'listkeys': 'csv'}
207 207
208 208 # client side
209 209
210 210 class wirepeer(peer.peerrepository):
211 211
212 212 def batch(self):
213 213 return remotebatch(self)
214 214 def _submitbatch(self, req):
215 215 cmds = []
216 216 for op, argsdict in req:
217 217 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
218 218 cmds.append('%s %s' % (op, args))
219 219 rsp = self._call("batch", cmds=';'.join(cmds))
220 220 return rsp.split(';')
221 221 def _submitone(self, op, args):
222 222 return self._call(op, **args)
223 223
224 224 @batchable
225 225 def lookup(self, key):
226 226 self.requirecap('lookup', _('look up remote revision'))
227 227 f = future()
228 228 yield {'key': encoding.fromlocal(key)}, f
229 229 d = f.value
230 230 success, data = d[:-1].split(" ", 1)
231 231 if int(success):
232 232 yield bin(data)
233 233 self._abort(error.RepoError(data))
234 234
235 235 @batchable
236 236 def heads(self):
237 237 f = future()
238 238 yield {}, f
239 239 d = f.value
240 240 try:
241 241 yield decodelist(d[:-1])
242 242 except ValueError:
243 243 self._abort(error.ResponseError(_("unexpected response:"), d))
244 244
245 245 @batchable
246 246 def known(self, nodes):
247 247 f = future()
248 248 yield {'nodes': encodelist(nodes)}, f
249 249 d = f.value
250 250 try:
251 251 yield [bool(int(f)) for f in d]
252 252 except ValueError:
253 253 self._abort(error.ResponseError(_("unexpected response:"), d))
254 254
255 255 @batchable
256 256 def branchmap(self):
257 257 f = future()
258 258 yield {}, f
259 259 d = f.value
260 260 try:
261 261 branchmap = {}
262 262 for branchpart in d.splitlines():
263 263 branchname, branchheads = branchpart.split(' ', 1)
264 264 branchname = encoding.tolocal(urllib.unquote(branchname))
265 265 branchheads = decodelist(branchheads)
266 266 branchmap[branchname] = branchheads
267 267 yield branchmap
268 268 except TypeError:
269 269 self._abort(error.ResponseError(_("unexpected response:"), d))
270 270
271 271 def branches(self, nodes):
272 272 n = encodelist(nodes)
273 273 d = self._call("branches", nodes=n)
274 274 try:
275 275 br = [tuple(decodelist(b)) for b in d.splitlines()]
276 276 return br
277 277 except ValueError:
278 278 self._abort(error.ResponseError(_("unexpected response:"), d))
279 279
280 280 def between(self, pairs):
281 281 batch = 8 # avoid giant requests
282 282 r = []
283 283 for i in xrange(0, len(pairs), batch):
284 284 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
285 285 d = self._call("between", pairs=n)
286 286 try:
287 287 r.extend(l and decodelist(l) or [] for l in d.splitlines())
288 288 except ValueError:
289 289 self._abort(error.ResponseError(_("unexpected response:"), d))
290 290 return r
291 291
292 292 @batchable
293 293 def pushkey(self, namespace, key, old, new):
294 294 if not self.capable('pushkey'):
295 295 yield False, None
296 296 f = future()
297 297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 298 yield {'namespace': encoding.fromlocal(namespace),
299 299 'key': encoding.fromlocal(key),
300 300 'old': encoding.fromlocal(old),
301 301 'new': encoding.fromlocal(new)}, f
302 302 d = f.value
303 303 d, output = d.split('\n', 1)
304 304 try:
305 305 d = bool(int(d))
306 306 except ValueError:
307 307 raise error.ResponseError(
308 308 _('push failed (unexpected response):'), d)
309 309 for l in output.splitlines(True):
310 310 self.ui.status(_('remote: '), l)
311 311 yield d
312 312
313 313 @batchable
314 314 def listkeys(self, namespace):
315 315 if not self.capable('pushkey'):
316 316 yield {}, None
317 317 f = future()
318 318 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
319 319 yield {'namespace': encoding.fromlocal(namespace)}, f
320 320 d = f.value
321 321 yield pushkeymod.decodekeys(d)
322 322
323 323 def stream_out(self):
324 324 return self._callstream('stream_out')
325 325
326 326 def changegroup(self, nodes, kind):
327 327 n = encodelist(nodes)
328 328 f = self._callcompressable("changegroup", roots=n)
329 329 return changegroupmod.unbundle10(f, 'UN')
330 330
331 331 def changegroupsubset(self, bases, heads, kind):
332 332 self.requirecap('changegroupsubset', _('look up remote changes'))
333 333 bases = encodelist(bases)
334 334 heads = encodelist(heads)
335 335 f = self._callcompressable("changegroupsubset",
336 336 bases=bases, heads=heads)
337 337 return changegroupmod.unbundle10(f, 'UN')
338 338
339 339 def getbundle(self, source, **kwargs):
340 340 self.requirecap('getbundle', _('look up remote changes'))
341 341 opts = {}
342 342 for key, value in kwargs.iteritems():
343 343 if value is None:
344 344 continue
345 345 keytype = gboptsmap.get(key)
346 346 if keytype is None:
347 347 assert False, 'unexpected'
348 348 elif keytype == 'nodes':
349 349 value = encodelist(value)
350 350 elif keytype == 'csv':
351 351 value = ','.join(value)
352 elif keytype == 'boolean':
353 value = bool(value)
352 354 elif keytype != 'plain':
353 355 raise KeyError('unknown getbundle option type %s'
354 356 % keytype)
355 357 opts[key] = value
356 358 f = self._callcompressable("getbundle", **opts)
357 359 bundlecaps = kwargs.get('bundlecaps')
358 360 if bundlecaps is not None and 'HG2X' in bundlecaps:
359 361 return bundle2.unbundle20(self.ui, f)
360 362 else:
361 363 return changegroupmod.unbundle10(f, 'UN')
362 364
363 365 def unbundle(self, cg, heads, source):
364 366 '''Send cg (a readable file-like object representing the
365 367 changegroup to push, typically a chunkbuffer object) to the
366 368 remote server as a bundle.
367 369
368 370 When pushing a bundle10 stream, return an integer indicating the
369 371 result of the push (see localrepository.addchangegroup()).
370 372
371 373 When pushing a bundle20 stream, return a bundle20 stream.'''
372 374
373 375 if heads != ['force'] and self.capable('unbundlehash'):
374 376 heads = encodelist(['hashed',
375 377 util.sha1(''.join(sorted(heads))).digest()])
376 378 else:
377 379 heads = encodelist(heads)
378 380
379 381 if util.safehasattr(cg, 'deltaheader'):
380 382 # this a bundle10, do the old style call sequence
381 383 ret, output = self._callpush("unbundle", cg, heads=heads)
382 384 if ret == "":
383 385 raise error.ResponseError(
384 386 _('push failed:'), output)
385 387 try:
386 388 ret = int(ret)
387 389 except ValueError:
388 390 raise error.ResponseError(
389 391 _('push failed (unexpected response):'), ret)
390 392
391 393 for l in output.splitlines(True):
392 394 self.ui.status(_('remote: '), l)
393 395 else:
394 396 # bundle2 push. Send a stream, fetch a stream.
395 397 stream = self._calltwowaystream('unbundle', cg, heads=heads)
396 398 ret = bundle2.unbundle20(self.ui, stream)
397 399 return ret
398 400
399 401 def debugwireargs(self, one, two, three=None, four=None, five=None):
400 402 # don't pass optional arguments left at their default value
401 403 opts = {}
402 404 if three is not None:
403 405 opts['three'] = three
404 406 if four is not None:
405 407 opts['four'] = four
406 408 return self._call('debugwireargs', one=one, two=two, **opts)
407 409
408 410 def _call(self, cmd, **args):
409 411 """execute <cmd> on the server
410 412
411 413 The command is expected to return a simple string.
412 414
413 415 returns the server reply as a string."""
414 416 raise NotImplementedError()
415 417
416 418 def _callstream(self, cmd, **args):
417 419 """execute <cmd> on the server
418 420
419 421 The command is expected to return a stream.
420 422
421 423 returns the server reply as a file like object."""
422 424 raise NotImplementedError()
423 425
424 426 def _callcompressable(self, cmd, **args):
425 427 """execute <cmd> on the server
426 428
427 429 The command is expected to return a stream.
428 430
429 431 The stream may have been compressed in some implementations. This
430 432 function takes care of the decompression. This is the only difference
431 433 with _callstream.
432 434
433 435 returns the server reply as a file like object.
434 436 """
435 437 raise NotImplementedError()
436 438
437 439 def _callpush(self, cmd, fp, **args):
438 440 """execute a <cmd> on server
439 441
440 442 The command is expected to be related to a push. Push has a special
441 443 return method.
442 444
443 445 returns the server reply as a (ret, output) tuple. ret is either
444 446 empty (error) or a stringified int.
445 447 """
446 448 raise NotImplementedError()
447 449
448 450 def _calltwowaystream(self, cmd, fp, **args):
449 451 """execute <cmd> on server
450 452
451 453 The command will send a stream to the server and get a stream in reply.
452 454 """
453 455 raise NotImplementedError()
454 456
455 457 def _abort(self, exception):
456 458 """clearly abort the wire protocol connection and raise the exception
457 459 """
458 460 raise NotImplementedError()
459 461
460 462 # server side
461 463
462 464 # wire protocol command can either return a string or one of these classes.
463 465 class streamres(object):
464 466 """wireproto reply: binary stream
465 467
466 468 The call was successful and the result is a stream.
467 469 Iterate on the `self.gen` attribute to retrieve chunks.
468 470 """
469 471 def __init__(self, gen):
470 472 self.gen = gen
471 473
472 474 class pushres(object):
473 475 """wireproto reply: success with simple integer return
474 476
475 477 The call was successful and returned an integer contained in `self.res`.
476 478 """
477 479 def __init__(self, res):
478 480 self.res = res
479 481
480 482 class pusherr(object):
481 483 """wireproto reply: failure
482 484
483 485 The call failed. The `self.res` attribute contains the error message.
484 486 """
485 487 def __init__(self, res):
486 488 self.res = res
487 489
488 490 class ooberror(object):
489 491 """wireproto reply: failure of a batch of operation
490 492
491 493 Something failed during a batch call. The error message is stored in
492 494 `self.message`.
493 495 """
494 496 def __init__(self, message):
495 497 self.message = message
496 498
497 499 def dispatch(repo, proto, command):
498 500 repo = repo.filtered("served")
499 501 func, spec = commands[command]
500 502 args = proto.getargs(spec)
501 503 return func(repo, proto, *args)
502 504
503 505 def options(cmd, keys, others):
504 506 opts = {}
505 507 for k in keys:
506 508 if k in others:
507 509 opts[k] = others[k]
508 510 del others[k]
509 511 if others:
510 512 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
511 513 % (cmd, ",".join(others)))
512 514 return opts
513 515
514 516 # list of commands
515 517 commands = {}
516 518
517 519 def wireprotocommand(name, args=''):
518 520 """decorator for wire protocol command"""
519 521 def register(func):
520 522 commands[name] = (func, args)
521 523 return func
522 524 return register
523 525
524 526 @wireprotocommand('batch', 'cmds *')
525 527 def batch(repo, proto, cmds, others):
526 528 repo = repo.filtered("served")
527 529 res = []
528 530 for pair in cmds.split(';'):
529 531 op, args = pair.split(' ', 1)
530 532 vals = {}
531 533 for a in args.split(','):
532 534 if a:
533 535 n, v = a.split('=')
534 536 vals[n] = unescapearg(v)
535 537 func, spec = commands[op]
536 538 if spec:
537 539 keys = spec.split()
538 540 data = {}
539 541 for k in keys:
540 542 if k == '*':
541 543 star = {}
542 544 for key in vals.keys():
543 545 if key not in keys:
544 546 star[key] = vals[key]
545 547 data['*'] = star
546 548 else:
547 549 data[k] = vals[k]
548 550 result = func(repo, proto, *[data[k] for k in keys])
549 551 else:
550 552 result = func(repo, proto)
551 553 if isinstance(result, ooberror):
552 554 return result
553 555 res.append(escapearg(result))
554 556 return ';'.join(res)
555 557
556 558 @wireprotocommand('between', 'pairs')
557 559 def between(repo, proto, pairs):
558 560 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
559 561 r = []
560 562 for b in repo.between(pairs):
561 563 r.append(encodelist(b) + "\n")
562 564 return "".join(r)
563 565
564 566 @wireprotocommand('branchmap')
565 567 def branchmap(repo, proto):
566 568 branchmap = repo.branchmap()
567 569 heads = []
568 570 for branch, nodes in branchmap.iteritems():
569 571 branchname = urllib.quote(encoding.fromlocal(branch))
570 572 branchnodes = encodelist(nodes)
571 573 heads.append('%s %s' % (branchname, branchnodes))
572 574 return '\n'.join(heads)
573 575
574 576 @wireprotocommand('branches', 'nodes')
575 577 def branches(repo, proto, nodes):
576 578 nodes = decodelist(nodes)
577 579 r = []
578 580 for b in repo.branches(nodes):
579 581 r.append(encodelist(b) + "\n")
580 582 return "".join(r)
581 583
582 584
583 585 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
584 586 'known', 'getbundle', 'unbundlehash', 'batch']
585 587
586 588 def _capabilities(repo, proto):
587 589 """return a list of capabilities for a repo
588 590
589 591 This function exists to allow extensions to easily wrap capabilities
590 592 computation
591 593
592 594 - returns a lists: easy to alter
593 595 - change done here will be propagated to both `capabilities` and `hello`
594 596 command without any other action needed.
595 597 """
596 598 # copy to prevent modification of the global list
597 599 caps = list(wireprotocaps)
598 600 if _allowstream(repo.ui):
599 601 if repo.ui.configbool('server', 'preferuncompressed', False):
600 602 caps.append('stream-preferred')
601 603 requiredformats = repo.requirements & repo.supportedformats
602 604 # if our local revlogs are just revlogv1, add 'stream' cap
603 605 if not requiredformats - set(('revlogv1',)):
604 606 caps.append('stream')
605 607 # otherwise, add 'streamreqs' detailing our local revlog format
606 608 else:
607 609 caps.append('streamreqs=%s' % ','.join(requiredformats))
608 610 if repo.ui.configbool('experimental', 'bundle2-exp', False):
609 611 capsblob = bundle2.encodecaps(repo.bundle2caps)
610 612 caps.append('bundle2-exp=' + urllib.quote(capsblob))
611 613 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
612 614 caps.append('httpheader=1024')
613 615 return caps
614 616
615 617 # If you are writing an extension and consider wrapping this function. Wrap
616 618 # `_capabilities` instead.
617 619 @wireprotocommand('capabilities')
618 620 def capabilities(repo, proto):
619 621 return ' '.join(_capabilities(repo, proto))
620 622
621 623 @wireprotocommand('changegroup', 'roots')
622 624 def changegroup(repo, proto, roots):
623 625 nodes = decodelist(roots)
624 626 cg = changegroupmod.changegroup(repo, nodes, 'serve')
625 627 return streamres(proto.groupchunks(cg))
626 628
627 629 @wireprotocommand('changegroupsubset', 'bases heads')
628 630 def changegroupsubset(repo, proto, bases, heads):
629 631 bases = decodelist(bases)
630 632 heads = decodelist(heads)
631 633 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
632 634 return streamres(proto.groupchunks(cg))
633 635
634 636 @wireprotocommand('debugwireargs', 'one two *')
635 637 def debugwireargs(repo, proto, one, two, others):
636 638 # only accept optional args from the known set
637 639 opts = options('debugwireargs', ['three', 'four'], others)
638 640 return repo.debugwireargs(one, two, **opts)
639 641
640 642 # List of options accepted by getbundle.
641 643 #
642 644 # Meant to be extended by extensions. It is the extension's responsibility to
643 645 # ensure such options are properly processed in exchange.getbundle.
644 646 gboptslist = ['heads', 'common', 'bundlecaps']
645 647
646 648 @wireprotocommand('getbundle', '*')
647 649 def getbundle(repo, proto, others):
648 650 opts = options('getbundle', gboptsmap.keys(), others)
649 651 for k, v in opts.iteritems():
650 652 keytype = gboptsmap[k]
651 653 if keytype == 'nodes':
652 654 opts[k] = decodelist(v)
653 655 elif keytype == 'csv':
654 656 opts[k] = set(v.split(','))
657 elif keytype == 'boolean':
658 opts[k] = '%i' % bool(v)
655 659 elif keytype != 'plain':
656 660 raise KeyError('unknown getbundle option type %s'
657 661 % keytype)
658 662 cg = exchange.getbundle(repo, 'serve', **opts)
659 663 return streamres(proto.groupchunks(cg))
660 664
661 665 @wireprotocommand('heads')
662 666 def heads(repo, proto):
663 667 h = repo.heads()
664 668 return encodelist(h) + "\n"
665 669
666 670 @wireprotocommand('hello')
667 671 def hello(repo, proto):
668 672 '''the hello command returns a set of lines describing various
669 673 interesting things about the server, in an RFC822-like format.
670 674 Currently the only one defined is "capabilities", which
671 675 consists of a line in the form:
672 676
673 677 capabilities: space separated list of tokens
674 678 '''
675 679 return "capabilities: %s\n" % (capabilities(repo, proto))
676 680
677 681 @wireprotocommand('listkeys', 'namespace')
678 682 def listkeys(repo, proto, namespace):
679 683 d = repo.listkeys(encoding.tolocal(namespace)).items()
680 684 return pushkeymod.encodekeys(d)
681 685
682 686 @wireprotocommand('lookup', 'key')
683 687 def lookup(repo, proto, key):
684 688 try:
685 689 k = encoding.tolocal(key)
686 690 c = repo[k]
687 691 r = c.hex()
688 692 success = 1
689 693 except Exception, inst:
690 694 r = str(inst)
691 695 success = 0
692 696 return "%s %s\n" % (success, r)
693 697
694 698 @wireprotocommand('known', 'nodes *')
695 699 def known(repo, proto, nodes, others):
696 700 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
697 701
698 702 @wireprotocommand('pushkey', 'namespace key old new')
699 703 def pushkey(repo, proto, namespace, key, old, new):
700 704 # compatibility with pre-1.8 clients which were accidentally
701 705 # sending raw binary nodes rather than utf-8-encoded hex
702 706 if len(new) == 20 and new.encode('string-escape') != new:
703 707 # looks like it could be a binary node
704 708 try:
705 709 new.decode('utf-8')
706 710 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
707 711 except UnicodeDecodeError:
708 712 pass # binary, leave unmodified
709 713 else:
710 714 new = encoding.tolocal(new) # normal path
711 715
712 716 if util.safehasattr(proto, 'restore'):
713 717
714 718 proto.redirect()
715 719
716 720 try:
717 721 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
718 722 encoding.tolocal(old), new) or False
719 723 except util.Abort:
720 724 r = False
721 725
722 726 output = proto.restore()
723 727
724 728 return '%s\n%s' % (int(r), output)
725 729
726 730 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
727 731 encoding.tolocal(old), new)
728 732 return '%s\n' % int(r)
729 733
730 734 def _allowstream(ui):
731 735 return ui.configbool('server', 'uncompressed', True, untrusted=True)
732 736
733 737 def _walkstreamfiles(repo):
734 738 # this is it's own function so extensions can override it
735 739 return repo.store.walk()
736 740
737 741 @wireprotocommand('stream_out')
738 742 def stream(repo, proto):
739 743 '''If the server supports streaming clone, it advertises the "stream"
740 744 capability with a value representing the version and flags of the repo
741 745 it is serving. Client checks to see if it understands the format.
742 746
743 747 The format is simple: the server writes out a line with the amount
744 748 of files, then the total amount of bytes to be transferred (separated
745 749 by a space). Then, for each file, the server first writes the filename
746 750 and file size (separated by the null character), then the file contents.
747 751 '''
748 752
749 753 if not _allowstream(repo.ui):
750 754 return '1\n'
751 755
752 756 entries = []
753 757 total_bytes = 0
754 758 try:
755 759 # get consistent snapshot of repo, lock during scan
756 760 lock = repo.lock()
757 761 try:
758 762 repo.ui.debug('scanning\n')
759 763 for name, ename, size in _walkstreamfiles(repo):
760 764 if size:
761 765 entries.append((name, size))
762 766 total_bytes += size
763 767 finally:
764 768 lock.release()
765 769 except error.LockError:
766 770 return '2\n' # error: 2
767 771
768 772 def streamer(repo, entries, total):
769 773 '''stream out all metadata files in repository.'''
770 774 yield '0\n' # success
771 775 repo.ui.debug('%d files, %d bytes to transfer\n' %
772 776 (len(entries), total_bytes))
773 777 yield '%d %d\n' % (len(entries), total_bytes)
774 778
775 779 sopener = repo.sopener
776 780 oldaudit = sopener.mustaudit
777 781 debugflag = repo.ui.debugflag
778 782 sopener.mustaudit = False
779 783
780 784 try:
781 785 for name, size in entries:
782 786 if debugflag:
783 787 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
784 788 # partially encode name over the wire for backwards compat
785 789 yield '%s\0%d\n' % (store.encodedir(name), size)
786 790 if size <= 65536:
787 791 fp = sopener(name)
788 792 try:
789 793 data = fp.read(size)
790 794 finally:
791 795 fp.close()
792 796 yield data
793 797 else:
794 798 for chunk in util.filechunkiter(sopener(name), limit=size):
795 799 yield chunk
796 800 # replace with "finally:" when support for python 2.4 has been dropped
797 801 except Exception:
798 802 sopener.mustaudit = oldaudit
799 803 raise
800 804 sopener.mustaudit = oldaudit
801 805
802 806 return streamres(streamer(repo, entries, total_bytes))
803 807
804 808 @wireprotocommand('unbundle', 'heads')
805 809 def unbundle(repo, proto, heads):
806 810 their_heads = decodelist(heads)
807 811
808 812 try:
809 813 proto.redirect()
810 814
811 815 exchange.check_heads(repo, their_heads, 'preparing changes')
812 816
813 817 # write bundle data to temporary file because it can be big
814 818 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
815 819 fp = os.fdopen(fd, 'wb+')
816 820 r = 0
817 821 try:
818 822 proto.getfile(fp)
819 823 fp.seek(0)
820 824 gen = exchange.readbundle(repo.ui, fp, None)
821 825 r = exchange.unbundle(repo, gen, their_heads, 'serve',
822 826 proto._client())
823 827 if util.safehasattr(r, 'addpart'):
824 828 # The return looks streameable, we are in the bundle2 case and
825 829 # should return a stream.
826 830 return streamres(r.getchunks())
827 831 return pushres(r)
828 832
829 833 finally:
830 834 fp.close()
831 835 os.unlink(tempname)
832 836 except error.BundleValueError, exc:
833 837 bundler = bundle2.bundle20(repo.ui)
834 838 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
835 839 if exc.parttype is not None:
836 840 errpart.addparam('parttype', exc.parttype)
837 841 if exc.params:
838 842 errpart.addparam('params', '\0'.join(exc.params))
839 843 return streamres(bundler.getchunks())
840 844 except util.Abort, inst:
841 845 # The old code we moved used sys.stderr directly.
842 846 # We did not change it to minimise code change.
843 847 # This need to be moved to something proper.
844 848 # Feel free to do it.
845 849 if getattr(inst, 'duringunbundle2', False):
846 850 bundler = bundle2.bundle20(repo.ui)
847 851 manargs = [('message', str(inst))]
848 852 advargs = []
849 853 if inst.hint is not None:
850 854 advargs.append(('hint', inst.hint))
851 855 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
852 856 manargs, advargs))
853 857 return streamres(bundler.getchunks())
854 858 else:
855 859 sys.stderr.write("abort: %s\n" % inst)
856 860 return pushres(0)
857 861 except error.PushRaced, exc:
858 862 if getattr(exc, 'duringunbundle2', False):
859 863 bundler = bundle2.bundle20(repo.ui)
860 864 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
861 865 return streamres(bundler.getchunks())
862 866 else:
863 867 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now