##// END OF EJS Templates
wireprotocol: fix 'boolean' handling...
Pierre-Yves David -
r22351:7e6dd496 default
parent child Browse files
Show More
@@ -1,868 +1,868
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # mapping of options accepted by getbundle and their types
194 194 #
195 195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 196 # such options are properly processed in exchange.getbundle.
197 197 #
198 198 # supported types are:
199 199 #
200 200 # :nodes: list of binary nodes
201 201 # :csv: list of comma-separated values
202 202 # :plain: string with no transformation needed.
203 203 gboptsmap = {'heads': 'nodes',
204 204 'common': 'nodes',
205 205 'bundlecaps': 'csv',
206 206 'listkeys': 'csv',
207 207 'cg': 'boolean'}
208 208
209 209 # client side
210 210
211 211 class wirepeer(peer.peerrepository):
212 212
213 213 def batch(self):
214 214 return remotebatch(self)
215 215 def _submitbatch(self, req):
216 216 cmds = []
217 217 for op, argsdict in req:
218 218 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
219 219 cmds.append('%s %s' % (op, args))
220 220 rsp = self._call("batch", cmds=';'.join(cmds))
221 221 return rsp.split(';')
222 222 def _submitone(self, op, args):
223 223 return self._call(op, **args)
224 224
225 225 @batchable
226 226 def lookup(self, key):
227 227 self.requirecap('lookup', _('look up remote revision'))
228 228 f = future()
229 229 yield {'key': encoding.fromlocal(key)}, f
230 230 d = f.value
231 231 success, data = d[:-1].split(" ", 1)
232 232 if int(success):
233 233 yield bin(data)
234 234 self._abort(error.RepoError(data))
235 235
236 236 @batchable
237 237 def heads(self):
238 238 f = future()
239 239 yield {}, f
240 240 d = f.value
241 241 try:
242 242 yield decodelist(d[:-1])
243 243 except ValueError:
244 244 self._abort(error.ResponseError(_("unexpected response:"), d))
245 245
246 246 @batchable
247 247 def known(self, nodes):
248 248 f = future()
249 249 yield {'nodes': encodelist(nodes)}, f
250 250 d = f.value
251 251 try:
252 252 yield [bool(int(b)) for b in d]
253 253 except ValueError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 @batchable
257 257 def branchmap(self):
258 258 f = future()
259 259 yield {}, f
260 260 d = f.value
261 261 try:
262 262 branchmap = {}
263 263 for branchpart in d.splitlines():
264 264 branchname, branchheads = branchpart.split(' ', 1)
265 265 branchname = encoding.tolocal(urllib.unquote(branchname))
266 266 branchheads = decodelist(branchheads)
267 267 branchmap[branchname] = branchheads
268 268 yield branchmap
269 269 except TypeError:
270 270 self._abort(error.ResponseError(_("unexpected response:"), d))
271 271
272 272 def branches(self, nodes):
273 273 n = encodelist(nodes)
274 274 d = self._call("branches", nodes=n)
275 275 try:
276 276 br = [tuple(decodelist(b)) for b in d.splitlines()]
277 277 return br
278 278 except ValueError:
279 279 self._abort(error.ResponseError(_("unexpected response:"), d))
280 280
281 281 def between(self, pairs):
282 282 batch = 8 # avoid giant requests
283 283 r = []
284 284 for i in xrange(0, len(pairs), batch):
285 285 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
286 286 d = self._call("between", pairs=n)
287 287 try:
288 288 r.extend(l and decodelist(l) or [] for l in d.splitlines())
289 289 except ValueError:
290 290 self._abort(error.ResponseError(_("unexpected response:"), d))
291 291 return r
292 292
293 293 @batchable
294 294 def pushkey(self, namespace, key, old, new):
295 295 if not self.capable('pushkey'):
296 296 yield False, None
297 297 f = future()
298 298 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
299 299 yield {'namespace': encoding.fromlocal(namespace),
300 300 'key': encoding.fromlocal(key),
301 301 'old': encoding.fromlocal(old),
302 302 'new': encoding.fromlocal(new)}, f
303 303 d = f.value
304 304 d, output = d.split('\n', 1)
305 305 try:
306 306 d = bool(int(d))
307 307 except ValueError:
308 308 raise error.ResponseError(
309 309 _('push failed (unexpected response):'), d)
310 310 for l in output.splitlines(True):
311 311 self.ui.status(_('remote: '), l)
312 312 yield d
313 313
314 314 @batchable
315 315 def listkeys(self, namespace):
316 316 if not self.capable('pushkey'):
317 317 yield {}, None
318 318 f = future()
319 319 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
320 320 yield {'namespace': encoding.fromlocal(namespace)}, f
321 321 d = f.value
322 322 yield pushkeymod.decodekeys(d)
323 323
324 324 def stream_out(self):
325 325 return self._callstream('stream_out')
326 326
327 327 def changegroup(self, nodes, kind):
328 328 n = encodelist(nodes)
329 329 f = self._callcompressable("changegroup", roots=n)
330 330 return changegroupmod.unbundle10(f, 'UN')
331 331
332 332 def changegroupsubset(self, bases, heads, kind):
333 333 self.requirecap('changegroupsubset', _('look up remote changes'))
334 334 bases = encodelist(bases)
335 335 heads = encodelist(heads)
336 336 f = self._callcompressable("changegroupsubset",
337 337 bases=bases, heads=heads)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def getbundle(self, source, **kwargs):
341 341 self.requirecap('getbundle', _('look up remote changes'))
342 342 opts = {}
343 343 for key, value in kwargs.iteritems():
344 344 if value is None:
345 345 continue
346 346 keytype = gboptsmap.get(key)
347 347 if keytype is None:
348 348 assert False, 'unexpected'
349 349 elif keytype == 'nodes':
350 350 value = encodelist(value)
351 351 elif keytype == 'csv':
352 352 value = ','.join(value)
353 353 elif keytype == 'boolean':
354 value = bool(value)
354 value = '%i' % bool(value)
355 355 elif keytype != 'plain':
356 356 raise KeyError('unknown getbundle option type %s'
357 357 % keytype)
358 358 opts[key] = value
359 359 f = self._callcompressable("getbundle", **opts)
360 360 bundlecaps = kwargs.get('bundlecaps')
361 361 if bundlecaps is not None and 'HG2X' in bundlecaps:
362 362 return bundle2.unbundle20(self.ui, f)
363 363 else:
364 364 return changegroupmod.unbundle10(f, 'UN')
365 365
366 366 def unbundle(self, cg, heads, source):
367 367 '''Send cg (a readable file-like object representing the
368 368 changegroup to push, typically a chunkbuffer object) to the
369 369 remote server as a bundle.
370 370
371 371 When pushing a bundle10 stream, return an integer indicating the
372 372 result of the push (see localrepository.addchangegroup()).
373 373
374 374 When pushing a bundle20 stream, return a bundle20 stream.'''
375 375
376 376 if heads != ['force'] and self.capable('unbundlehash'):
377 377 heads = encodelist(['hashed',
378 378 util.sha1(''.join(sorted(heads))).digest()])
379 379 else:
380 380 heads = encodelist(heads)
381 381
382 382 if util.safehasattr(cg, 'deltaheader'):
383 383 # this a bundle10, do the old style call sequence
384 384 ret, output = self._callpush("unbundle", cg, heads=heads)
385 385 if ret == "":
386 386 raise error.ResponseError(
387 387 _('push failed:'), output)
388 388 try:
389 389 ret = int(ret)
390 390 except ValueError:
391 391 raise error.ResponseError(
392 392 _('push failed (unexpected response):'), ret)
393 393
394 394 for l in output.splitlines(True):
395 395 self.ui.status(_('remote: '), l)
396 396 else:
397 397 # bundle2 push. Send a stream, fetch a stream.
398 398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
399 399 ret = bundle2.unbundle20(self.ui, stream)
400 400 return ret
401 401
402 402 def debugwireargs(self, one, two, three=None, four=None, five=None):
403 403 # don't pass optional arguments left at their default value
404 404 opts = {}
405 405 if three is not None:
406 406 opts['three'] = three
407 407 if four is not None:
408 408 opts['four'] = four
409 409 return self._call('debugwireargs', one=one, two=two, **opts)
410 410
411 411 def _call(self, cmd, **args):
412 412 """execute <cmd> on the server
413 413
414 414 The command is expected to return a simple string.
415 415
416 416 returns the server reply as a string."""
417 417 raise NotImplementedError()
418 418
419 419 def _callstream(self, cmd, **args):
420 420 """execute <cmd> on the server
421 421
422 422 The command is expected to return a stream.
423 423
424 424 returns the server reply as a file like object."""
425 425 raise NotImplementedError()
426 426
427 427 def _callcompressable(self, cmd, **args):
428 428 """execute <cmd> on the server
429 429
430 430 The command is expected to return a stream.
431 431
432 432 The stream may have been compressed in some implementations. This
433 433 function takes care of the decompression. This is the only difference
434 434 with _callstream.
435 435
436 436 returns the server reply as a file like object.
437 437 """
438 438 raise NotImplementedError()
439 439
440 440 def _callpush(self, cmd, fp, **args):
441 441 """execute a <cmd> on server
442 442
443 443 The command is expected to be related to a push. Push has a special
444 444 return method.
445 445
446 446 returns the server reply as a (ret, output) tuple. ret is either
447 447 empty (error) or a stringified int.
448 448 """
449 449 raise NotImplementedError()
450 450
451 451 def _calltwowaystream(self, cmd, fp, **args):
452 452 """execute <cmd> on server
453 453
454 454 The command will send a stream to the server and get a stream in reply.
455 455 """
456 456 raise NotImplementedError()
457 457
458 458 def _abort(self, exception):
459 459 """clearly abort the wire protocol connection and raise the exception
460 460 """
461 461 raise NotImplementedError()
462 462
463 463 # server side
464 464
465 465 # wire protocol command can either return a string or one of these classes.
466 466 class streamres(object):
467 467 """wireproto reply: binary stream
468 468
469 469 The call was successful and the result is a stream.
470 470 Iterate on the `self.gen` attribute to retrieve chunks.
471 471 """
472 472 def __init__(self, gen):
473 473 self.gen = gen
474 474
475 475 class pushres(object):
476 476 """wireproto reply: success with simple integer return
477 477
478 478 The call was successful and returned an integer contained in `self.res`.
479 479 """
480 480 def __init__(self, res):
481 481 self.res = res
482 482
483 483 class pusherr(object):
484 484 """wireproto reply: failure
485 485
486 486 The call failed. The `self.res` attribute contains the error message.
487 487 """
488 488 def __init__(self, res):
489 489 self.res = res
490 490
491 491 class ooberror(object):
492 492 """wireproto reply: failure of a batch of operation
493 493
494 494 Something failed during a batch call. The error message is stored in
495 495 `self.message`.
496 496 """
497 497 def __init__(self, message):
498 498 self.message = message
499 499
500 500 def dispatch(repo, proto, command):
501 501 repo = repo.filtered("served")
502 502 func, spec = commands[command]
503 503 args = proto.getargs(spec)
504 504 return func(repo, proto, *args)
505 505
506 506 def options(cmd, keys, others):
507 507 opts = {}
508 508 for k in keys:
509 509 if k in others:
510 510 opts[k] = others[k]
511 511 del others[k]
512 512 if others:
513 513 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
514 514 % (cmd, ",".join(others)))
515 515 return opts
516 516
517 517 # list of commands
518 518 commands = {}
519 519
520 520 def wireprotocommand(name, args=''):
521 521 """decorator for wire protocol command"""
522 522 def register(func):
523 523 commands[name] = (func, args)
524 524 return func
525 525 return register
526 526
527 527 @wireprotocommand('batch', 'cmds *')
528 528 def batch(repo, proto, cmds, others):
529 529 repo = repo.filtered("served")
530 530 res = []
531 531 for pair in cmds.split(';'):
532 532 op, args = pair.split(' ', 1)
533 533 vals = {}
534 534 for a in args.split(','):
535 535 if a:
536 536 n, v = a.split('=')
537 537 vals[n] = unescapearg(v)
538 538 func, spec = commands[op]
539 539 if spec:
540 540 keys = spec.split()
541 541 data = {}
542 542 for k in keys:
543 543 if k == '*':
544 544 star = {}
545 545 for key in vals.keys():
546 546 if key not in keys:
547 547 star[key] = vals[key]
548 548 data['*'] = star
549 549 else:
550 550 data[k] = vals[k]
551 551 result = func(repo, proto, *[data[k] for k in keys])
552 552 else:
553 553 result = func(repo, proto)
554 554 if isinstance(result, ooberror):
555 555 return result
556 556 res.append(escapearg(result))
557 557 return ';'.join(res)
558 558
559 559 @wireprotocommand('between', 'pairs')
560 560 def between(repo, proto, pairs):
561 561 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
562 562 r = []
563 563 for b in repo.between(pairs):
564 564 r.append(encodelist(b) + "\n")
565 565 return "".join(r)
566 566
567 567 @wireprotocommand('branchmap')
568 568 def branchmap(repo, proto):
569 569 branchmap = repo.branchmap()
570 570 heads = []
571 571 for branch, nodes in branchmap.iteritems():
572 572 branchname = urllib.quote(encoding.fromlocal(branch))
573 573 branchnodes = encodelist(nodes)
574 574 heads.append('%s %s' % (branchname, branchnodes))
575 575 return '\n'.join(heads)
576 576
577 577 @wireprotocommand('branches', 'nodes')
578 578 def branches(repo, proto, nodes):
579 579 nodes = decodelist(nodes)
580 580 r = []
581 581 for b in repo.branches(nodes):
582 582 r.append(encodelist(b) + "\n")
583 583 return "".join(r)
584 584
585 585
586 586 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
587 587 'known', 'getbundle', 'unbundlehash', 'batch']
588 588
589 589 def _capabilities(repo, proto):
590 590 """return a list of capabilities for a repo
591 591
592 592 This function exists to allow extensions to easily wrap capabilities
593 593 computation
594 594
595 595 - returns a lists: easy to alter
596 596 - change done here will be propagated to both `capabilities` and `hello`
597 597 command without any other action needed.
598 598 """
599 599 # copy to prevent modification of the global list
600 600 caps = list(wireprotocaps)
601 601 if _allowstream(repo.ui):
602 602 if repo.ui.configbool('server', 'preferuncompressed', False):
603 603 caps.append('stream-preferred')
604 604 requiredformats = repo.requirements & repo.supportedformats
605 605 # if our local revlogs are just revlogv1, add 'stream' cap
606 606 if not requiredformats - set(('revlogv1',)):
607 607 caps.append('stream')
608 608 # otherwise, add 'streamreqs' detailing our local revlog format
609 609 else:
610 610 caps.append('streamreqs=%s' % ','.join(requiredformats))
611 611 if repo.ui.configbool('experimental', 'bundle2-exp', False):
612 612 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
613 613 caps.append('bundle2-exp=' + urllib.quote(capsblob))
614 614 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
615 615 caps.append('httpheader=1024')
616 616 return caps
617 617
618 618 # If you are writing an extension and consider wrapping this function. Wrap
619 619 # `_capabilities` instead.
620 620 @wireprotocommand('capabilities')
621 621 def capabilities(repo, proto):
622 622 return ' '.join(_capabilities(repo, proto))
623 623
624 624 @wireprotocommand('changegroup', 'roots')
625 625 def changegroup(repo, proto, roots):
626 626 nodes = decodelist(roots)
627 627 cg = changegroupmod.changegroup(repo, nodes, 'serve')
628 628 return streamres(proto.groupchunks(cg))
629 629
630 630 @wireprotocommand('changegroupsubset', 'bases heads')
631 631 def changegroupsubset(repo, proto, bases, heads):
632 632 bases = decodelist(bases)
633 633 heads = decodelist(heads)
634 634 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
635 635 return streamres(proto.groupchunks(cg))
636 636
637 637 @wireprotocommand('debugwireargs', 'one two *')
638 638 def debugwireargs(repo, proto, one, two, others):
639 639 # only accept optional args from the known set
640 640 opts = options('debugwireargs', ['three', 'four'], others)
641 641 return repo.debugwireargs(one, two, **opts)
642 642
643 643 # List of options accepted by getbundle.
644 644 #
645 645 # Meant to be extended by extensions. It is the extension's responsibility to
646 646 # ensure such options are properly processed in exchange.getbundle.
647 647 gboptslist = ['heads', 'common', 'bundlecaps']
648 648
649 649 @wireprotocommand('getbundle', '*')
650 650 def getbundle(repo, proto, others):
651 651 opts = options('getbundle', gboptsmap.keys(), others)
652 652 for k, v in opts.iteritems():
653 653 keytype = gboptsmap[k]
654 654 if keytype == 'nodes':
655 655 opts[k] = decodelist(v)
656 656 elif keytype == 'csv':
657 657 opts[k] = set(v.split(','))
658 658 elif keytype == 'boolean':
659 opts[k] = '%i' % bool(v)
659 opts[k] = bool(v)
660 660 elif keytype != 'plain':
661 661 raise KeyError('unknown getbundle option type %s'
662 662 % keytype)
663 663 cg = exchange.getbundle(repo, 'serve', **opts)
664 664 return streamres(proto.groupchunks(cg))
665 665
666 666 @wireprotocommand('heads')
667 667 def heads(repo, proto):
668 668 h = repo.heads()
669 669 return encodelist(h) + "\n"
670 670
671 671 @wireprotocommand('hello')
672 672 def hello(repo, proto):
673 673 '''the hello command returns a set of lines describing various
674 674 interesting things about the server, in an RFC822-like format.
675 675 Currently the only one defined is "capabilities", which
676 676 consists of a line in the form:
677 677
678 678 capabilities: space separated list of tokens
679 679 '''
680 680 return "capabilities: %s\n" % (capabilities(repo, proto))
681 681
682 682 @wireprotocommand('listkeys', 'namespace')
683 683 def listkeys(repo, proto, namespace):
684 684 d = repo.listkeys(encoding.tolocal(namespace)).items()
685 685 return pushkeymod.encodekeys(d)
686 686
687 687 @wireprotocommand('lookup', 'key')
688 688 def lookup(repo, proto, key):
689 689 try:
690 690 k = encoding.tolocal(key)
691 691 c = repo[k]
692 692 r = c.hex()
693 693 success = 1
694 694 except Exception, inst:
695 695 r = str(inst)
696 696 success = 0
697 697 return "%s %s\n" % (success, r)
698 698
699 699 @wireprotocommand('known', 'nodes *')
700 700 def known(repo, proto, nodes, others):
701 701 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
702 702
703 703 @wireprotocommand('pushkey', 'namespace key old new')
704 704 def pushkey(repo, proto, namespace, key, old, new):
705 705 # compatibility with pre-1.8 clients which were accidentally
706 706 # sending raw binary nodes rather than utf-8-encoded hex
707 707 if len(new) == 20 and new.encode('string-escape') != new:
708 708 # looks like it could be a binary node
709 709 try:
710 710 new.decode('utf-8')
711 711 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
712 712 except UnicodeDecodeError:
713 713 pass # binary, leave unmodified
714 714 else:
715 715 new = encoding.tolocal(new) # normal path
716 716
717 717 if util.safehasattr(proto, 'restore'):
718 718
719 719 proto.redirect()
720 720
721 721 try:
722 722 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
723 723 encoding.tolocal(old), new) or False
724 724 except util.Abort:
725 725 r = False
726 726
727 727 output = proto.restore()
728 728
729 729 return '%s\n%s' % (int(r), output)
730 730
731 731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
732 732 encoding.tolocal(old), new)
733 733 return '%s\n' % int(r)
734 734
735 735 def _allowstream(ui):
736 736 return ui.configbool('server', 'uncompressed', True, untrusted=True)
737 737
738 738 def _walkstreamfiles(repo):
739 739 # this is it's own function so extensions can override it
740 740 return repo.store.walk()
741 741
742 742 @wireprotocommand('stream_out')
743 743 def stream(repo, proto):
744 744 '''If the server supports streaming clone, it advertises the "stream"
745 745 capability with a value representing the version and flags of the repo
746 746 it is serving. Client checks to see if it understands the format.
747 747
748 748 The format is simple: the server writes out a line with the amount
749 749 of files, then the total amount of bytes to be transferred (separated
750 750 by a space). Then, for each file, the server first writes the filename
751 751 and file size (separated by the null character), then the file contents.
752 752 '''
753 753
754 754 if not _allowstream(repo.ui):
755 755 return '1\n'
756 756
757 757 entries = []
758 758 total_bytes = 0
759 759 try:
760 760 # get consistent snapshot of repo, lock during scan
761 761 lock = repo.lock()
762 762 try:
763 763 repo.ui.debug('scanning\n')
764 764 for name, ename, size in _walkstreamfiles(repo):
765 765 if size:
766 766 entries.append((name, size))
767 767 total_bytes += size
768 768 finally:
769 769 lock.release()
770 770 except error.LockError:
771 771 return '2\n' # error: 2
772 772
773 773 def streamer(repo, entries, total):
774 774 '''stream out all metadata files in repository.'''
775 775 yield '0\n' # success
776 776 repo.ui.debug('%d files, %d bytes to transfer\n' %
777 777 (len(entries), total_bytes))
778 778 yield '%d %d\n' % (len(entries), total_bytes)
779 779
780 780 sopener = repo.sopener
781 781 oldaudit = sopener.mustaudit
782 782 debugflag = repo.ui.debugflag
783 783 sopener.mustaudit = False
784 784
785 785 try:
786 786 for name, size in entries:
787 787 if debugflag:
788 788 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
789 789 # partially encode name over the wire for backwards compat
790 790 yield '%s\0%d\n' % (store.encodedir(name), size)
791 791 if size <= 65536:
792 792 fp = sopener(name)
793 793 try:
794 794 data = fp.read(size)
795 795 finally:
796 796 fp.close()
797 797 yield data
798 798 else:
799 799 for chunk in util.filechunkiter(sopener(name), limit=size):
800 800 yield chunk
801 801 # replace with "finally:" when support for python 2.4 has been dropped
802 802 except Exception:
803 803 sopener.mustaudit = oldaudit
804 804 raise
805 805 sopener.mustaudit = oldaudit
806 806
807 807 return streamres(streamer(repo, entries, total_bytes))
808 808
809 809 @wireprotocommand('unbundle', 'heads')
810 810 def unbundle(repo, proto, heads):
811 811 their_heads = decodelist(heads)
812 812
813 813 try:
814 814 proto.redirect()
815 815
816 816 exchange.check_heads(repo, their_heads, 'preparing changes')
817 817
818 818 # write bundle data to temporary file because it can be big
819 819 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
820 820 fp = os.fdopen(fd, 'wb+')
821 821 r = 0
822 822 try:
823 823 proto.getfile(fp)
824 824 fp.seek(0)
825 825 gen = exchange.readbundle(repo.ui, fp, None)
826 826 r = exchange.unbundle(repo, gen, their_heads, 'serve',
827 827 proto._client())
828 828 if util.safehasattr(r, 'addpart'):
829 829 # The return looks streameable, we are in the bundle2 case and
830 830 # should return a stream.
831 831 return streamres(r.getchunks())
832 832 return pushres(r)
833 833
834 834 finally:
835 835 fp.close()
836 836 os.unlink(tempname)
837 837 except error.BundleValueError, exc:
838 838 bundler = bundle2.bundle20(repo.ui)
839 839 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
840 840 if exc.parttype is not None:
841 841 errpart.addparam('parttype', exc.parttype)
842 842 if exc.params:
843 843 errpart.addparam('params', '\0'.join(exc.params))
844 844 return streamres(bundler.getchunks())
845 845 except util.Abort, inst:
846 846 # The old code we moved used sys.stderr directly.
847 847 # We did not change it to minimise code change.
848 848 # This need to be moved to something proper.
849 849 # Feel free to do it.
850 850 if getattr(inst, 'duringunbundle2', False):
851 851 bundler = bundle2.bundle20(repo.ui)
852 852 manargs = [('message', str(inst))]
853 853 advargs = []
854 854 if inst.hint is not None:
855 855 advargs.append(('hint', inst.hint))
856 856 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
857 857 manargs, advargs))
858 858 return streamres(bundler.getchunks())
859 859 else:
860 860 sys.stderr.write("abort: %s\n" % inst)
861 861 return pushres(0)
862 862 except error.PushRaced, exc:
863 863 if getattr(exc, 'duringunbundle2', False):
864 864 bundler = bundle2.bundle20(repo.ui)
865 865 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
866 866 return streamres(bundler.getchunks())
867 867 else:
868 868 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now