##// END OF EJS Templates
wireproto: remove unused 'store' import
Martin von Zweigbergk -
r25240:a415e94f default
parent child Browse files
Show More
@@ -1,833 +1,833 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 try:
176 176 return sep.join(map(hex, l))
177 177 except TypeError:
178 178 print l
179 179 raise
180 180
181 181 # batched call argument encoding
182 182
183 183 def escapearg(plain):
184 184 return (plain
185 185 .replace(':', '::')
186 186 .replace(',', ':,')
187 187 .replace(';', ':;')
188 188 .replace('=', ':='))
189 189
190 190 def unescapearg(escaped):
191 191 return (escaped
192 192 .replace(':=', '=')
193 193 .replace(':;', ';')
194 194 .replace(':,', ',')
195 195 .replace('::', ':'))
196 196
197 197 # mapping of options accepted by getbundle and their types
198 198 #
199 199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 200 # such options are properly processed in exchange.getbundle.
201 201 #
202 202 # supported types are:
203 203 #
204 204 # :nodes: list of binary nodes
205 205 # :csv: list of comma-separated values
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'bundlecaps': 'csv',
211 211 'listkeys': 'csv',
212 212 'cg': 'boolean'}
213 213
214 214 # client side
215 215
216 216 class wirepeer(peer.peerrepository):
217 217
218 218 def batch(self):
219 219 return remotebatch(self)
220 220 def _submitbatch(self, req):
221 221 cmds = []
222 222 for op, argsdict in req:
223 223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 224 cmds.append('%s %s' % (op, args))
225 225 rsp = self._call("batch", cmds=';'.join(cmds))
226 226 return rsp.split(';')
227 227 def _submitone(self, op, args):
228 228 return self._call(op, **args)
229 229
230 230 @batchable
231 231 def lookup(self, key):
232 232 self.requirecap('lookup', _('look up remote revision'))
233 233 f = future()
234 234 yield {'key': encoding.fromlocal(key)}, f
235 235 d = f.value
236 236 success, data = d[:-1].split(" ", 1)
237 237 if int(success):
238 238 yield bin(data)
239 239 self._abort(error.RepoError(data))
240 240
241 241 @batchable
242 242 def heads(self):
243 243 f = future()
244 244 yield {}, f
245 245 d = f.value
246 246 try:
247 247 yield decodelist(d[:-1])
248 248 except ValueError:
249 249 self._abort(error.ResponseError(_("unexpected response:"), d))
250 250
251 251 @batchable
252 252 def known(self, nodes):
253 253 f = future()
254 254 yield {'nodes': encodelist(nodes)}, f
255 255 d = f.value
256 256 try:
257 257 yield [bool(int(b)) for b in d]
258 258 except ValueError:
259 259 self._abort(error.ResponseError(_("unexpected response:"), d))
260 260
261 261 @batchable
262 262 def branchmap(self):
263 263 f = future()
264 264 yield {}, f
265 265 d = f.value
266 266 try:
267 267 branchmap = {}
268 268 for branchpart in d.splitlines():
269 269 branchname, branchheads = branchpart.split(' ', 1)
270 270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 271 branchheads = decodelist(branchheads)
272 272 branchmap[branchname] = branchheads
273 273 yield branchmap
274 274 except TypeError:
275 275 self._abort(error.ResponseError(_("unexpected response:"), d))
276 276
277 277 def branches(self, nodes):
278 278 n = encodelist(nodes)
279 279 d = self._call("branches", nodes=n)
280 280 try:
281 281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 282 return br
283 283 except ValueError:
284 284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 285
286 286 def between(self, pairs):
287 287 batch = 8 # avoid giant requests
288 288 r = []
289 289 for i in xrange(0, len(pairs), batch):
290 290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 291 d = self._call("between", pairs=n)
292 292 try:
293 293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 294 except ValueError:
295 295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 296 return r
297 297
298 298 @batchable
299 299 def pushkey(self, namespace, key, old, new):
300 300 if not self.capable('pushkey'):
301 301 yield False, None
302 302 f = future()
303 303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 304 yield {'namespace': encoding.fromlocal(namespace),
305 305 'key': encoding.fromlocal(key),
306 306 'old': encoding.fromlocal(old),
307 307 'new': encoding.fromlocal(new)}, f
308 308 d = f.value
309 309 d, output = d.split('\n', 1)
310 310 try:
311 311 d = bool(int(d))
312 312 except ValueError:
313 313 raise error.ResponseError(
314 314 _('push failed (unexpected response):'), d)
315 315 for l in output.splitlines(True):
316 316 self.ui.status(_('remote: '), l)
317 317 yield d
318 318
319 319 @batchable
320 320 def listkeys(self, namespace):
321 321 if not self.capable('pushkey'):
322 322 yield {}, None
323 323 f = future()
324 324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 326 d = f.value
327 327 yield pushkeymod.decodekeys(d)
328 328
329 329 def stream_out(self):
330 330 return self._callstream('stream_out')
331 331
332 332 def changegroup(self, nodes, kind):
333 333 n = encodelist(nodes)
334 334 f = self._callcompressable("changegroup", roots=n)
335 335 return changegroupmod.cg1unpacker(f, 'UN')
336 336
337 337 def changegroupsubset(self, bases, heads, kind):
338 338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 339 bases = encodelist(bases)
340 340 heads = encodelist(heads)
341 341 f = self._callcompressable("changegroupsubset",
342 342 bases=bases, heads=heads)
343 343 return changegroupmod.cg1unpacker(f, 'UN')
344 344
345 345 def getbundle(self, source, **kwargs):
346 346 self.requirecap('getbundle', _('look up remote changes'))
347 347 opts = {}
348 348 bundlecaps = kwargs.get('bundlecaps')
349 349 if bundlecaps is not None:
350 350 kwargs['bundlecaps'] = sorted(bundlecaps)
351 351 else:
352 352 bundlecaps = () # kwargs could have it to None
353 353 for key, value in kwargs.iteritems():
354 354 if value is None:
355 355 continue
356 356 keytype = gboptsmap.get(key)
357 357 if keytype is None:
358 358 assert False, 'unexpected'
359 359 elif keytype == 'nodes':
360 360 value = encodelist(value)
361 361 elif keytype == 'csv':
362 362 value = ','.join(value)
363 363 elif keytype == 'boolean':
364 364 value = '%i' % bool(value)
365 365 elif keytype != 'plain':
366 366 raise KeyError('unknown getbundle option type %s'
367 367 % keytype)
368 368 opts[key] = value
369 369 f = self._callcompressable("getbundle", **opts)
370 370 if any((cap.startswith('HG2') for cap in bundlecaps)):
371 371 return bundle2.getunbundler(self.ui, f)
372 372 else:
373 373 return changegroupmod.cg1unpacker(f, 'UN')
374 374
375 375 def unbundle(self, cg, heads, source):
376 376 '''Send cg (a readable file-like object representing the
377 377 changegroup to push, typically a chunkbuffer object) to the
378 378 remote server as a bundle.
379 379
380 380 When pushing a bundle10 stream, return an integer indicating the
381 381 result of the push (see localrepository.addchangegroup()).
382 382
383 383 When pushing a bundle20 stream, return a bundle20 stream.'''
384 384
385 385 if heads != ['force'] and self.capable('unbundlehash'):
386 386 heads = encodelist(['hashed',
387 387 util.sha1(''.join(sorted(heads))).digest()])
388 388 else:
389 389 heads = encodelist(heads)
390 390
391 391 if util.safehasattr(cg, 'deltaheader'):
392 392 # this a bundle10, do the old style call sequence
393 393 ret, output = self._callpush("unbundle", cg, heads=heads)
394 394 if ret == "":
395 395 raise error.ResponseError(
396 396 _('push failed:'), output)
397 397 try:
398 398 ret = int(ret)
399 399 except ValueError:
400 400 raise error.ResponseError(
401 401 _('push failed (unexpected response):'), ret)
402 402
403 403 for l in output.splitlines(True):
404 404 self.ui.status(_('remote: '), l)
405 405 else:
406 406 # bundle2 push. Send a stream, fetch a stream.
407 407 stream = self._calltwowaystream('unbundle', cg, heads=heads)
408 408 ret = bundle2.getunbundler(self.ui, stream)
409 409 return ret
410 410
411 411 def debugwireargs(self, one, two, three=None, four=None, five=None):
412 412 # don't pass optional arguments left at their default value
413 413 opts = {}
414 414 if three is not None:
415 415 opts['three'] = three
416 416 if four is not None:
417 417 opts['four'] = four
418 418 return self._call('debugwireargs', one=one, two=two, **opts)
419 419
420 420 def _call(self, cmd, **args):
421 421 """execute <cmd> on the server
422 422
423 423 The command is expected to return a simple string.
424 424
425 425 returns the server reply as a string."""
426 426 raise NotImplementedError()
427 427
428 428 def _callstream(self, cmd, **args):
429 429 """execute <cmd> on the server
430 430
431 431 The command is expected to return a stream.
432 432
433 433 returns the server reply as a file like object."""
434 434 raise NotImplementedError()
435 435
436 436 def _callcompressable(self, cmd, **args):
437 437 """execute <cmd> on the server
438 438
439 439 The command is expected to return a stream.
440 440
441 441 The stream may have been compressed in some implementations. This
442 442 function takes care of the decompression. This is the only difference
443 443 with _callstream.
444 444
445 445 returns the server reply as a file like object.
446 446 """
447 447 raise NotImplementedError()
448 448
449 449 def _callpush(self, cmd, fp, **args):
450 450 """execute a <cmd> on server
451 451
452 452 The command is expected to be related to a push. Push has a special
453 453 return method.
454 454
455 455 returns the server reply as a (ret, output) tuple. ret is either
456 456 empty (error) or a stringified int.
457 457 """
458 458 raise NotImplementedError()
459 459
460 460 def _calltwowaystream(self, cmd, fp, **args):
461 461 """execute <cmd> on server
462 462
463 463 The command will send a stream to the server and get a stream in reply.
464 464 """
465 465 raise NotImplementedError()
466 466
467 467 def _abort(self, exception):
468 468 """clearly abort the wire protocol connection and raise the exception
469 469 """
470 470 raise NotImplementedError()
471 471
472 472 # server side
473 473
474 474 # wire protocol command can either return a string or one of these classes.
475 475 class streamres(object):
476 476 """wireproto reply: binary stream
477 477
478 478 The call was successful and the result is a stream.
479 479 Iterate on the `self.gen` attribute to retrieve chunks.
480 480 """
481 481 def __init__(self, gen):
482 482 self.gen = gen
483 483
484 484 class pushres(object):
485 485 """wireproto reply: success with simple integer return
486 486
487 487 The call was successful and returned an integer contained in `self.res`.
488 488 """
489 489 def __init__(self, res):
490 490 self.res = res
491 491
492 492 class pusherr(object):
493 493 """wireproto reply: failure
494 494
495 495 The call failed. The `self.res` attribute contains the error message.
496 496 """
497 497 def __init__(self, res):
498 498 self.res = res
499 499
500 500 class ooberror(object):
501 501 """wireproto reply: failure of a batch of operation
502 502
503 503 Something failed during a batch call. The error message is stored in
504 504 `self.message`.
505 505 """
506 506 def __init__(self, message):
507 507 self.message = message
508 508
509 509 def dispatch(repo, proto, command):
510 510 repo = repo.filtered("served")
511 511 func, spec = commands[command]
512 512 args = proto.getargs(spec)
513 513 return func(repo, proto, *args)
514 514
515 515 def options(cmd, keys, others):
516 516 opts = {}
517 517 for k in keys:
518 518 if k in others:
519 519 opts[k] = others[k]
520 520 del others[k]
521 521 if others:
522 522 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
523 523 % (cmd, ",".join(others)))
524 524 return opts
525 525
526 526 # list of commands
527 527 commands = {}
528 528
529 529 def wireprotocommand(name, args=''):
530 530 """decorator for wire protocol command"""
531 531 def register(func):
532 532 commands[name] = (func, args)
533 533 return func
534 534 return register
535 535
536 536 @wireprotocommand('batch', 'cmds *')
537 537 def batch(repo, proto, cmds, others):
538 538 repo = repo.filtered("served")
539 539 res = []
540 540 for pair in cmds.split(';'):
541 541 op, args = pair.split(' ', 1)
542 542 vals = {}
543 543 for a in args.split(','):
544 544 if a:
545 545 n, v = a.split('=')
546 546 vals[n] = unescapearg(v)
547 547 func, spec = commands[op]
548 548 if spec:
549 549 keys = spec.split()
550 550 data = {}
551 551 for k in keys:
552 552 if k == '*':
553 553 star = {}
554 554 for key in vals.keys():
555 555 if key not in keys:
556 556 star[key] = vals[key]
557 557 data['*'] = star
558 558 else:
559 559 data[k] = vals[k]
560 560 result = func(repo, proto, *[data[k] for k in keys])
561 561 else:
562 562 result = func(repo, proto)
563 563 if isinstance(result, ooberror):
564 564 return result
565 565 res.append(escapearg(result))
566 566 return ';'.join(res)
567 567
568 568 @wireprotocommand('between', 'pairs')
569 569 def between(repo, proto, pairs):
570 570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
571 571 r = []
572 572 for b in repo.between(pairs):
573 573 r.append(encodelist(b) + "\n")
574 574 return "".join(r)
575 575
576 576 @wireprotocommand('branchmap')
577 577 def branchmap(repo, proto):
578 578 branchmap = repo.branchmap()
579 579 heads = []
580 580 for branch, nodes in branchmap.iteritems():
581 581 branchname = urllib.quote(encoding.fromlocal(branch))
582 582 branchnodes = encodelist(nodes)
583 583 heads.append('%s %s' % (branchname, branchnodes))
584 584 return '\n'.join(heads)
585 585
586 586 @wireprotocommand('branches', 'nodes')
587 587 def branches(repo, proto, nodes):
588 588 nodes = decodelist(nodes)
589 589 r = []
590 590 for b in repo.branches(nodes):
591 591 r.append(encodelist(b) + "\n")
592 592 return "".join(r)
593 593
594 594
595 595 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
596 596 'known', 'getbundle', 'unbundlehash', 'batch']
597 597
598 598 def _capabilities(repo, proto):
599 599 """return a list of capabilities for a repo
600 600
601 601 This function exists to allow extensions to easily wrap capabilities
602 602 computation
603 603
604 604 - returns a lists: easy to alter
605 605 - change done here will be propagated to both `capabilities` and `hello`
606 606 command without any other action needed.
607 607 """
608 608 # copy to prevent modification of the global list
609 609 caps = list(wireprotocaps)
610 610 if _allowstream(repo.ui):
611 611 if repo.ui.configbool('server', 'preferuncompressed', False):
612 612 caps.append('stream-preferred')
613 613 requiredformats = repo.requirements & repo.supportedformats
614 614 # if our local revlogs are just revlogv1, add 'stream' cap
615 615 if not requiredformats - set(('revlogv1',)):
616 616 caps.append('stream')
617 617 # otherwise, add 'streamreqs' detailing our local revlog format
618 618 else:
619 619 caps.append('streamreqs=%s' % ','.join(requiredformats))
620 620 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
621 621 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
622 622 caps.append('bundle2=' + urllib.quote(capsblob))
623 623 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
624 624 caps.append('httpheader=1024')
625 625 return caps
626 626
627 627 # If you are writing an extension and consider wrapping this function. Wrap
628 628 # `_capabilities` instead.
629 629 @wireprotocommand('capabilities')
630 630 def capabilities(repo, proto):
631 631 return ' '.join(_capabilities(repo, proto))
632 632
633 633 @wireprotocommand('changegroup', 'roots')
634 634 def changegroup(repo, proto, roots):
635 635 nodes = decodelist(roots)
636 636 cg = changegroupmod.changegroup(repo, nodes, 'serve')
637 637 return streamres(proto.groupchunks(cg))
638 638
639 639 @wireprotocommand('changegroupsubset', 'bases heads')
640 640 def changegroupsubset(repo, proto, bases, heads):
641 641 bases = decodelist(bases)
642 642 heads = decodelist(heads)
643 643 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
644 644 return streamres(proto.groupchunks(cg))
645 645
646 646 @wireprotocommand('debugwireargs', 'one two *')
647 647 def debugwireargs(repo, proto, one, two, others):
648 648 # only accept optional args from the known set
649 649 opts = options('debugwireargs', ['three', 'four'], others)
650 650 return repo.debugwireargs(one, two, **opts)
651 651
652 652 # List of options accepted by getbundle.
653 653 #
654 654 # Meant to be extended by extensions. It is the extension's responsibility to
655 655 # ensure such options are properly processed in exchange.getbundle.
656 656 gboptslist = ['heads', 'common', 'bundlecaps']
657 657
658 658 @wireprotocommand('getbundle', '*')
659 659 def getbundle(repo, proto, others):
660 660 opts = options('getbundle', gboptsmap.keys(), others)
661 661 for k, v in opts.iteritems():
662 662 keytype = gboptsmap[k]
663 663 if keytype == 'nodes':
664 664 opts[k] = decodelist(v)
665 665 elif keytype == 'csv':
666 666 opts[k] = set(v.split(','))
667 667 elif keytype == 'boolean':
668 668 opts[k] = bool(v)
669 669 elif keytype != 'plain':
670 670 raise KeyError('unknown getbundle option type %s'
671 671 % keytype)
672 672 cg = exchange.getbundle(repo, 'serve', **opts)
673 673 return streamres(proto.groupchunks(cg))
674 674
675 675 @wireprotocommand('heads')
676 676 def heads(repo, proto):
677 677 h = repo.heads()
678 678 return encodelist(h) + "\n"
679 679
680 680 @wireprotocommand('hello')
681 681 def hello(repo, proto):
682 682 '''the hello command returns a set of lines describing various
683 683 interesting things about the server, in an RFC822-like format.
684 684 Currently the only one defined is "capabilities", which
685 685 consists of a line in the form:
686 686
687 687 capabilities: space separated list of tokens
688 688 '''
689 689 return "capabilities: %s\n" % (capabilities(repo, proto))
690 690
691 691 @wireprotocommand('listkeys', 'namespace')
692 692 def listkeys(repo, proto, namespace):
693 693 d = repo.listkeys(encoding.tolocal(namespace)).items()
694 694 return pushkeymod.encodekeys(d)
695 695
696 696 @wireprotocommand('lookup', 'key')
697 697 def lookup(repo, proto, key):
698 698 try:
699 699 k = encoding.tolocal(key)
700 700 c = repo[k]
701 701 r = c.hex()
702 702 success = 1
703 703 except Exception, inst:
704 704 r = str(inst)
705 705 success = 0
706 706 return "%s %s\n" % (success, r)
707 707
708 708 @wireprotocommand('known', 'nodes *')
709 709 def known(repo, proto, nodes, others):
710 710 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
711 711
712 712 @wireprotocommand('pushkey', 'namespace key old new')
713 713 def pushkey(repo, proto, namespace, key, old, new):
714 714 # compatibility with pre-1.8 clients which were accidentally
715 715 # sending raw binary nodes rather than utf-8-encoded hex
716 716 if len(new) == 20 and new.encode('string-escape') != new:
717 717 # looks like it could be a binary node
718 718 try:
719 719 new.decode('utf-8')
720 720 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
721 721 except UnicodeDecodeError:
722 722 pass # binary, leave unmodified
723 723 else:
724 724 new = encoding.tolocal(new) # normal path
725 725
726 726 if util.safehasattr(proto, 'restore'):
727 727
728 728 proto.redirect()
729 729
730 730 try:
731 731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
732 732 encoding.tolocal(old), new) or False
733 733 except util.Abort:
734 734 r = False
735 735
736 736 output = proto.restore()
737 737
738 738 return '%s\n%s' % (int(r), output)
739 739
740 740 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
741 741 encoding.tolocal(old), new)
742 742 return '%s\n' % int(r)
743 743
744 744 def _allowstream(ui):
745 745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
746 746
747 747 @wireprotocommand('stream_out')
748 748 def stream(repo, proto):
749 749 '''If the server supports streaming clone, it advertises the "stream"
750 750 capability with a value representing the version and flags of the repo
751 751 it is serving. Client checks to see if it understands the format.
752 752 '''
753 753 if not _allowstream(repo.ui):
754 754 return '1\n'
755 755
756 756 def getstream(it):
757 757 yield '0\n'
758 758 for chunk in it:
759 759 yield chunk
760 760
761 761 try:
762 762 # LockError may be raised before the first result is yielded. Don't
763 763 # emit output until we're sure we got the lock successfully.
764 764 it = exchange.generatestreamclone(repo)
765 765 return streamres(getstream(it))
766 766 except error.LockError:
767 767 return '2\n'
768 768
769 769 @wireprotocommand('unbundle', 'heads')
770 770 def unbundle(repo, proto, heads):
771 771 their_heads = decodelist(heads)
772 772
773 773 try:
774 774 proto.redirect()
775 775
776 776 exchange.check_heads(repo, their_heads, 'preparing changes')
777 777
778 778 # write bundle data to temporary file because it can be big
779 779 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
780 780 fp = os.fdopen(fd, 'wb+')
781 781 r = 0
782 782 try:
783 783 proto.getfile(fp)
784 784 fp.seek(0)
785 785 gen = exchange.readbundle(repo.ui, fp, None)
786 786 r = exchange.unbundle(repo, gen, their_heads, 'serve',
787 787 proto._client())
788 788 if util.safehasattr(r, 'addpart'):
789 789 # The return looks streamable, we are in the bundle2 case and
790 790 # should return a stream.
791 791 return streamres(r.getchunks())
792 792 return pushres(r)
793 793
794 794 finally:
795 795 fp.close()
796 796 os.unlink(tempname)
797 797
798 798 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
799 799 # handle non-bundle2 case first
800 800 if not getattr(exc, 'duringunbundle2', False):
801 801 try:
802 802 raise
803 803 except util.Abort:
804 804 # The old code we moved used sys.stderr directly.
805 805 # We did not change it to minimise code change.
806 806 # This need to be moved to something proper.
807 807 # Feel free to do it.
808 808 sys.stderr.write("abort: %s\n" % exc)
809 809 return pushres(0)
810 810 except error.PushRaced:
811 811 return pusherr(str(exc))
812 812
813 813 bundler = bundle2.bundle20(repo.ui)
814 814 for out in getattr(exc, '_bundle2salvagedoutput', ()):
815 815 bundler.addpart(out)
816 816 try:
817 817 raise
818 818 except error.BundleValueError, exc:
819 819 errpart = bundler.newpart('error:unsupportedcontent')
820 820 if exc.parttype is not None:
821 821 errpart.addparam('parttype', exc.parttype)
822 822 if exc.params:
823 823 errpart.addparam('params', '\0'.join(exc.params))
824 824 except util.Abort, exc:
825 825 manargs = [('message', str(exc))]
826 826 advargs = []
827 827 if exc.hint is not None:
828 828 advargs.append(('hint', exc.hint))
829 829 bundler.addpart(bundle2.bundlepart('error:abort',
830 830 manargs, advargs))
831 831 except error.PushRaced, exc:
832 832 bundler.newpart('error:pushraced', [('message', str(exc))])
833 833 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now