##// END OF EJS Templates
stream: sort stream capability before serialisation...
Pierre-Yves David -
r26911:d7e5e4da default
parent child Browse files
Show More
@@ -1,827 +1,827
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import sys
12 12 import tempfile
13 13 import urllib
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 streamclone,
30 30 util,
31 31 )
32 32
33 33 class abstractserverproto(object):
34 34 """abstract class that summarizes the protocol API
35 35
36 36 Used as reference and documentation.
37 37 """
38 38
39 39 def getargs(self, args):
40 40 """return the value for arguments in <args>
41 41
42 42 returns a list of values (same order as <args>)"""
43 43 raise NotImplementedError()
44 44
45 45 def getfile(self, fp):
46 46 """write the whole content of a file into a file like object
47 47
48 48 The file is in the form::
49 49
50 50 (<chunk-size>\n<chunk>)+0\n
51 51
52 52 chunk size is the ascii version of the int.
53 53 """
54 54 raise NotImplementedError()
55 55
56 56 def redirect(self):
57 57 """may setup interception for stdout and stderr
58 58
59 59 See also the `restore` method."""
60 60 raise NotImplementedError()
61 61
62 62 # If the `redirect` function does install interception, the `restore`
63 63 # function MUST be defined. If interception is not used, this function
64 64 # MUST NOT be defined.
65 65 #
66 66 # left commented here on purpose
67 67 #
68 68 #def restore(self):
69 69 # """reinstall previous stdout and stderr and return intercepted stdout
70 70 # """
71 71 # raise NotImplementedError()
72 72
73 73 def groupchunks(self, cg):
74 74 """return 4096 chunks from a changegroup object
75 75
76 76 Some protocols may have compressed the contents."""
77 77 raise NotImplementedError()
78 78
79 79 class remotebatch(peer.batcher):
80 80 '''batches the queued calls; uses as few roundtrips as possible'''
81 81 def __init__(self, remote):
82 82 '''remote must support _submitbatch(encbatch) and
83 83 _submitone(op, encargs)'''
84 84 peer.batcher.__init__(self)
85 85 self.remote = remote
86 86 def submit(self):
87 87 req, rsp = [], []
88 88 for name, args, opts, resref in self.calls:
89 89 mtd = getattr(self.remote, name)
90 90 batchablefn = getattr(mtd, 'batchable', None)
91 91 if batchablefn is not None:
92 92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 93 encargsorres, encresref = batchable.next()
94 94 if encresref:
95 95 req.append((name, encargsorres,))
96 96 rsp.append((batchable, encresref, resref,))
97 97 else:
98 98 resref.set(encargsorres)
99 99 else:
100 100 if req:
101 101 self._submitreq(req, rsp)
102 102 req, rsp = [], []
103 103 resref.set(mtd(*args, **opts))
104 104 if req:
105 105 self._submitreq(req, rsp)
106 106 def _submitreq(self, req, rsp):
107 107 encresults = self.remote._submitbatch(req)
108 108 for encres, r in zip(encresults, rsp):
109 109 batchable, encresref, resref = r
110 110 encresref.set(encres)
111 111 resref.set(batchable.next())
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return map(bin, l.split(sep))
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 # mapping of options accepted by getbundle and their types
148 148 #
149 149 # Meant to be extended by extensions. It is extensions responsibility to ensure
150 150 # such options are properly processed in exchange.getbundle.
151 151 #
152 152 # supported types are:
153 153 #
154 154 # :nodes: list of binary nodes
155 155 # :csv: list of comma-separated values
156 156 # :scsv: list of comma-separated values return as set
157 157 # :plain: string with no transformation needed.
158 158 gboptsmap = {'heads': 'nodes',
159 159 'common': 'nodes',
160 160 'obsmarkers': 'boolean',
161 161 'bundlecaps': 'scsv',
162 162 'listkeys': 'csv',
163 163 'cg': 'boolean',
164 164 'cbattempted': 'boolean'}
165 165
166 166 # client side
167 167
168 168 class wirepeer(peer.peerrepository):
169 169
170 170 def batch(self):
171 171 if self.capable('batch'):
172 172 return remotebatch(self)
173 173 else:
174 174 return peer.localbatch(self)
175 175 def _submitbatch(self, req):
176 176 cmds = []
177 177 for op, argsdict in req:
178 178 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
179 179 for k, v in argsdict.iteritems())
180 180 cmds.append('%s %s' % (op, args))
181 181 rsp = self._call("batch", cmds=';'.join(cmds))
182 182 return [unescapearg(r) for r in rsp.split(';')]
183 183 def _submitone(self, op, args):
184 184 return self._call(op, **args)
185 185
186 186 @batchable
187 187 def lookup(self, key):
188 188 self.requirecap('lookup', _('look up remote revision'))
189 189 f = future()
190 190 yield {'key': encoding.fromlocal(key)}, f
191 191 d = f.value
192 192 success, data = d[:-1].split(" ", 1)
193 193 if int(success):
194 194 yield bin(data)
195 195 self._abort(error.RepoError(data))
196 196
197 197 @batchable
198 198 def heads(self):
199 199 f = future()
200 200 yield {}, f
201 201 d = f.value
202 202 try:
203 203 yield decodelist(d[:-1])
204 204 except ValueError:
205 205 self._abort(error.ResponseError(_("unexpected response:"), d))
206 206
207 207 @batchable
208 208 def known(self, nodes):
209 209 f = future()
210 210 yield {'nodes': encodelist(nodes)}, f
211 211 d = f.value
212 212 try:
213 213 yield [bool(int(b)) for b in d]
214 214 except ValueError:
215 215 self._abort(error.ResponseError(_("unexpected response:"), d))
216 216
217 217 @batchable
218 218 def branchmap(self):
219 219 f = future()
220 220 yield {}, f
221 221 d = f.value
222 222 try:
223 223 branchmap = {}
224 224 for branchpart in d.splitlines():
225 225 branchname, branchheads = branchpart.split(' ', 1)
226 226 branchname = encoding.tolocal(urllib.unquote(branchname))
227 227 branchheads = decodelist(branchheads)
228 228 branchmap[branchname] = branchheads
229 229 yield branchmap
230 230 except TypeError:
231 231 self._abort(error.ResponseError(_("unexpected response:"), d))
232 232
233 233 def branches(self, nodes):
234 234 n = encodelist(nodes)
235 235 d = self._call("branches", nodes=n)
236 236 try:
237 237 br = [tuple(decodelist(b)) for b in d.splitlines()]
238 238 return br
239 239 except ValueError:
240 240 self._abort(error.ResponseError(_("unexpected response:"), d))
241 241
242 242 def between(self, pairs):
243 243 batch = 8 # avoid giant requests
244 244 r = []
245 245 for i in xrange(0, len(pairs), batch):
246 246 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
247 247 d = self._call("between", pairs=n)
248 248 try:
249 249 r.extend(l and decodelist(l) or [] for l in d.splitlines())
250 250 except ValueError:
251 251 self._abort(error.ResponseError(_("unexpected response:"), d))
252 252 return r
253 253
254 254 @batchable
255 255 def pushkey(self, namespace, key, old, new):
256 256 if not self.capable('pushkey'):
257 257 yield False, None
258 258 f = future()
259 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 260 yield {'namespace': encoding.fromlocal(namespace),
261 261 'key': encoding.fromlocal(key),
262 262 'old': encoding.fromlocal(old),
263 263 'new': encoding.fromlocal(new)}, f
264 264 d = f.value
265 265 d, output = d.split('\n', 1)
266 266 try:
267 267 d = bool(int(d))
268 268 except ValueError:
269 269 raise error.ResponseError(
270 270 _('push failed (unexpected response):'), d)
271 271 for l in output.splitlines(True):
272 272 self.ui.status(_('remote: '), l)
273 273 yield d
274 274
275 275 @batchable
276 276 def listkeys(self, namespace):
277 277 if not self.capable('pushkey'):
278 278 yield {}, None
279 279 f = future()
280 280 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
281 281 yield {'namespace': encoding.fromlocal(namespace)}, f
282 282 d = f.value
283 283 self.ui.debug('received listkey for "%s": %i bytes\n'
284 284 % (namespace, len(d)))
285 285 yield pushkeymod.decodekeys(d)
286 286
287 287 def stream_out(self):
288 288 return self._callstream('stream_out')
289 289
290 290 def changegroup(self, nodes, kind):
291 291 n = encodelist(nodes)
292 292 f = self._callcompressable("changegroup", roots=n)
293 293 return changegroupmod.cg1unpacker(f, 'UN')
294 294
295 295 def changegroupsubset(self, bases, heads, kind):
296 296 self.requirecap('changegroupsubset', _('look up remote changes'))
297 297 bases = encodelist(bases)
298 298 heads = encodelist(heads)
299 299 f = self._callcompressable("changegroupsubset",
300 300 bases=bases, heads=heads)
301 301 return changegroupmod.cg1unpacker(f, 'UN')
302 302
303 303 def getbundle(self, source, **kwargs):
304 304 self.requirecap('getbundle', _('look up remote changes'))
305 305 opts = {}
306 306 bundlecaps = kwargs.get('bundlecaps')
307 307 if bundlecaps is not None:
308 308 kwargs['bundlecaps'] = sorted(bundlecaps)
309 309 else:
310 310 bundlecaps = () # kwargs could have it to None
311 311 for key, value in kwargs.iteritems():
312 312 if value is None:
313 313 continue
314 314 keytype = gboptsmap.get(key)
315 315 if keytype is None:
316 316 assert False, 'unexpected'
317 317 elif keytype == 'nodes':
318 318 value = encodelist(value)
319 319 elif keytype in ('csv', 'scsv'):
320 320 value = ','.join(value)
321 321 elif keytype == 'boolean':
322 322 value = '%i' % bool(value)
323 323 elif keytype != 'plain':
324 324 raise KeyError('unknown getbundle option type %s'
325 325 % keytype)
326 326 opts[key] = value
327 327 f = self._callcompressable("getbundle", **opts)
328 328 if any((cap.startswith('HG2') for cap in bundlecaps)):
329 329 return bundle2.getunbundler(self.ui, f)
330 330 else:
331 331 return changegroupmod.cg1unpacker(f, 'UN')
332 332
333 333 def unbundle(self, cg, heads, source):
334 334 '''Send cg (a readable file-like object representing the
335 335 changegroup to push, typically a chunkbuffer object) to the
336 336 remote server as a bundle.
337 337
338 338 When pushing a bundle10 stream, return an integer indicating the
339 339 result of the push (see localrepository.addchangegroup()).
340 340
341 341 When pushing a bundle20 stream, return a bundle20 stream.'''
342 342
343 343 if heads != ['force'] and self.capable('unbundlehash'):
344 344 heads = encodelist(['hashed',
345 345 util.sha1(''.join(sorted(heads))).digest()])
346 346 else:
347 347 heads = encodelist(heads)
348 348
349 349 if util.safehasattr(cg, 'deltaheader'):
350 350 # this a bundle10, do the old style call sequence
351 351 ret, output = self._callpush("unbundle", cg, heads=heads)
352 352 if ret == "":
353 353 raise error.ResponseError(
354 354 _('push failed:'), output)
355 355 try:
356 356 ret = int(ret)
357 357 except ValueError:
358 358 raise error.ResponseError(
359 359 _('push failed (unexpected response):'), ret)
360 360
361 361 for l in output.splitlines(True):
362 362 self.ui.status(_('remote: '), l)
363 363 else:
364 364 # bundle2 push. Send a stream, fetch a stream.
365 365 stream = self._calltwowaystream('unbundle', cg, heads=heads)
366 366 ret = bundle2.getunbundler(self.ui, stream)
367 367 return ret
368 368
369 369 def debugwireargs(self, one, two, three=None, four=None, five=None):
370 370 # don't pass optional arguments left at their default value
371 371 opts = {}
372 372 if three is not None:
373 373 opts['three'] = three
374 374 if four is not None:
375 375 opts['four'] = four
376 376 return self._call('debugwireargs', one=one, two=two, **opts)
377 377
378 378 def _call(self, cmd, **args):
379 379 """execute <cmd> on the server
380 380
381 381 The command is expected to return a simple string.
382 382
383 383 returns the server reply as a string."""
384 384 raise NotImplementedError()
385 385
386 386 def _callstream(self, cmd, **args):
387 387 """execute <cmd> on the server
388 388
389 389 The command is expected to return a stream.
390 390
391 391 returns the server reply as a file like object."""
392 392 raise NotImplementedError()
393 393
394 394 def _callcompressable(self, cmd, **args):
395 395 """execute <cmd> on the server
396 396
397 397 The command is expected to return a stream.
398 398
399 399 The stream may have been compressed in some implementations. This
400 400 function takes care of the decompression. This is the only difference
401 401 with _callstream.
402 402
403 403 returns the server reply as a file like object.
404 404 """
405 405 raise NotImplementedError()
406 406
407 407 def _callpush(self, cmd, fp, **args):
408 408 """execute a <cmd> on server
409 409
410 410 The command is expected to be related to a push. Push has a special
411 411 return method.
412 412
413 413 returns the server reply as a (ret, output) tuple. ret is either
414 414 empty (error) or a stringified int.
415 415 """
416 416 raise NotImplementedError()
417 417
418 418 def _calltwowaystream(self, cmd, fp, **args):
419 419 """execute <cmd> on server
420 420
421 421 The command will send a stream to the server and get a stream in reply.
422 422 """
423 423 raise NotImplementedError()
424 424
425 425 def _abort(self, exception):
426 426 """clearly abort the wire protocol connection and raise the exception
427 427 """
428 428 raise NotImplementedError()
429 429
430 430 # server side
431 431
432 432 # wire protocol command can either return a string or one of these classes.
433 433 class streamres(object):
434 434 """wireproto reply: binary stream
435 435
436 436 The call was successful and the result is a stream.
437 437 Iterate on the `self.gen` attribute to retrieve chunks.
438 438 """
439 439 def __init__(self, gen):
440 440 self.gen = gen
441 441
442 442 class pushres(object):
443 443 """wireproto reply: success with simple integer return
444 444
445 445 The call was successful and returned an integer contained in `self.res`.
446 446 """
447 447 def __init__(self, res):
448 448 self.res = res
449 449
450 450 class pusherr(object):
451 451 """wireproto reply: failure
452 452
453 453 The call failed. The `self.res` attribute contains the error message.
454 454 """
455 455 def __init__(self, res):
456 456 self.res = res
457 457
458 458 class ooberror(object):
459 459 """wireproto reply: failure of a batch of operation
460 460
461 461 Something failed during a batch call. The error message is stored in
462 462 `self.message`.
463 463 """
464 464 def __init__(self, message):
465 465 self.message = message
466 466
467 467 def dispatch(repo, proto, command):
468 468 repo = repo.filtered("served")
469 469 func, spec = commands[command]
470 470 args = proto.getargs(spec)
471 471 return func(repo, proto, *args)
472 472
473 473 def options(cmd, keys, others):
474 474 opts = {}
475 475 for k in keys:
476 476 if k in others:
477 477 opts[k] = others[k]
478 478 del others[k]
479 479 if others:
480 480 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
481 481 % (cmd, ",".join(others)))
482 482 return opts
483 483
484 484 # list of commands
485 485 commands = {}
486 486
487 487 def wireprotocommand(name, args=''):
488 488 """decorator for wire protocol command"""
489 489 def register(func):
490 490 commands[name] = (func, args)
491 491 return func
492 492 return register
493 493
494 494 @wireprotocommand('batch', 'cmds *')
495 495 def batch(repo, proto, cmds, others):
496 496 repo = repo.filtered("served")
497 497 res = []
498 498 for pair in cmds.split(';'):
499 499 op, args = pair.split(' ', 1)
500 500 vals = {}
501 501 for a in args.split(','):
502 502 if a:
503 503 n, v = a.split('=')
504 504 vals[n] = unescapearg(v)
505 505 func, spec = commands[op]
506 506 if spec:
507 507 keys = spec.split()
508 508 data = {}
509 509 for k in keys:
510 510 if k == '*':
511 511 star = {}
512 512 for key in vals.keys():
513 513 if key not in keys:
514 514 star[key] = vals[key]
515 515 data['*'] = star
516 516 else:
517 517 data[k] = vals[k]
518 518 result = func(repo, proto, *[data[k] for k in keys])
519 519 else:
520 520 result = func(repo, proto)
521 521 if isinstance(result, ooberror):
522 522 return result
523 523 res.append(escapearg(result))
524 524 return ';'.join(res)
525 525
526 526 @wireprotocommand('between', 'pairs')
527 527 def between(repo, proto, pairs):
528 528 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
529 529 r = []
530 530 for b in repo.between(pairs):
531 531 r.append(encodelist(b) + "\n")
532 532 return "".join(r)
533 533
534 534 @wireprotocommand('branchmap')
535 535 def branchmap(repo, proto):
536 536 branchmap = repo.branchmap()
537 537 heads = []
538 538 for branch, nodes in branchmap.iteritems():
539 539 branchname = urllib.quote(encoding.fromlocal(branch))
540 540 branchnodes = encodelist(nodes)
541 541 heads.append('%s %s' % (branchname, branchnodes))
542 542 return '\n'.join(heads)
543 543
544 544 @wireprotocommand('branches', 'nodes')
545 545 def branches(repo, proto, nodes):
546 546 nodes = decodelist(nodes)
547 547 r = []
548 548 for b in repo.branches(nodes):
549 549 r.append(encodelist(b) + "\n")
550 550 return "".join(r)
551 551
552 552 @wireprotocommand('clonebundles', '')
553 553 def clonebundles(repo, proto):
554 554 """Server command for returning info for available bundles to seed clones.
555 555
556 556 Clients will parse this response and determine what bundle to fetch.
557 557
558 558 Extensions may wrap this command to filter or dynamically emit data
559 559 depending on the request. e.g. you could advertise URLs for the closest
560 560 data center given the client's IP address.
561 561 """
562 562 return repo.opener.tryread('clonebundles.manifest')
563 563
564 564 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
565 565 'known', 'getbundle', 'unbundlehash', 'batch']
566 566
567 567 def _capabilities(repo, proto):
568 568 """return a list of capabilities for a repo
569 569
570 570 This function exists to allow extensions to easily wrap capabilities
571 571 computation
572 572
573 573 - returns a lists: easy to alter
574 574 - change done here will be propagated to both `capabilities` and `hello`
575 575 command without any other action needed.
576 576 """
577 577 # copy to prevent modification of the global list
578 578 caps = list(wireprotocaps)
579 579 if streamclone.allowservergeneration(repo.ui):
580 580 if repo.ui.configbool('server', 'preferuncompressed', False):
581 581 caps.append('stream-preferred')
582 582 requiredformats = repo.requirements & repo.supportedformats
583 583 # if our local revlogs are just revlogv1, add 'stream' cap
584 584 if not requiredformats - set(('revlogv1',)):
585 585 caps.append('stream')
586 586 # otherwise, add 'streamreqs' detailing our local revlog format
587 587 else:
588 caps.append('streamreqs=%s' % ','.join(requiredformats))
588 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
589 589 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
590 590 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
591 591 caps.append('bundle2=' + urllib.quote(capsblob))
592 592 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
593 593 caps.append(
594 594 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
595 595 return caps
596 596
597 597 # If you are writing an extension and consider wrapping this function. Wrap
598 598 # `_capabilities` instead.
599 599 @wireprotocommand('capabilities')
600 600 def capabilities(repo, proto):
601 601 return ' '.join(_capabilities(repo, proto))
602 602
603 603 @wireprotocommand('changegroup', 'roots')
604 604 def changegroup(repo, proto, roots):
605 605 nodes = decodelist(roots)
606 606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
607 607 return streamres(proto.groupchunks(cg))
608 608
609 609 @wireprotocommand('changegroupsubset', 'bases heads')
610 610 def changegroupsubset(repo, proto, bases, heads):
611 611 bases = decodelist(bases)
612 612 heads = decodelist(heads)
613 613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
614 614 return streamres(proto.groupchunks(cg))
615 615
616 616 @wireprotocommand('debugwireargs', 'one two *')
617 617 def debugwireargs(repo, proto, one, two, others):
618 618 # only accept optional args from the known set
619 619 opts = options('debugwireargs', ['three', 'four'], others)
620 620 return repo.debugwireargs(one, two, **opts)
621 621
622 622 # List of options accepted by getbundle.
623 623 #
624 624 # Meant to be extended by extensions. It is the extension's responsibility to
625 625 # ensure such options are properly processed in exchange.getbundle.
626 626 gboptslist = ['heads', 'common', 'bundlecaps']
627 627
628 628 @wireprotocommand('getbundle', '*')
629 629 def getbundle(repo, proto, others):
630 630 opts = options('getbundle', gboptsmap.keys(), others)
631 631 for k, v in opts.iteritems():
632 632 keytype = gboptsmap[k]
633 633 if keytype == 'nodes':
634 634 opts[k] = decodelist(v)
635 635 elif keytype == 'csv':
636 636 opts[k] = list(v.split(','))
637 637 elif keytype == 'scsv':
638 638 opts[k] = set(v.split(','))
639 639 elif keytype == 'boolean':
640 640 # Client should serialize False as '0', which is a non-empty string
641 641 # so it evaluates as a True bool.
642 642 if v == '0':
643 643 opts[k] = False
644 644 else:
645 645 opts[k] = bool(v)
646 646 elif keytype != 'plain':
647 647 raise KeyError('unknown getbundle option type %s'
648 648 % keytype)
649 649 cg = exchange.getbundle(repo, 'serve', **opts)
650 650 return streamres(proto.groupchunks(cg))
651 651
652 652 @wireprotocommand('heads')
653 653 def heads(repo, proto):
654 654 h = repo.heads()
655 655 return encodelist(h) + "\n"
656 656
657 657 @wireprotocommand('hello')
658 658 def hello(repo, proto):
659 659 '''the hello command returns a set of lines describing various
660 660 interesting things about the server, in an RFC822-like format.
661 661 Currently the only one defined is "capabilities", which
662 662 consists of a line in the form:
663 663
664 664 capabilities: space separated list of tokens
665 665 '''
666 666 return "capabilities: %s\n" % (capabilities(repo, proto))
667 667
668 668 @wireprotocommand('listkeys', 'namespace')
669 669 def listkeys(repo, proto, namespace):
670 670 d = repo.listkeys(encoding.tolocal(namespace)).items()
671 671 return pushkeymod.encodekeys(d)
672 672
673 673 @wireprotocommand('lookup', 'key')
674 674 def lookup(repo, proto, key):
675 675 try:
676 676 k = encoding.tolocal(key)
677 677 c = repo[k]
678 678 r = c.hex()
679 679 success = 1
680 680 except Exception as inst:
681 681 r = str(inst)
682 682 success = 0
683 683 return "%s %s\n" % (success, r)
684 684
685 685 @wireprotocommand('known', 'nodes *')
686 686 def known(repo, proto, nodes, others):
687 687 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
688 688
689 689 @wireprotocommand('pushkey', 'namespace key old new')
690 690 def pushkey(repo, proto, namespace, key, old, new):
691 691 # compatibility with pre-1.8 clients which were accidentally
692 692 # sending raw binary nodes rather than utf-8-encoded hex
693 693 if len(new) == 20 and new.encode('string-escape') != new:
694 694 # looks like it could be a binary node
695 695 try:
696 696 new.decode('utf-8')
697 697 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
698 698 except UnicodeDecodeError:
699 699 pass # binary, leave unmodified
700 700 else:
701 701 new = encoding.tolocal(new) # normal path
702 702
703 703 if util.safehasattr(proto, 'restore'):
704 704
705 705 proto.redirect()
706 706
707 707 try:
708 708 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
709 709 encoding.tolocal(old), new) or False
710 710 except error.Abort:
711 711 r = False
712 712
713 713 output = proto.restore()
714 714
715 715 return '%s\n%s' % (int(r), output)
716 716
717 717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
718 718 encoding.tolocal(old), new)
719 719 return '%s\n' % int(r)
720 720
721 721 @wireprotocommand('stream_out')
722 722 def stream(repo, proto):
723 723 '''If the server supports streaming clone, it advertises the "stream"
724 724 capability with a value representing the version and flags of the repo
725 725 it is serving. Client checks to see if it understands the format.
726 726 '''
727 727 if not streamclone.allowservergeneration(repo.ui):
728 728 return '1\n'
729 729
730 730 def getstream(it):
731 731 yield '0\n'
732 732 for chunk in it:
733 733 yield chunk
734 734
735 735 try:
736 736 # LockError may be raised before the first result is yielded. Don't
737 737 # emit output until we're sure we got the lock successfully.
738 738 it = streamclone.generatev1wireproto(repo)
739 739 return streamres(getstream(it))
740 740 except error.LockError:
741 741 return '2\n'
742 742
743 743 @wireprotocommand('unbundle', 'heads')
744 744 def unbundle(repo, proto, heads):
745 745 their_heads = decodelist(heads)
746 746
747 747 try:
748 748 proto.redirect()
749 749
750 750 exchange.check_heads(repo, their_heads, 'preparing changes')
751 751
752 752 # write bundle data to temporary file because it can be big
753 753 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
754 754 fp = os.fdopen(fd, 'wb+')
755 755 r = 0
756 756 try:
757 757 proto.getfile(fp)
758 758 fp.seek(0)
759 759 gen = exchange.readbundle(repo.ui, fp, None)
760 760 r = exchange.unbundle(repo, gen, their_heads, 'serve',
761 761 proto._client())
762 762 if util.safehasattr(r, 'addpart'):
763 763 # The return looks streamable, we are in the bundle2 case and
764 764 # should return a stream.
765 765 return streamres(r.getchunks())
766 766 return pushres(r)
767 767
768 768 finally:
769 769 fp.close()
770 770 os.unlink(tempname)
771 771
772 772 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
773 773 # handle non-bundle2 case first
774 774 if not getattr(exc, 'duringunbundle2', False):
775 775 try:
776 776 raise
777 777 except error.Abort:
778 778 # The old code we moved used sys.stderr directly.
779 779 # We did not change it to minimise code change.
780 780 # This need to be moved to something proper.
781 781 # Feel free to do it.
782 782 sys.stderr.write("abort: %s\n" % exc)
783 783 return pushres(0)
784 784 except error.PushRaced:
785 785 return pusherr(str(exc))
786 786
787 787 bundler = bundle2.bundle20(repo.ui)
788 788 for out in getattr(exc, '_bundle2salvagedoutput', ()):
789 789 bundler.addpart(out)
790 790 try:
791 791 try:
792 792 raise
793 793 except error.PushkeyFailed as exc:
794 794 # check client caps
795 795 remotecaps = getattr(exc, '_replycaps', None)
796 796 if (remotecaps is not None
797 797 and 'pushkey' not in remotecaps.get('error', ())):
798 798 # no support remote side, fallback to Abort handler.
799 799 raise
800 800 part = bundler.newpart('error:pushkey')
801 801 part.addparam('in-reply-to', exc.partid)
802 802 if exc.namespace is not None:
803 803 part.addparam('namespace', exc.namespace, mandatory=False)
804 804 if exc.key is not None:
805 805 part.addparam('key', exc.key, mandatory=False)
806 806 if exc.new is not None:
807 807 part.addparam('new', exc.new, mandatory=False)
808 808 if exc.old is not None:
809 809 part.addparam('old', exc.old, mandatory=False)
810 810 if exc.ret is not None:
811 811 part.addparam('ret', exc.ret, mandatory=False)
812 812 except error.BundleValueError as exc:
813 813 errpart = bundler.newpart('error:unsupportedcontent')
814 814 if exc.parttype is not None:
815 815 errpart.addparam('parttype', exc.parttype)
816 816 if exc.params:
817 817 errpart.addparam('params', '\0'.join(exc.params))
818 818 except error.Abort as exc:
819 819 manargs = [('message', str(exc))]
820 820 advargs = []
821 821 if exc.hint is not None:
822 822 advargs.append(('hint', exc.hint))
823 823 bundler.addpart(bundle2.bundlepart('error:abort',
824 824 manargs, advargs))
825 825 except error.PushRaced as exc:
826 826 bundler.newpart('error:pushraced', [('message', str(exc))])
827 827 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now