##// END OF EJS Templates
wireproto: use pushkey.decodekey
Pierre-Yves David -
r21653:4188cae7 default
parent child Browse files
Show More
@@ -1,866 +1,862
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # mapping of options accepted by getbundle and their types
194 194 #
195 195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 196 # such options are properly processed in exchange.getbundle.
197 197 #
198 198 # supported types are:
199 199 #
200 200 # :nodes: list of binary nodes
201 201 # :csv: list of comma-separated values
202 202 # :plain: string with no transformation needed.
203 203 gboptsmap = {'heads': 'nodes',
204 204 'common': 'nodes',
205 205 'bundlecaps': 'csv'}
206 206
207 207 # client side
208 208
209 209 class wirepeer(peer.peerrepository):
210 210
211 211 def batch(self):
212 212 return remotebatch(self)
213 213 def _submitbatch(self, req):
214 214 cmds = []
215 215 for op, argsdict in req:
216 216 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
217 217 cmds.append('%s %s' % (op, args))
218 218 rsp = self._call("batch", cmds=';'.join(cmds))
219 219 return rsp.split(';')
220 220 def _submitone(self, op, args):
221 221 return self._call(op, **args)
222 222
223 223 @batchable
224 224 def lookup(self, key):
225 225 self.requirecap('lookup', _('look up remote revision'))
226 226 f = future()
227 227 yield {'key': encoding.fromlocal(key)}, f
228 228 d = f.value
229 229 success, data = d[:-1].split(" ", 1)
230 230 if int(success):
231 231 yield bin(data)
232 232 self._abort(error.RepoError(data))
233 233
234 234 @batchable
235 235 def heads(self):
236 236 f = future()
237 237 yield {}, f
238 238 d = f.value
239 239 try:
240 240 yield decodelist(d[:-1])
241 241 except ValueError:
242 242 self._abort(error.ResponseError(_("unexpected response:"), d))
243 243
244 244 @batchable
245 245 def known(self, nodes):
246 246 f = future()
247 247 yield {'nodes': encodelist(nodes)}, f
248 248 d = f.value
249 249 try:
250 250 yield [bool(int(f)) for f in d]
251 251 except ValueError:
252 252 self._abort(error.ResponseError(_("unexpected response:"), d))
253 253
254 254 @batchable
255 255 def branchmap(self):
256 256 f = future()
257 257 yield {}, f
258 258 d = f.value
259 259 try:
260 260 branchmap = {}
261 261 for branchpart in d.splitlines():
262 262 branchname, branchheads = branchpart.split(' ', 1)
263 263 branchname = encoding.tolocal(urllib.unquote(branchname))
264 264 branchheads = decodelist(branchheads)
265 265 branchmap[branchname] = branchheads
266 266 yield branchmap
267 267 except TypeError:
268 268 self._abort(error.ResponseError(_("unexpected response:"), d))
269 269
270 270 def branches(self, nodes):
271 271 n = encodelist(nodes)
272 272 d = self._call("branches", nodes=n)
273 273 try:
274 274 br = [tuple(decodelist(b)) for b in d.splitlines()]
275 275 return br
276 276 except ValueError:
277 277 self._abort(error.ResponseError(_("unexpected response:"), d))
278 278
279 279 def between(self, pairs):
280 280 batch = 8 # avoid giant requests
281 281 r = []
282 282 for i in xrange(0, len(pairs), batch):
283 283 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
284 284 d = self._call("between", pairs=n)
285 285 try:
286 286 r.extend(l and decodelist(l) or [] for l in d.splitlines())
287 287 except ValueError:
288 288 self._abort(error.ResponseError(_("unexpected response:"), d))
289 289 return r
290 290
291 291 @batchable
292 292 def pushkey(self, namespace, key, old, new):
293 293 if not self.capable('pushkey'):
294 294 yield False, None
295 295 f = future()
296 296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 297 yield {'namespace': encoding.fromlocal(namespace),
298 298 'key': encoding.fromlocal(key),
299 299 'old': encoding.fromlocal(old),
300 300 'new': encoding.fromlocal(new)}, f
301 301 d = f.value
302 302 d, output = d.split('\n', 1)
303 303 try:
304 304 d = bool(int(d))
305 305 except ValueError:
306 306 raise error.ResponseError(
307 307 _('push failed (unexpected response):'), d)
308 308 for l in output.splitlines(True):
309 309 self.ui.status(_('remote: '), l)
310 310 yield d
311 311
312 312 @batchable
313 313 def listkeys(self, namespace):
314 314 if not self.capable('pushkey'):
315 315 yield {}, None
316 316 f = future()
317 317 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
318 318 yield {'namespace': encoding.fromlocal(namespace)}, f
319 319 d = f.value
320 r = {}
321 for l in d.splitlines():
322 k, v = l.split('\t')
323 r[encoding.tolocal(k)] = encoding.tolocal(v)
324 yield r
320 yield pushkeymod.decodekeys(d)
325 321
326 322 def stream_out(self):
327 323 return self._callstream('stream_out')
328 324
329 325 def changegroup(self, nodes, kind):
330 326 n = encodelist(nodes)
331 327 f = self._callcompressable("changegroup", roots=n)
332 328 return changegroupmod.unbundle10(f, 'UN')
333 329
334 330 def changegroupsubset(self, bases, heads, kind):
335 331 self.requirecap('changegroupsubset', _('look up remote changes'))
336 332 bases = encodelist(bases)
337 333 heads = encodelist(heads)
338 334 f = self._callcompressable("changegroupsubset",
339 335 bases=bases, heads=heads)
340 336 return changegroupmod.unbundle10(f, 'UN')
341 337
342 338 def getbundle(self, source, **kwargs):
343 339 self.requirecap('getbundle', _('look up remote changes'))
344 340 opts = {}
345 341 for key, value in kwargs.iteritems():
346 342 if value is None:
347 343 continue
348 344 keytype = gboptsmap.get(key)
349 345 if keytype is None:
350 346 assert False, 'unexpected'
351 347 elif keytype == 'nodes':
352 348 value = encodelist(value)
353 349 elif keytype == 'csv':
354 350 value = ','.join(value)
355 351 elif keytype != 'plain':
356 352 raise KeyError('unknown getbundle option type %s'
357 353 % keytype)
358 354 opts[key] = value
359 355 f = self._callcompressable("getbundle", **opts)
360 356 bundlecaps = kwargs.get('bundlecaps')
361 357 if bundlecaps is not None and 'HG2X' in bundlecaps:
362 358 return bundle2.unbundle20(self.ui, f)
363 359 else:
364 360 return changegroupmod.unbundle10(f, 'UN')
365 361
366 362 def unbundle(self, cg, heads, source):
367 363 '''Send cg (a readable file-like object representing the
368 364 changegroup to push, typically a chunkbuffer object) to the
369 365 remote server as a bundle.
370 366
371 367 When pushing a bundle10 stream, return an integer indicating the
372 368 result of the push (see localrepository.addchangegroup()).
373 369
374 370 When pushing a bundle20 stream, return a bundle20 stream.'''
375 371
376 372 if heads != ['force'] and self.capable('unbundlehash'):
377 373 heads = encodelist(['hashed',
378 374 util.sha1(''.join(sorted(heads))).digest()])
379 375 else:
380 376 heads = encodelist(heads)
381 377
382 378 if util.safehasattr(cg, 'deltaheader'):
383 379 # this a bundle10, do the old style call sequence
384 380 ret, output = self._callpush("unbundle", cg, heads=heads)
385 381 if ret == "":
386 382 raise error.ResponseError(
387 383 _('push failed:'), output)
388 384 try:
389 385 ret = int(ret)
390 386 except ValueError:
391 387 raise error.ResponseError(
392 388 _('push failed (unexpected response):'), ret)
393 389
394 390 for l in output.splitlines(True):
395 391 self.ui.status(_('remote: '), l)
396 392 else:
397 393 # bundle2 push. Send a stream, fetch a stream.
398 394 stream = self._calltwowaystream('unbundle', cg, heads=heads)
399 395 ret = bundle2.unbundle20(self.ui, stream)
400 396 return ret
401 397
402 398 def debugwireargs(self, one, two, three=None, four=None, five=None):
403 399 # don't pass optional arguments left at their default value
404 400 opts = {}
405 401 if three is not None:
406 402 opts['three'] = three
407 403 if four is not None:
408 404 opts['four'] = four
409 405 return self._call('debugwireargs', one=one, two=two, **opts)
410 406
411 407 def _call(self, cmd, **args):
412 408 """execute <cmd> on the server
413 409
414 410 The command is expected to return a simple string.
415 411
416 412 returns the server reply as a string."""
417 413 raise NotImplementedError()
418 414
419 415 def _callstream(self, cmd, **args):
420 416 """execute <cmd> on the server
421 417
422 418 The command is expected to return a stream.
423 419
424 420 returns the server reply as a file like object."""
425 421 raise NotImplementedError()
426 422
427 423 def _callcompressable(self, cmd, **args):
428 424 """execute <cmd> on the server
429 425
430 426 The command is expected to return a stream.
431 427
432 428 The stream may have been compressed in some implementations. This
433 429 function takes care of the decompression. This is the only difference
434 430 with _callstream.
435 431
436 432 returns the server reply as a file like object.
437 433 """
438 434 raise NotImplementedError()
439 435
440 436 def _callpush(self, cmd, fp, **args):
441 437 """execute a <cmd> on server
442 438
443 439 The command is expected to be related to a push. Push has a special
444 440 return method.
445 441
446 442 returns the server reply as a (ret, output) tuple. ret is either
447 443 empty (error) or a stringified int.
448 444 """
449 445 raise NotImplementedError()
450 446
451 447 def _calltwowaystream(self, cmd, fp, **args):
452 448 """execute <cmd> on server
453 449
454 450 The command will send a stream to the server and get a stream in reply.
455 451 """
456 452 raise NotImplementedError()
457 453
458 454 def _abort(self, exception):
459 455 """clearly abort the wire protocol connection and raise the exception
460 456 """
461 457 raise NotImplementedError()
462 458
463 459 # server side
464 460
465 461 # wire protocol command can either return a string or one of these classes.
466 462 class streamres(object):
467 463 """wireproto reply: binary stream
468 464
469 465 The call was successful and the result is a stream.
470 466 Iterate on the `self.gen` attribute to retrieve chunks.
471 467 """
472 468 def __init__(self, gen):
473 469 self.gen = gen
474 470
475 471 class pushres(object):
476 472 """wireproto reply: success with simple integer return
477 473
478 474 The call was successful and returned an integer contained in `self.res`.
479 475 """
480 476 def __init__(self, res):
481 477 self.res = res
482 478
483 479 class pusherr(object):
484 480 """wireproto reply: failure
485 481
486 482 The call failed. The `self.res` attribute contains the error message.
487 483 """
488 484 def __init__(self, res):
489 485 self.res = res
490 486
491 487 class ooberror(object):
492 488 """wireproto reply: failure of a batch of operation
493 489
494 490 Something failed during a batch call. The error message is stored in
495 491 `self.message`.
496 492 """
497 493 def __init__(self, message):
498 494 self.message = message
499 495
500 496 def dispatch(repo, proto, command):
501 497 repo = repo.filtered("served")
502 498 func, spec = commands[command]
503 499 args = proto.getargs(spec)
504 500 return func(repo, proto, *args)
505 501
506 502 def options(cmd, keys, others):
507 503 opts = {}
508 504 for k in keys:
509 505 if k in others:
510 506 opts[k] = others[k]
511 507 del others[k]
512 508 if others:
513 509 sys.stderr.write("abort: %s got unexpected arguments %s\n"
514 510 % (cmd, ",".join(others)))
515 511 return opts
516 512
517 513 # list of commands
518 514 commands = {}
519 515
520 516 def wireprotocommand(name, args=''):
521 517 """decorator for wire protocol command"""
522 518 def register(func):
523 519 commands[name] = (func, args)
524 520 return func
525 521 return register
526 522
527 523 @wireprotocommand('batch', 'cmds *')
528 524 def batch(repo, proto, cmds, others):
529 525 repo = repo.filtered("served")
530 526 res = []
531 527 for pair in cmds.split(';'):
532 528 op, args = pair.split(' ', 1)
533 529 vals = {}
534 530 for a in args.split(','):
535 531 if a:
536 532 n, v = a.split('=')
537 533 vals[n] = unescapearg(v)
538 534 func, spec = commands[op]
539 535 if spec:
540 536 keys = spec.split()
541 537 data = {}
542 538 for k in keys:
543 539 if k == '*':
544 540 star = {}
545 541 for key in vals.keys():
546 542 if key not in keys:
547 543 star[key] = vals[key]
548 544 data['*'] = star
549 545 else:
550 546 data[k] = vals[k]
551 547 result = func(repo, proto, *[data[k] for k in keys])
552 548 else:
553 549 result = func(repo, proto)
554 550 if isinstance(result, ooberror):
555 551 return result
556 552 res.append(escapearg(result))
557 553 return ';'.join(res)
558 554
559 555 @wireprotocommand('between', 'pairs')
560 556 def between(repo, proto, pairs):
561 557 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
562 558 r = []
563 559 for b in repo.between(pairs):
564 560 r.append(encodelist(b) + "\n")
565 561 return "".join(r)
566 562
567 563 @wireprotocommand('branchmap')
568 564 def branchmap(repo, proto):
569 565 branchmap = repo.branchmap()
570 566 heads = []
571 567 for branch, nodes in branchmap.iteritems():
572 568 branchname = urllib.quote(encoding.fromlocal(branch))
573 569 branchnodes = encodelist(nodes)
574 570 heads.append('%s %s' % (branchname, branchnodes))
575 571 return '\n'.join(heads)
576 572
577 573 @wireprotocommand('branches', 'nodes')
578 574 def branches(repo, proto, nodes):
579 575 nodes = decodelist(nodes)
580 576 r = []
581 577 for b in repo.branches(nodes):
582 578 r.append(encodelist(b) + "\n")
583 579 return "".join(r)
584 580
585 581
586 582 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
587 583 'known', 'getbundle', 'unbundlehash', 'batch']
588 584
589 585 def _capabilities(repo, proto):
590 586 """return a list of capabilities for a repo
591 587
592 588 This function exists to allow extensions to easily wrap capabilities
593 589 computation
594 590
595 591 - returns a lists: easy to alter
596 592 - change done here will be propagated to both `capabilities` and `hello`
597 593 command without any other action needed.
598 594 """
599 595 # copy to prevent modification of the global list
600 596 caps = list(wireprotocaps)
601 597 if _allowstream(repo.ui):
602 598 if repo.ui.configbool('server', 'preferuncompressed', False):
603 599 caps.append('stream-preferred')
604 600 requiredformats = repo.requirements & repo.supportedformats
605 601 # if our local revlogs are just revlogv1, add 'stream' cap
606 602 if not requiredformats - set(('revlogv1',)):
607 603 caps.append('stream')
608 604 # otherwise, add 'streamreqs' detailing our local revlog format
609 605 else:
610 606 caps.append('streamreqs=%s' % ','.join(requiredformats))
611 607 if repo.ui.configbool('experimental', 'bundle2-exp', False):
612 608 capsblob = bundle2.encodecaps(repo.bundle2caps)
613 609 caps.append('bundle2-exp=' + urllib.quote(capsblob))
614 610 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
615 611 caps.append('httpheader=1024')
616 612 return caps
617 613
618 614 # If you are writing an extension and consider wrapping this function. Wrap
619 615 # `_capabilities` instead.
620 616 @wireprotocommand('capabilities')
621 617 def capabilities(repo, proto):
622 618 return ' '.join(_capabilities(repo, proto))
623 619
624 620 @wireprotocommand('changegroup', 'roots')
625 621 def changegroup(repo, proto, roots):
626 622 nodes = decodelist(roots)
627 623 cg = changegroupmod.changegroup(repo, nodes, 'serve')
628 624 return streamres(proto.groupchunks(cg))
629 625
630 626 @wireprotocommand('changegroupsubset', 'bases heads')
631 627 def changegroupsubset(repo, proto, bases, heads):
632 628 bases = decodelist(bases)
633 629 heads = decodelist(heads)
634 630 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
635 631 return streamres(proto.groupchunks(cg))
636 632
637 633 @wireprotocommand('debugwireargs', 'one two *')
638 634 def debugwireargs(repo, proto, one, two, others):
639 635 # only accept optional args from the known set
640 636 opts = options('debugwireargs', ['three', 'four'], others)
641 637 return repo.debugwireargs(one, two, **opts)
642 638
643 639 # List of options accepted by getbundle.
644 640 #
645 641 # Meant to be extended by extensions. It is the extension's responsibility to
646 642 # ensure such options are properly processed in exchange.getbundle.
647 643 gboptslist = ['heads', 'common', 'bundlecaps']
648 644
649 645 @wireprotocommand('getbundle', '*')
650 646 def getbundle(repo, proto, others):
651 647 opts = options('getbundle', gboptsmap.keys(), others)
652 648 for k, v in opts.iteritems():
653 649 keytype = gboptsmap[k]
654 650 if keytype == 'nodes':
655 651 opts[k] = decodelist(v)
656 652 elif keytype == 'csv':
657 653 opts[k] = set(v.split(','))
658 654 elif keytype != 'plain':
659 655 raise KeyError('unknown getbundle option type %s'
660 656 % keytype)
661 657 cg = exchange.getbundle(repo, 'serve', **opts)
662 658 return streamres(proto.groupchunks(cg))
663 659
664 660 @wireprotocommand('heads')
665 661 def heads(repo, proto):
666 662 h = repo.heads()
667 663 return encodelist(h) + "\n"
668 664
669 665 @wireprotocommand('hello')
670 666 def hello(repo, proto):
671 667 '''the hello command returns a set of lines describing various
672 668 interesting things about the server, in an RFC822-like format.
673 669 Currently the only one defined is "capabilities", which
674 670 consists of a line in the form:
675 671
676 672 capabilities: space separated list of tokens
677 673 '''
678 674 return "capabilities: %s\n" % (capabilities(repo, proto))
679 675
680 676 @wireprotocommand('listkeys', 'namespace')
681 677 def listkeys(repo, proto, namespace):
682 678 d = repo.listkeys(encoding.tolocal(namespace)).items()
683 679 return pushkeymod.encodekeys(d)
684 680
685 681 @wireprotocommand('lookup', 'key')
686 682 def lookup(repo, proto, key):
687 683 try:
688 684 k = encoding.tolocal(key)
689 685 c = repo[k]
690 686 r = c.hex()
691 687 success = 1
692 688 except Exception, inst:
693 689 r = str(inst)
694 690 success = 0
695 691 return "%s %s\n" % (success, r)
696 692
697 693 @wireprotocommand('known', 'nodes *')
698 694 def known(repo, proto, nodes, others):
699 695 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
700 696
701 697 @wireprotocommand('pushkey', 'namespace key old new')
702 698 def pushkey(repo, proto, namespace, key, old, new):
703 699 # compatibility with pre-1.8 clients which were accidentally
704 700 # sending raw binary nodes rather than utf-8-encoded hex
705 701 if len(new) == 20 and new.encode('string-escape') != new:
706 702 # looks like it could be a binary node
707 703 try:
708 704 new.decode('utf-8')
709 705 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
710 706 except UnicodeDecodeError:
711 707 pass # binary, leave unmodified
712 708 else:
713 709 new = encoding.tolocal(new) # normal path
714 710
715 711 if util.safehasattr(proto, 'restore'):
716 712
717 713 proto.redirect()
718 714
719 715 try:
720 716 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
721 717 encoding.tolocal(old), new) or False
722 718 except util.Abort:
723 719 r = False
724 720
725 721 output = proto.restore()
726 722
727 723 return '%s\n%s' % (int(r), output)
728 724
729 725 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 726 encoding.tolocal(old), new)
731 727 return '%s\n' % int(r)
732 728
733 729 def _allowstream(ui):
734 730 return ui.configbool('server', 'uncompressed', True, untrusted=True)
735 731
736 732 def _walkstreamfiles(repo):
737 733 # this is it's own function so extensions can override it
738 734 return repo.store.walk()
739 735
740 736 @wireprotocommand('stream_out')
741 737 def stream(repo, proto):
742 738 '''If the server supports streaming clone, it advertises the "stream"
743 739 capability with a value representing the version and flags of the repo
744 740 it is serving. Client checks to see if it understands the format.
745 741
746 742 The format is simple: the server writes out a line with the amount
747 743 of files, then the total amount of bytes to be transferred (separated
748 744 by a space). Then, for each file, the server first writes the filename
749 745 and file size (separated by the null character), then the file contents.
750 746 '''
751 747
752 748 if not _allowstream(repo.ui):
753 749 return '1\n'
754 750
755 751 entries = []
756 752 total_bytes = 0
757 753 try:
758 754 # get consistent snapshot of repo, lock during scan
759 755 lock = repo.lock()
760 756 try:
761 757 repo.ui.debug('scanning\n')
762 758 for name, ename, size in _walkstreamfiles(repo):
763 759 if size:
764 760 entries.append((name, size))
765 761 total_bytes += size
766 762 finally:
767 763 lock.release()
768 764 except error.LockError:
769 765 return '2\n' # error: 2
770 766
771 767 def streamer(repo, entries, total):
772 768 '''stream out all metadata files in repository.'''
773 769 yield '0\n' # success
774 770 repo.ui.debug('%d files, %d bytes to transfer\n' %
775 771 (len(entries), total_bytes))
776 772 yield '%d %d\n' % (len(entries), total_bytes)
777 773
778 774 sopener = repo.sopener
779 775 oldaudit = sopener.mustaudit
780 776 debugflag = repo.ui.debugflag
781 777 sopener.mustaudit = False
782 778
783 779 try:
784 780 for name, size in entries:
785 781 if debugflag:
786 782 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
787 783 # partially encode name over the wire for backwards compat
788 784 yield '%s\0%d\n' % (store.encodedir(name), size)
789 785 if size <= 65536:
790 786 fp = sopener(name)
791 787 try:
792 788 data = fp.read(size)
793 789 finally:
794 790 fp.close()
795 791 yield data
796 792 else:
797 793 for chunk in util.filechunkiter(sopener(name), limit=size):
798 794 yield chunk
799 795 # replace with "finally:" when support for python 2.4 has been dropped
800 796 except Exception:
801 797 sopener.mustaudit = oldaudit
802 798 raise
803 799 sopener.mustaudit = oldaudit
804 800
805 801 return streamres(streamer(repo, entries, total_bytes))
806 802
807 803 @wireprotocommand('unbundle', 'heads')
808 804 def unbundle(repo, proto, heads):
809 805 their_heads = decodelist(heads)
810 806
811 807 try:
812 808 proto.redirect()
813 809
814 810 exchange.check_heads(repo, their_heads, 'preparing changes')
815 811
816 812 # write bundle data to temporary file because it can be big
817 813 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
818 814 fp = os.fdopen(fd, 'wb+')
819 815 r = 0
820 816 try:
821 817 proto.getfile(fp)
822 818 fp.seek(0)
823 819 gen = exchange.readbundle(repo.ui, fp, None)
824 820 r = exchange.unbundle(repo, gen, their_heads, 'serve',
825 821 proto._client())
826 822 if util.safehasattr(r, 'addpart'):
827 823 # The return looks streameable, we are in the bundle2 case and
828 824 # should return a stream.
829 825 return streamres(r.getchunks())
830 826 return pushres(r)
831 827
832 828 finally:
833 829 fp.close()
834 830 os.unlink(tempname)
835 831 except error.BundleValueError, exc:
836 832 bundler = bundle2.bundle20(repo.ui)
837 833 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
838 834 if exc.parttype is not None:
839 835 errpart.addparam('parttype', exc.parttype)
840 836 if exc.params:
841 837 errpart.addparam('params', '\0'.join(exc.params))
842 838 return streamres(bundler.getchunks())
843 839 except util.Abort, inst:
844 840 # The old code we moved used sys.stderr directly.
845 841 # We did not change it to minimise code change.
846 842 # This need to be moved to something proper.
847 843 # Feel free to do it.
848 844 if getattr(inst, 'duringunbundle2', False):
849 845 bundler = bundle2.bundle20(repo.ui)
850 846 manargs = [('message', str(inst))]
851 847 advargs = []
852 848 if inst.hint is not None:
853 849 advargs.append(('hint', inst.hint))
854 850 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
855 851 manargs, advargs))
856 852 return streamres(bundler.getchunks())
857 853 else:
858 854 sys.stderr.write("abort: %s\n" % inst)
859 855 return pushres(0)
860 856 except error.PushRaced, exc:
861 857 if getattr(exc, 'duringunbundle2', False):
862 858 bundler = bundle2.bundle20(repo.ui)
863 859 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
864 860 return streamres(bundler.getchunks())
865 861 else:
866 862 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now