##// END OF EJS Templates
getbundle: sort bundlecaps before exchanging then over the wire...
Pierre-Yves David -
r25128:631766d1 default
parent child Browse files
Show More
@@ -1,880 +1,882
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 try:
176 176 return sep.join(map(hex, l))
177 177 except TypeError:
178 178 print l
179 179 raise
180 180
181 181 # batched call argument encoding
182 182
183 183 def escapearg(plain):
184 184 return (plain
185 185 .replace(':', '::')
186 186 .replace(',', ':,')
187 187 .replace(';', ':;')
188 188 .replace('=', ':='))
189 189
190 190 def unescapearg(escaped):
191 191 return (escaped
192 192 .replace(':=', '=')
193 193 .replace(':;', ';')
194 194 .replace(':,', ',')
195 195 .replace('::', ':'))
196 196
197 197 # mapping of options accepted by getbundle and their types
198 198 #
199 199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 200 # such options are properly processed in exchange.getbundle.
201 201 #
202 202 # supported types are:
203 203 #
204 204 # :nodes: list of binary nodes
205 205 # :csv: list of comma-separated values
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'bundlecaps': 'csv',
211 211 'listkeys': 'csv',
212 212 'cg': 'boolean'}
213 213
214 214 # client side
215 215
216 216 class wirepeer(peer.peerrepository):
217 217
218 218 def batch(self):
219 219 return remotebatch(self)
220 220 def _submitbatch(self, req):
221 221 cmds = []
222 222 for op, argsdict in req:
223 223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 224 cmds.append('%s %s' % (op, args))
225 225 rsp = self._call("batch", cmds=';'.join(cmds))
226 226 return rsp.split(';')
227 227 def _submitone(self, op, args):
228 228 return self._call(op, **args)
229 229
230 230 @batchable
231 231 def lookup(self, key):
232 232 self.requirecap('lookup', _('look up remote revision'))
233 233 f = future()
234 234 yield {'key': encoding.fromlocal(key)}, f
235 235 d = f.value
236 236 success, data = d[:-1].split(" ", 1)
237 237 if int(success):
238 238 yield bin(data)
239 239 self._abort(error.RepoError(data))
240 240
241 241 @batchable
242 242 def heads(self):
243 243 f = future()
244 244 yield {}, f
245 245 d = f.value
246 246 try:
247 247 yield decodelist(d[:-1])
248 248 except ValueError:
249 249 self._abort(error.ResponseError(_("unexpected response:"), d))
250 250
251 251 @batchable
252 252 def known(self, nodes):
253 253 f = future()
254 254 yield {'nodes': encodelist(nodes)}, f
255 255 d = f.value
256 256 try:
257 257 yield [bool(int(b)) for b in d]
258 258 except ValueError:
259 259 self._abort(error.ResponseError(_("unexpected response:"), d))
260 260
261 261 @batchable
262 262 def branchmap(self):
263 263 f = future()
264 264 yield {}, f
265 265 d = f.value
266 266 try:
267 267 branchmap = {}
268 268 for branchpart in d.splitlines():
269 269 branchname, branchheads = branchpart.split(' ', 1)
270 270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 271 branchheads = decodelist(branchheads)
272 272 branchmap[branchname] = branchheads
273 273 yield branchmap
274 274 except TypeError:
275 275 self._abort(error.ResponseError(_("unexpected response:"), d))
276 276
277 277 def branches(self, nodes):
278 278 n = encodelist(nodes)
279 279 d = self._call("branches", nodes=n)
280 280 try:
281 281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 282 return br
283 283 except ValueError:
284 284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 285
286 286 def between(self, pairs):
287 287 batch = 8 # avoid giant requests
288 288 r = []
289 289 for i in xrange(0, len(pairs), batch):
290 290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 291 d = self._call("between", pairs=n)
292 292 try:
293 293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 294 except ValueError:
295 295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 296 return r
297 297
298 298 @batchable
299 299 def pushkey(self, namespace, key, old, new):
300 300 if not self.capable('pushkey'):
301 301 yield False, None
302 302 f = future()
303 303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 304 yield {'namespace': encoding.fromlocal(namespace),
305 305 'key': encoding.fromlocal(key),
306 306 'old': encoding.fromlocal(old),
307 307 'new': encoding.fromlocal(new)}, f
308 308 d = f.value
309 309 d, output = d.split('\n', 1)
310 310 try:
311 311 d = bool(int(d))
312 312 except ValueError:
313 313 raise error.ResponseError(
314 314 _('push failed (unexpected response):'), d)
315 315 for l in output.splitlines(True):
316 316 self.ui.status(_('remote: '), l)
317 317 yield d
318 318
319 319 @batchable
320 320 def listkeys(self, namespace):
321 321 if not self.capable('pushkey'):
322 322 yield {}, None
323 323 f = future()
324 324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 326 d = f.value
327 327 yield pushkeymod.decodekeys(d)
328 328
329 329 def stream_out(self):
330 330 return self._callstream('stream_out')
331 331
332 332 def changegroup(self, nodes, kind):
333 333 n = encodelist(nodes)
334 334 f = self._callcompressable("changegroup", roots=n)
335 335 return changegroupmod.cg1unpacker(f, 'UN')
336 336
337 337 def changegroupsubset(self, bases, heads, kind):
338 338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 339 bases = encodelist(bases)
340 340 heads = encodelist(heads)
341 341 f = self._callcompressable("changegroupsubset",
342 342 bases=bases, heads=heads)
343 343 return changegroupmod.cg1unpacker(f, 'UN')
344 344
345 345 def getbundle(self, source, **kwargs):
346 346 self.requirecap('getbundle', _('look up remote changes'))
347 347 opts = {}
348 bundlecaps = kwargs.get('bundlecaps')
349 if bundlecaps is not None:
350 kwargs['bundlecaps'] = sorted(bundlecaps)
351 else:
352 bundlecaps = () # kwargs could have it to None
348 353 for key, value in kwargs.iteritems():
349 354 if value is None:
350 355 continue
351 356 keytype = gboptsmap.get(key)
352 357 if keytype is None:
353 358 assert False, 'unexpected'
354 359 elif keytype == 'nodes':
355 360 value = encodelist(value)
356 361 elif keytype == 'csv':
357 362 value = ','.join(value)
358 363 elif keytype == 'boolean':
359 364 value = '%i' % bool(value)
360 365 elif keytype != 'plain':
361 366 raise KeyError('unknown getbundle option type %s'
362 367 % keytype)
363 368 opts[key] = value
364 369 f = self._callcompressable("getbundle", **opts)
365 bundlecaps = kwargs.get('bundlecaps')
366 if bundlecaps is None:
367 bundlecaps = () # kwargs could have it to None
368 370 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
369 371 return bundle2.getunbundler(self.ui, f)
370 372 else:
371 373 return changegroupmod.cg1unpacker(f, 'UN')
372 374
373 375 def unbundle(self, cg, heads, source):
374 376 '''Send cg (a readable file-like object representing the
375 377 changegroup to push, typically a chunkbuffer object) to the
376 378 remote server as a bundle.
377 379
378 380 When pushing a bundle10 stream, return an integer indicating the
379 381 result of the push (see localrepository.addchangegroup()).
380 382
381 383 When pushing a bundle20 stream, return a bundle20 stream.'''
382 384
383 385 if heads != ['force'] and self.capable('unbundlehash'):
384 386 heads = encodelist(['hashed',
385 387 util.sha1(''.join(sorted(heads))).digest()])
386 388 else:
387 389 heads = encodelist(heads)
388 390
389 391 if util.safehasattr(cg, 'deltaheader'):
390 392 # this a bundle10, do the old style call sequence
391 393 ret, output = self._callpush("unbundle", cg, heads=heads)
392 394 if ret == "":
393 395 raise error.ResponseError(
394 396 _('push failed:'), output)
395 397 try:
396 398 ret = int(ret)
397 399 except ValueError:
398 400 raise error.ResponseError(
399 401 _('push failed (unexpected response):'), ret)
400 402
401 403 for l in output.splitlines(True):
402 404 self.ui.status(_('remote: '), l)
403 405 else:
404 406 # bundle2 push. Send a stream, fetch a stream.
405 407 stream = self._calltwowaystream('unbundle', cg, heads=heads)
406 408 ret = bundle2.getunbundler(self.ui, stream)
407 409 return ret
408 410
409 411 def debugwireargs(self, one, two, three=None, four=None, five=None):
410 412 # don't pass optional arguments left at their default value
411 413 opts = {}
412 414 if three is not None:
413 415 opts['three'] = three
414 416 if four is not None:
415 417 opts['four'] = four
416 418 return self._call('debugwireargs', one=one, two=two, **opts)
417 419
418 420 def _call(self, cmd, **args):
419 421 """execute <cmd> on the server
420 422
421 423 The command is expected to return a simple string.
422 424
423 425 returns the server reply as a string."""
424 426 raise NotImplementedError()
425 427
426 428 def _callstream(self, cmd, **args):
427 429 """execute <cmd> on the server
428 430
429 431 The command is expected to return a stream.
430 432
431 433 returns the server reply as a file like object."""
432 434 raise NotImplementedError()
433 435
434 436 def _callcompressable(self, cmd, **args):
435 437 """execute <cmd> on the server
436 438
437 439 The command is expected to return a stream.
438 440
439 441 The stream may have been compressed in some implementations. This
440 442 function takes care of the decompression. This is the only difference
441 443 with _callstream.
442 444
443 445 returns the server reply as a file like object.
444 446 """
445 447 raise NotImplementedError()
446 448
447 449 def _callpush(self, cmd, fp, **args):
448 450 """execute a <cmd> on server
449 451
450 452 The command is expected to be related to a push. Push has a special
451 453 return method.
452 454
453 455 returns the server reply as a (ret, output) tuple. ret is either
454 456 empty (error) or a stringified int.
455 457 """
456 458 raise NotImplementedError()
457 459
458 460 def _calltwowaystream(self, cmd, fp, **args):
459 461 """execute <cmd> on server
460 462
461 463 The command will send a stream to the server and get a stream in reply.
462 464 """
463 465 raise NotImplementedError()
464 466
465 467 def _abort(self, exception):
466 468 """clearly abort the wire protocol connection and raise the exception
467 469 """
468 470 raise NotImplementedError()
469 471
470 472 # server side
471 473
472 474 # wire protocol command can either return a string or one of these classes.
473 475 class streamres(object):
474 476 """wireproto reply: binary stream
475 477
476 478 The call was successful and the result is a stream.
477 479 Iterate on the `self.gen` attribute to retrieve chunks.
478 480 """
479 481 def __init__(self, gen):
480 482 self.gen = gen
481 483
482 484 class pushres(object):
483 485 """wireproto reply: success with simple integer return
484 486
485 487 The call was successful and returned an integer contained in `self.res`.
486 488 """
487 489 def __init__(self, res):
488 490 self.res = res
489 491
490 492 class pusherr(object):
491 493 """wireproto reply: failure
492 494
493 495 The call failed. The `self.res` attribute contains the error message.
494 496 """
495 497 def __init__(self, res):
496 498 self.res = res
497 499
498 500 class ooberror(object):
499 501 """wireproto reply: failure of a batch of operation
500 502
501 503 Something failed during a batch call. The error message is stored in
502 504 `self.message`.
503 505 """
504 506 def __init__(self, message):
505 507 self.message = message
506 508
507 509 def dispatch(repo, proto, command):
508 510 repo = repo.filtered("served")
509 511 func, spec = commands[command]
510 512 args = proto.getargs(spec)
511 513 return func(repo, proto, *args)
512 514
513 515 def options(cmd, keys, others):
514 516 opts = {}
515 517 for k in keys:
516 518 if k in others:
517 519 opts[k] = others[k]
518 520 del others[k]
519 521 if others:
520 522 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 523 % (cmd, ",".join(others)))
522 524 return opts
523 525
524 526 # list of commands
525 527 commands = {}
526 528
527 529 def wireprotocommand(name, args=''):
528 530 """decorator for wire protocol command"""
529 531 def register(func):
530 532 commands[name] = (func, args)
531 533 return func
532 534 return register
533 535
534 536 @wireprotocommand('batch', 'cmds *')
535 537 def batch(repo, proto, cmds, others):
536 538 repo = repo.filtered("served")
537 539 res = []
538 540 for pair in cmds.split(';'):
539 541 op, args = pair.split(' ', 1)
540 542 vals = {}
541 543 for a in args.split(','):
542 544 if a:
543 545 n, v = a.split('=')
544 546 vals[n] = unescapearg(v)
545 547 func, spec = commands[op]
546 548 if spec:
547 549 keys = spec.split()
548 550 data = {}
549 551 for k in keys:
550 552 if k == '*':
551 553 star = {}
552 554 for key in vals.keys():
553 555 if key not in keys:
554 556 star[key] = vals[key]
555 557 data['*'] = star
556 558 else:
557 559 data[k] = vals[k]
558 560 result = func(repo, proto, *[data[k] for k in keys])
559 561 else:
560 562 result = func(repo, proto)
561 563 if isinstance(result, ooberror):
562 564 return result
563 565 res.append(escapearg(result))
564 566 return ';'.join(res)
565 567
566 568 @wireprotocommand('between', 'pairs')
567 569 def between(repo, proto, pairs):
568 570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
569 571 r = []
570 572 for b in repo.between(pairs):
571 573 r.append(encodelist(b) + "\n")
572 574 return "".join(r)
573 575
574 576 @wireprotocommand('branchmap')
575 577 def branchmap(repo, proto):
576 578 branchmap = repo.branchmap()
577 579 heads = []
578 580 for branch, nodes in branchmap.iteritems():
579 581 branchname = urllib.quote(encoding.fromlocal(branch))
580 582 branchnodes = encodelist(nodes)
581 583 heads.append('%s %s' % (branchname, branchnodes))
582 584 return '\n'.join(heads)
583 585
584 586 @wireprotocommand('branches', 'nodes')
585 587 def branches(repo, proto, nodes):
586 588 nodes = decodelist(nodes)
587 589 r = []
588 590 for b in repo.branches(nodes):
589 591 r.append(encodelist(b) + "\n")
590 592 return "".join(r)
591 593
592 594
593 595 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
594 596 'known', 'getbundle', 'unbundlehash', 'batch']
595 597
596 598 def _capabilities(repo, proto):
597 599 """return a list of capabilities for a repo
598 600
599 601 This function exists to allow extensions to easily wrap capabilities
600 602 computation
601 603
602 604 - returns a lists: easy to alter
603 605 - change done here will be propagated to both `capabilities` and `hello`
604 606 command without any other action needed.
605 607 """
606 608 # copy to prevent modification of the global list
607 609 caps = list(wireprotocaps)
608 610 if _allowstream(repo.ui):
609 611 if repo.ui.configbool('server', 'preferuncompressed', False):
610 612 caps.append('stream-preferred')
611 613 requiredformats = repo.requirements & repo.supportedformats
612 614 # if our local revlogs are just revlogv1, add 'stream' cap
613 615 if not requiredformats - set(('revlogv1',)):
614 616 caps.append('stream')
615 617 # otherwise, add 'streamreqs' detailing our local revlog format
616 618 else:
617 619 caps.append('streamreqs=%s' % ','.join(requiredformats))
618 620 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
619 621 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
620 622 caps.append('bundle2=' + urllib.quote(capsblob))
621 623 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
622 624 caps.append('httpheader=1024')
623 625 return caps
624 626
625 627 # If you are writing an extension and consider wrapping this function. Wrap
626 628 # `_capabilities` instead.
627 629 @wireprotocommand('capabilities')
628 630 def capabilities(repo, proto):
629 631 return ' '.join(_capabilities(repo, proto))
630 632
631 633 @wireprotocommand('changegroup', 'roots')
632 634 def changegroup(repo, proto, roots):
633 635 nodes = decodelist(roots)
634 636 cg = changegroupmod.changegroup(repo, nodes, 'serve')
635 637 return streamres(proto.groupchunks(cg))
636 638
637 639 @wireprotocommand('changegroupsubset', 'bases heads')
638 640 def changegroupsubset(repo, proto, bases, heads):
639 641 bases = decodelist(bases)
640 642 heads = decodelist(heads)
641 643 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
642 644 return streamres(proto.groupchunks(cg))
643 645
644 646 @wireprotocommand('debugwireargs', 'one two *')
645 647 def debugwireargs(repo, proto, one, two, others):
646 648 # only accept optional args from the known set
647 649 opts = options('debugwireargs', ['three', 'four'], others)
648 650 return repo.debugwireargs(one, two, **opts)
649 651
650 652 # List of options accepted by getbundle.
651 653 #
652 654 # Meant to be extended by extensions. It is the extension's responsibility to
653 655 # ensure such options are properly processed in exchange.getbundle.
654 656 gboptslist = ['heads', 'common', 'bundlecaps']
655 657
656 658 @wireprotocommand('getbundle', '*')
657 659 def getbundle(repo, proto, others):
658 660 opts = options('getbundle', gboptsmap.keys(), others)
659 661 for k, v in opts.iteritems():
660 662 keytype = gboptsmap[k]
661 663 if keytype == 'nodes':
662 664 opts[k] = decodelist(v)
663 665 elif keytype == 'csv':
664 666 opts[k] = set(v.split(','))
665 667 elif keytype == 'boolean':
666 668 opts[k] = bool(v)
667 669 elif keytype != 'plain':
668 670 raise KeyError('unknown getbundle option type %s'
669 671 % keytype)
670 672 cg = exchange.getbundle(repo, 'serve', **opts)
671 673 return streamres(proto.groupchunks(cg))
672 674
673 675 @wireprotocommand('heads')
674 676 def heads(repo, proto):
675 677 h = repo.heads()
676 678 return encodelist(h) + "\n"
677 679
678 680 @wireprotocommand('hello')
679 681 def hello(repo, proto):
680 682 '''the hello command returns a set of lines describing various
681 683 interesting things about the server, in an RFC822-like format.
682 684 Currently the only one defined is "capabilities", which
683 685 consists of a line in the form:
684 686
685 687 capabilities: space separated list of tokens
686 688 '''
687 689 return "capabilities: %s\n" % (capabilities(repo, proto))
688 690
689 691 @wireprotocommand('listkeys', 'namespace')
690 692 def listkeys(repo, proto, namespace):
691 693 d = repo.listkeys(encoding.tolocal(namespace)).items()
692 694 return pushkeymod.encodekeys(d)
693 695
694 696 @wireprotocommand('lookup', 'key')
695 697 def lookup(repo, proto, key):
696 698 try:
697 699 k = encoding.tolocal(key)
698 700 c = repo[k]
699 701 r = c.hex()
700 702 success = 1
701 703 except Exception, inst:
702 704 r = str(inst)
703 705 success = 0
704 706 return "%s %s\n" % (success, r)
705 707
706 708 @wireprotocommand('known', 'nodes *')
707 709 def known(repo, proto, nodes, others):
708 710 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
709 711
710 712 @wireprotocommand('pushkey', 'namespace key old new')
711 713 def pushkey(repo, proto, namespace, key, old, new):
712 714 # compatibility with pre-1.8 clients which were accidentally
713 715 # sending raw binary nodes rather than utf-8-encoded hex
714 716 if len(new) == 20 and new.encode('string-escape') != new:
715 717 # looks like it could be a binary node
716 718 try:
717 719 new.decode('utf-8')
718 720 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
719 721 except UnicodeDecodeError:
720 722 pass # binary, leave unmodified
721 723 else:
722 724 new = encoding.tolocal(new) # normal path
723 725
724 726 if util.safehasattr(proto, 'restore'):
725 727
726 728 proto.redirect()
727 729
728 730 try:
729 731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 732 encoding.tolocal(old), new) or False
731 733 except util.Abort:
732 734 r = False
733 735
734 736 output = proto.restore()
735 737
736 738 return '%s\n%s' % (int(r), output)
737 739
738 740 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
739 741 encoding.tolocal(old), new)
740 742 return '%s\n' % int(r)
741 743
742 744 def _allowstream(ui):
743 745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
744 746
745 747 def _walkstreamfiles(repo):
746 748 # this is it's own function so extensions can override it
747 749 return repo.store.walk()
748 750
749 751 @wireprotocommand('stream_out')
750 752 def stream(repo, proto):
751 753 '''If the server supports streaming clone, it advertises the "stream"
752 754 capability with a value representing the version and flags of the repo
753 755 it is serving. Client checks to see if it understands the format.
754 756
755 757 The format is simple: the server writes out a line with the amount
756 758 of files, then the total amount of bytes to be transferred (separated
757 759 by a space). Then, for each file, the server first writes the filename
758 760 and file size (separated by the null character), then the file contents.
759 761 '''
760 762
761 763 if not _allowstream(repo.ui):
762 764 return '1\n'
763 765
764 766 entries = []
765 767 total_bytes = 0
766 768 try:
767 769 # get consistent snapshot of repo, lock during scan
768 770 lock = repo.lock()
769 771 try:
770 772 repo.ui.debug('scanning\n')
771 773 for name, ename, size in _walkstreamfiles(repo):
772 774 if size:
773 775 entries.append((name, size))
774 776 total_bytes += size
775 777 finally:
776 778 lock.release()
777 779 except error.LockError:
778 780 return '2\n' # error: 2
779 781
780 782 def streamer(repo, entries, total):
781 783 '''stream out all metadata files in repository.'''
782 784 yield '0\n' # success
783 785 repo.ui.debug('%d files, %d bytes to transfer\n' %
784 786 (len(entries), total_bytes))
785 787 yield '%d %d\n' % (len(entries), total_bytes)
786 788
787 789 sopener = repo.svfs
788 790 oldaudit = sopener.mustaudit
789 791 debugflag = repo.ui.debugflag
790 792 sopener.mustaudit = False
791 793
792 794 try:
793 795 for name, size in entries:
794 796 if debugflag:
795 797 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
796 798 # partially encode name over the wire for backwards compat
797 799 yield '%s\0%d\n' % (store.encodedir(name), size)
798 800 if size <= 65536:
799 801 fp = sopener(name)
800 802 try:
801 803 data = fp.read(size)
802 804 finally:
803 805 fp.close()
804 806 yield data
805 807 else:
806 808 for chunk in util.filechunkiter(sopener(name), limit=size):
807 809 yield chunk
808 810 # replace with "finally:" when support for python 2.4 has been dropped
809 811 except Exception:
810 812 sopener.mustaudit = oldaudit
811 813 raise
812 814 sopener.mustaudit = oldaudit
813 815
814 816 return streamres(streamer(repo, entries, total_bytes))
815 817
816 818 @wireprotocommand('unbundle', 'heads')
817 819 def unbundle(repo, proto, heads):
818 820 their_heads = decodelist(heads)
819 821
820 822 try:
821 823 proto.redirect()
822 824
823 825 exchange.check_heads(repo, their_heads, 'preparing changes')
824 826
825 827 # write bundle data to temporary file because it can be big
826 828 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
827 829 fp = os.fdopen(fd, 'wb+')
828 830 r = 0
829 831 try:
830 832 proto.getfile(fp)
831 833 fp.seek(0)
832 834 gen = exchange.readbundle(repo.ui, fp, None)
833 835 r = exchange.unbundle(repo, gen, their_heads, 'serve',
834 836 proto._client())
835 837 if util.safehasattr(r, 'addpart'):
836 838 # The return looks streamable, we are in the bundle2 case and
837 839 # should return a stream.
838 840 return streamres(r.getchunks())
839 841 return pushres(r)
840 842
841 843 finally:
842 844 fp.close()
843 845 os.unlink(tempname)
844 846
845 847 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
846 848 # handle non-bundle2 case first
847 849 if not getattr(exc, 'duringunbundle2', False):
848 850 try:
849 851 raise
850 852 except util.Abort:
851 853 # The old code we moved used sys.stderr directly.
852 854 # We did not change it to minimise code change.
853 855 # This need to be moved to something proper.
854 856 # Feel free to do it.
855 857 sys.stderr.write("abort: %s\n" % exc)
856 858 return pushres(0)
857 859 except error.PushRaced:
858 860 return pusherr(str(exc))
859 861
860 862 bundler = bundle2.bundle20(repo.ui)
861 863 for out in getattr(exc, '_bundle2salvagedoutput', ()):
862 864 bundler.addpart(out)
863 865 try:
864 866 raise
865 867 except error.BundleValueError, exc:
866 868 errpart = bundler.newpart('error:unsupportedcontent')
867 869 if exc.parttype is not None:
868 870 errpart.addparam('parttype', exc.parttype)
869 871 if exc.params:
870 872 errpart.addparam('params', '\0'.join(exc.params))
871 873 except util.Abort, exc:
872 874 manargs = [('message', str(exc))]
873 875 advargs = []
874 876 if exc.hint is not None:
875 877 advargs.append(('hint', exc.hint))
876 878 bundler.addpart(bundle2.bundlepart('error:abort',
877 879 manargs, advargs))
878 880 except error.PushRaced, exc:
879 881 bundler.newpart('error:pushraced', [('message', str(exc))])
880 882 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now