##// END OF EJS Templates
bundle2: refactor error bundle creation for the wireprotocol...
Pierre-Yves David -
r24796:61ff209f default
parent child Browse files
Show More
@@ -1,875 +1,878 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 try:
176 176 return sep.join(map(hex, l))
177 177 except TypeError:
178 178 print l
179 179 raise
180 180
181 181 # batched call argument encoding
182 182
183 183 def escapearg(plain):
184 184 return (plain
185 185 .replace(':', '::')
186 186 .replace(',', ':,')
187 187 .replace(';', ':;')
188 188 .replace('=', ':='))
189 189
190 190 def unescapearg(escaped):
191 191 return (escaped
192 192 .replace(':=', '=')
193 193 .replace(':;', ';')
194 194 .replace(':,', ',')
195 195 .replace('::', ':'))
196 196
197 197 # mapping of options accepted by getbundle and their types
198 198 #
199 199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 200 # such options are properly processed in exchange.getbundle.
201 201 #
202 202 # supported types are:
203 203 #
204 204 # :nodes: list of binary nodes
205 205 # :csv: list of comma-separated values
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'common': 'nodes',
209 209 'obsmarkers': 'boolean',
210 210 'bundlecaps': 'csv',
211 211 'listkeys': 'csv',
212 212 'cg': 'boolean'}
213 213
214 214 # client side
215 215
216 216 class wirepeer(peer.peerrepository):
217 217
218 218 def batch(self):
219 219 return remotebatch(self)
220 220 def _submitbatch(self, req):
221 221 cmds = []
222 222 for op, argsdict in req:
223 223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 224 cmds.append('%s %s' % (op, args))
225 225 rsp = self._call("batch", cmds=';'.join(cmds))
226 226 return rsp.split(';')
227 227 def _submitone(self, op, args):
228 228 return self._call(op, **args)
229 229
230 230 @batchable
231 231 def lookup(self, key):
232 232 self.requirecap('lookup', _('look up remote revision'))
233 233 f = future()
234 234 yield {'key': encoding.fromlocal(key)}, f
235 235 d = f.value
236 236 success, data = d[:-1].split(" ", 1)
237 237 if int(success):
238 238 yield bin(data)
239 239 self._abort(error.RepoError(data))
240 240
241 241 @batchable
242 242 def heads(self):
243 243 f = future()
244 244 yield {}, f
245 245 d = f.value
246 246 try:
247 247 yield decodelist(d[:-1])
248 248 except ValueError:
249 249 self._abort(error.ResponseError(_("unexpected response:"), d))
250 250
251 251 @batchable
252 252 def known(self, nodes):
253 253 f = future()
254 254 yield {'nodes': encodelist(nodes)}, f
255 255 d = f.value
256 256 try:
257 257 yield [bool(int(b)) for b in d]
258 258 except ValueError:
259 259 self._abort(error.ResponseError(_("unexpected response:"), d))
260 260
261 261 @batchable
262 262 def branchmap(self):
263 263 f = future()
264 264 yield {}, f
265 265 d = f.value
266 266 try:
267 267 branchmap = {}
268 268 for branchpart in d.splitlines():
269 269 branchname, branchheads = branchpart.split(' ', 1)
270 270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 271 branchheads = decodelist(branchheads)
272 272 branchmap[branchname] = branchheads
273 273 yield branchmap
274 274 except TypeError:
275 275 self._abort(error.ResponseError(_("unexpected response:"), d))
276 276
277 277 def branches(self, nodes):
278 278 n = encodelist(nodes)
279 279 d = self._call("branches", nodes=n)
280 280 try:
281 281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 282 return br
283 283 except ValueError:
284 284 self._abort(error.ResponseError(_("unexpected response:"), d))
285 285
286 286 def between(self, pairs):
287 287 batch = 8 # avoid giant requests
288 288 r = []
289 289 for i in xrange(0, len(pairs), batch):
290 290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 291 d = self._call("between", pairs=n)
292 292 try:
293 293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 294 except ValueError:
295 295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 296 return r
297 297
298 298 @batchable
299 299 def pushkey(self, namespace, key, old, new):
300 300 if not self.capable('pushkey'):
301 301 yield False, None
302 302 f = future()
303 303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 304 yield {'namespace': encoding.fromlocal(namespace),
305 305 'key': encoding.fromlocal(key),
306 306 'old': encoding.fromlocal(old),
307 307 'new': encoding.fromlocal(new)}, f
308 308 d = f.value
309 309 d, output = d.split('\n', 1)
310 310 try:
311 311 d = bool(int(d))
312 312 except ValueError:
313 313 raise error.ResponseError(
314 314 _('push failed (unexpected response):'), d)
315 315 for l in output.splitlines(True):
316 316 self.ui.status(_('remote: '), l)
317 317 yield d
318 318
319 319 @batchable
320 320 def listkeys(self, namespace):
321 321 if not self.capable('pushkey'):
322 322 yield {}, None
323 323 f = future()
324 324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 326 d = f.value
327 327 yield pushkeymod.decodekeys(d)
328 328
329 329 def stream_out(self):
330 330 return self._callstream('stream_out')
331 331
332 332 def changegroup(self, nodes, kind):
333 333 n = encodelist(nodes)
334 334 f = self._callcompressable("changegroup", roots=n)
335 335 return changegroupmod.cg1unpacker(f, 'UN')
336 336
337 337 def changegroupsubset(self, bases, heads, kind):
338 338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 339 bases = encodelist(bases)
340 340 heads = encodelist(heads)
341 341 f = self._callcompressable("changegroupsubset",
342 342 bases=bases, heads=heads)
343 343 return changegroupmod.cg1unpacker(f, 'UN')
344 344
345 345 def getbundle(self, source, **kwargs):
346 346 self.requirecap('getbundle', _('look up remote changes'))
347 347 opts = {}
348 348 for key, value in kwargs.iteritems():
349 349 if value is None:
350 350 continue
351 351 keytype = gboptsmap.get(key)
352 352 if keytype is None:
353 353 assert False, 'unexpected'
354 354 elif keytype == 'nodes':
355 355 value = encodelist(value)
356 356 elif keytype == 'csv':
357 357 value = ','.join(value)
358 358 elif keytype == 'boolean':
359 359 value = '%i' % bool(value)
360 360 elif keytype != 'plain':
361 361 raise KeyError('unknown getbundle option type %s'
362 362 % keytype)
363 363 opts[key] = value
364 364 f = self._callcompressable("getbundle", **opts)
365 365 bundlecaps = kwargs.get('bundlecaps')
366 366 if bundlecaps is None:
367 367 bundlecaps = () # kwargs could have it to None
368 368 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
369 369 return bundle2.getunbundler(self.ui, f)
370 370 else:
371 371 return changegroupmod.cg1unpacker(f, 'UN')
372 372
373 373 def unbundle(self, cg, heads, source):
374 374 '''Send cg (a readable file-like object representing the
375 375 changegroup to push, typically a chunkbuffer object) to the
376 376 remote server as a bundle.
377 377
378 378 When pushing a bundle10 stream, return an integer indicating the
379 379 result of the push (see localrepository.addchangegroup()).
380 380
381 381 When pushing a bundle20 stream, return a bundle20 stream.'''
382 382
383 383 if heads != ['force'] and self.capable('unbundlehash'):
384 384 heads = encodelist(['hashed',
385 385 util.sha1(''.join(sorted(heads))).digest()])
386 386 else:
387 387 heads = encodelist(heads)
388 388
389 389 if util.safehasattr(cg, 'deltaheader'):
390 390 # this a bundle10, do the old style call sequence
391 391 ret, output = self._callpush("unbundle", cg, heads=heads)
392 392 if ret == "":
393 393 raise error.ResponseError(
394 394 _('push failed:'), output)
395 395 try:
396 396 ret = int(ret)
397 397 except ValueError:
398 398 raise error.ResponseError(
399 399 _('push failed (unexpected response):'), ret)
400 400
401 401 for l in output.splitlines(True):
402 402 self.ui.status(_('remote: '), l)
403 403 else:
404 404 # bundle2 push. Send a stream, fetch a stream.
405 405 stream = self._calltwowaystream('unbundle', cg, heads=heads)
406 406 ret = bundle2.getunbundler(self.ui, stream)
407 407 return ret
408 408
409 409 def debugwireargs(self, one, two, three=None, four=None, five=None):
410 410 # don't pass optional arguments left at their default value
411 411 opts = {}
412 412 if three is not None:
413 413 opts['three'] = three
414 414 if four is not None:
415 415 opts['four'] = four
416 416 return self._call('debugwireargs', one=one, two=two, **opts)
417 417
418 418 def _call(self, cmd, **args):
419 419 """execute <cmd> on the server
420 420
421 421 The command is expected to return a simple string.
422 422
423 423 returns the server reply as a string."""
424 424 raise NotImplementedError()
425 425
426 426 def _callstream(self, cmd, **args):
427 427 """execute <cmd> on the server
428 428
429 429 The command is expected to return a stream.
430 430
431 431 returns the server reply as a file like object."""
432 432 raise NotImplementedError()
433 433
434 434 def _callcompressable(self, cmd, **args):
435 435 """execute <cmd> on the server
436 436
437 437 The command is expected to return a stream.
438 438
439 439 The stream may have been compressed in some implementations. This
440 440 function takes care of the decompression. This is the only difference
441 441 with _callstream.
442 442
443 443 returns the server reply as a file like object.
444 444 """
445 445 raise NotImplementedError()
446 446
447 447 def _callpush(self, cmd, fp, **args):
448 448 """execute a <cmd> on server
449 449
450 450 The command is expected to be related to a push. Push has a special
451 451 return method.
452 452
453 453 returns the server reply as a (ret, output) tuple. ret is either
454 454 empty (error) or a stringified int.
455 455 """
456 456 raise NotImplementedError()
457 457
458 458 def _calltwowaystream(self, cmd, fp, **args):
459 459 """execute <cmd> on server
460 460
461 461 The command will send a stream to the server and get a stream in reply.
462 462 """
463 463 raise NotImplementedError()
464 464
465 465 def _abort(self, exception):
466 466 """clearly abort the wire protocol connection and raise the exception
467 467 """
468 468 raise NotImplementedError()
469 469
470 470 # server side
471 471
472 472 # wire protocol command can either return a string or one of these classes.
473 473 class streamres(object):
474 474 """wireproto reply: binary stream
475 475
476 476 The call was successful and the result is a stream.
477 477 Iterate on the `self.gen` attribute to retrieve chunks.
478 478 """
479 479 def __init__(self, gen):
480 480 self.gen = gen
481 481
482 482 class pushres(object):
483 483 """wireproto reply: success with simple integer return
484 484
485 485 The call was successful and returned an integer contained in `self.res`.
486 486 """
487 487 def __init__(self, res):
488 488 self.res = res
489 489
490 490 class pusherr(object):
491 491 """wireproto reply: failure
492 492
493 493 The call failed. The `self.res` attribute contains the error message.
494 494 """
495 495 def __init__(self, res):
496 496 self.res = res
497 497
498 498 class ooberror(object):
499 499 """wireproto reply: failure of a batch of operation
500 500
501 501 Something failed during a batch call. The error message is stored in
502 502 `self.message`.
503 503 """
504 504 def __init__(self, message):
505 505 self.message = message
506 506
507 507 def dispatch(repo, proto, command):
508 508 repo = repo.filtered("served")
509 509 func, spec = commands[command]
510 510 args = proto.getargs(spec)
511 511 return func(repo, proto, *args)
512 512
513 513 def options(cmd, keys, others):
514 514 opts = {}
515 515 for k in keys:
516 516 if k in others:
517 517 opts[k] = others[k]
518 518 del others[k]
519 519 if others:
520 520 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 521 % (cmd, ",".join(others)))
522 522 return opts
523 523
524 524 # list of commands
525 525 commands = {}
526 526
527 527 def wireprotocommand(name, args=''):
528 528 """decorator for wire protocol command"""
529 529 def register(func):
530 530 commands[name] = (func, args)
531 531 return func
532 532 return register
533 533
534 534 @wireprotocommand('batch', 'cmds *')
535 535 def batch(repo, proto, cmds, others):
536 536 repo = repo.filtered("served")
537 537 res = []
538 538 for pair in cmds.split(';'):
539 539 op, args = pair.split(' ', 1)
540 540 vals = {}
541 541 for a in args.split(','):
542 542 if a:
543 543 n, v = a.split('=')
544 544 vals[n] = unescapearg(v)
545 545 func, spec = commands[op]
546 546 if spec:
547 547 keys = spec.split()
548 548 data = {}
549 549 for k in keys:
550 550 if k == '*':
551 551 star = {}
552 552 for key in vals.keys():
553 553 if key not in keys:
554 554 star[key] = vals[key]
555 555 data['*'] = star
556 556 else:
557 557 data[k] = vals[k]
558 558 result = func(repo, proto, *[data[k] for k in keys])
559 559 else:
560 560 result = func(repo, proto)
561 561 if isinstance(result, ooberror):
562 562 return result
563 563 res.append(escapearg(result))
564 564 return ';'.join(res)
565 565
566 566 @wireprotocommand('between', 'pairs')
567 567 def between(repo, proto, pairs):
568 568 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
569 569 r = []
570 570 for b in repo.between(pairs):
571 571 r.append(encodelist(b) + "\n")
572 572 return "".join(r)
573 573
574 574 @wireprotocommand('branchmap')
575 575 def branchmap(repo, proto):
576 576 branchmap = repo.branchmap()
577 577 heads = []
578 578 for branch, nodes in branchmap.iteritems():
579 579 branchname = urllib.quote(encoding.fromlocal(branch))
580 580 branchnodes = encodelist(nodes)
581 581 heads.append('%s %s' % (branchname, branchnodes))
582 582 return '\n'.join(heads)
583 583
584 584 @wireprotocommand('branches', 'nodes')
585 585 def branches(repo, proto, nodes):
586 586 nodes = decodelist(nodes)
587 587 r = []
588 588 for b in repo.branches(nodes):
589 589 r.append(encodelist(b) + "\n")
590 590 return "".join(r)
591 591
592 592
593 593 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
594 594 'known', 'getbundle', 'unbundlehash', 'batch']
595 595
596 596 def _capabilities(repo, proto):
597 597 """return a list of capabilities for a repo
598 598
599 599 This function exists to allow extensions to easily wrap capabilities
600 600 computation
601 601
602 602 - returns a lists: easy to alter
603 603 - change done here will be propagated to both `capabilities` and `hello`
604 604 command without any other action needed.
605 605 """
606 606 # copy to prevent modification of the global list
607 607 caps = list(wireprotocaps)
608 608 if _allowstream(repo.ui):
609 609 if repo.ui.configbool('server', 'preferuncompressed', False):
610 610 caps.append('stream-preferred')
611 611 requiredformats = repo.requirements & repo.supportedformats
612 612 # if our local revlogs are just revlogv1, add 'stream' cap
613 613 if not requiredformats - set(('revlogv1',)):
614 614 caps.append('stream')
615 615 # otherwise, add 'streamreqs' detailing our local revlog format
616 616 else:
617 617 caps.append('streamreqs=%s' % ','.join(requiredformats))
618 618 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
619 619 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
620 620 caps.append('bundle2=' + urllib.quote(capsblob))
621 621 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
622 622 caps.append('httpheader=1024')
623 623 return caps
624 624
625 625 # If you are writing an extension and consider wrapping this function. Wrap
626 626 # `_capabilities` instead.
627 627 @wireprotocommand('capabilities')
628 628 def capabilities(repo, proto):
629 629 return ' '.join(_capabilities(repo, proto))
630 630
631 631 @wireprotocommand('changegroup', 'roots')
632 632 def changegroup(repo, proto, roots):
633 633 nodes = decodelist(roots)
634 634 cg = changegroupmod.changegroup(repo, nodes, 'serve')
635 635 return streamres(proto.groupchunks(cg))
636 636
637 637 @wireprotocommand('changegroupsubset', 'bases heads')
638 638 def changegroupsubset(repo, proto, bases, heads):
639 639 bases = decodelist(bases)
640 640 heads = decodelist(heads)
641 641 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
642 642 return streamres(proto.groupchunks(cg))
643 643
644 644 @wireprotocommand('debugwireargs', 'one two *')
645 645 def debugwireargs(repo, proto, one, two, others):
646 646 # only accept optional args from the known set
647 647 opts = options('debugwireargs', ['three', 'four'], others)
648 648 return repo.debugwireargs(one, two, **opts)
649 649
650 650 # List of options accepted by getbundle.
651 651 #
652 652 # Meant to be extended by extensions. It is the extension's responsibility to
653 653 # ensure such options are properly processed in exchange.getbundle.
654 654 gboptslist = ['heads', 'common', 'bundlecaps']
655 655
656 656 @wireprotocommand('getbundle', '*')
657 657 def getbundle(repo, proto, others):
658 658 opts = options('getbundle', gboptsmap.keys(), others)
659 659 for k, v in opts.iteritems():
660 660 keytype = gboptsmap[k]
661 661 if keytype == 'nodes':
662 662 opts[k] = decodelist(v)
663 663 elif keytype == 'csv':
664 664 opts[k] = set(v.split(','))
665 665 elif keytype == 'boolean':
666 666 opts[k] = bool(v)
667 667 elif keytype != 'plain':
668 668 raise KeyError('unknown getbundle option type %s'
669 669 % keytype)
670 670 cg = exchange.getbundle(repo, 'serve', **opts)
671 671 return streamres(proto.groupchunks(cg))
672 672
673 673 @wireprotocommand('heads')
674 674 def heads(repo, proto):
675 675 h = repo.heads()
676 676 return encodelist(h) + "\n"
677 677
678 678 @wireprotocommand('hello')
679 679 def hello(repo, proto):
680 680 '''the hello command returns a set of lines describing various
681 681 interesting things about the server, in an RFC822-like format.
682 682 Currently the only one defined is "capabilities", which
683 683 consists of a line in the form:
684 684
685 685 capabilities: space separated list of tokens
686 686 '''
687 687 return "capabilities: %s\n" % (capabilities(repo, proto))
688 688
689 689 @wireprotocommand('listkeys', 'namespace')
690 690 def listkeys(repo, proto, namespace):
691 691 d = repo.listkeys(encoding.tolocal(namespace)).items()
692 692 return pushkeymod.encodekeys(d)
693 693
694 694 @wireprotocommand('lookup', 'key')
695 695 def lookup(repo, proto, key):
696 696 try:
697 697 k = encoding.tolocal(key)
698 698 c = repo[k]
699 699 r = c.hex()
700 700 success = 1
701 701 except Exception, inst:
702 702 r = str(inst)
703 703 success = 0
704 704 return "%s %s\n" % (success, r)
705 705
706 706 @wireprotocommand('known', 'nodes *')
707 707 def known(repo, proto, nodes, others):
708 708 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
709 709
710 710 @wireprotocommand('pushkey', 'namespace key old new')
711 711 def pushkey(repo, proto, namespace, key, old, new):
712 712 # compatibility with pre-1.8 clients which were accidentally
713 713 # sending raw binary nodes rather than utf-8-encoded hex
714 714 if len(new) == 20 and new.encode('string-escape') != new:
715 715 # looks like it could be a binary node
716 716 try:
717 717 new.decode('utf-8')
718 718 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
719 719 except UnicodeDecodeError:
720 720 pass # binary, leave unmodified
721 721 else:
722 722 new = encoding.tolocal(new) # normal path
723 723
724 724 if util.safehasattr(proto, 'restore'):
725 725
726 726 proto.redirect()
727 727
728 728 try:
729 729 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 730 encoding.tolocal(old), new) or False
731 731 except util.Abort:
732 732 r = False
733 733
734 734 output = proto.restore()
735 735
736 736 return '%s\n%s' % (int(r), output)
737 737
738 738 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
739 739 encoding.tolocal(old), new)
740 740 return '%s\n' % int(r)
741 741
742 742 def _allowstream(ui):
743 743 return ui.configbool('server', 'uncompressed', True, untrusted=True)
744 744
745 745 def _walkstreamfiles(repo):
746 746 # this is it's own function so extensions can override it
747 747 return repo.store.walk()
748 748
749 749 @wireprotocommand('stream_out')
750 750 def stream(repo, proto):
751 751 '''If the server supports streaming clone, it advertises the "stream"
752 752 capability with a value representing the version and flags of the repo
753 753 it is serving. Client checks to see if it understands the format.
754 754
755 755 The format is simple: the server writes out a line with the amount
756 756 of files, then the total amount of bytes to be transferred (separated
757 757 by a space). Then, for each file, the server first writes the filename
758 758 and file size (separated by the null character), then the file contents.
759 759 '''
760 760
761 761 if not _allowstream(repo.ui):
762 762 return '1\n'
763 763
764 764 entries = []
765 765 total_bytes = 0
766 766 try:
767 767 # get consistent snapshot of repo, lock during scan
768 768 lock = repo.lock()
769 769 try:
770 770 repo.ui.debug('scanning\n')
771 771 for name, ename, size in _walkstreamfiles(repo):
772 772 if size:
773 773 entries.append((name, size))
774 774 total_bytes += size
775 775 finally:
776 776 lock.release()
777 777 except error.LockError:
778 778 return '2\n' # error: 2
779 779
780 780 def streamer(repo, entries, total):
781 781 '''stream out all metadata files in repository.'''
782 782 yield '0\n' # success
783 783 repo.ui.debug('%d files, %d bytes to transfer\n' %
784 784 (len(entries), total_bytes))
785 785 yield '%d %d\n' % (len(entries), total_bytes)
786 786
787 787 sopener = repo.svfs
788 788 oldaudit = sopener.mustaudit
789 789 debugflag = repo.ui.debugflag
790 790 sopener.mustaudit = False
791 791
792 792 try:
793 793 for name, size in entries:
794 794 if debugflag:
795 795 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
796 796 # partially encode name over the wire for backwards compat
797 797 yield '%s\0%d\n' % (store.encodedir(name), size)
798 798 if size <= 65536:
799 799 fp = sopener(name)
800 800 try:
801 801 data = fp.read(size)
802 802 finally:
803 803 fp.close()
804 804 yield data
805 805 else:
806 806 for chunk in util.filechunkiter(sopener(name), limit=size):
807 807 yield chunk
808 808 # replace with "finally:" when support for python 2.4 has been dropped
809 809 except Exception:
810 810 sopener.mustaudit = oldaudit
811 811 raise
812 812 sopener.mustaudit = oldaudit
813 813
814 814 return streamres(streamer(repo, entries, total_bytes))
815 815
816 816 @wireprotocommand('unbundle', 'heads')
817 817 def unbundle(repo, proto, heads):
818 818 their_heads = decodelist(heads)
819 819
820 820 try:
821 821 proto.redirect()
822 822
823 823 exchange.check_heads(repo, their_heads, 'preparing changes')
824 824
825 825 # write bundle data to temporary file because it can be big
826 826 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
827 827 fp = os.fdopen(fd, 'wb+')
828 828 r = 0
829 829 try:
830 830 proto.getfile(fp)
831 831 fp.seek(0)
832 832 gen = exchange.readbundle(repo.ui, fp, None)
833 833 r = exchange.unbundle(repo, gen, their_heads, 'serve',
834 834 proto._client())
835 835 if util.safehasattr(r, 'addpart'):
836 836 # The return looks streamable, we are in the bundle2 case and
837 837 # should return a stream.
838 838 return streamres(r.getchunks())
839 839 return pushres(r)
840 840
841 841 finally:
842 842 fp.close()
843 843 os.unlink(tempname)
844 except error.BundleValueError, exc:
845 bundler = bundle2.bundle20(repo.ui)
844
845 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
846 # handle non-bundle2 case first
847 if not getattr(exc, 'duringunbundle2', False):
848 try:
849 raise
850 except util.Abort:
851 # The old code we moved used sys.stderr directly.
852 # We did not change it to minimise code change.
853 # This need to be moved to something proper.
854 # Feel free to do it.
855 sys.stderr.write("abort: %s\n" % exc)
856 return pushres(0)
857 except error.PushRaced:
858 return pusherr(str(exc))
859
860 bundler = bundle2.bundle20(repo.ui)
861 try:
862 raise
863 except error.BundleValueError, exc:
846 864 errpart = bundler.newpart('error:unsupportedcontent')
847 865 if exc.parttype is not None:
848 866 errpart.addparam('parttype', exc.parttype)
849 867 if exc.params:
850 868 errpart.addparam('params', '\0'.join(exc.params))
851 return streamres(bundler.getchunks())
852 except util.Abort, inst:
853 # The old code we moved used sys.stderr directly.
854 # We did not change it to minimise code change.
855 # This need to be moved to something proper.
856 # Feel free to do it.
857 if getattr(inst, 'duringunbundle2', False):
858 bundler = bundle2.bundle20(repo.ui)
859 manargs = [('message', str(inst))]
869 except util.Abort, exc:
870 manargs = [('message', str(exc))]
860 871 advargs = []
861 if inst.hint is not None:
862 advargs.append(('hint', inst.hint))
872 if exc.hint is not None:
873 advargs.append(('hint', exc.hint))
863 874 bundler.addpart(bundle2.bundlepart('error:abort',
864 875 manargs, advargs))
865 return streamres(bundler.getchunks())
866 else:
867 sys.stderr.write("abort: %s\n" % inst)
868 return pushres(0)
869 except error.PushRaced, exc:
870 if getattr(exc, 'duringunbundle2', False):
871 bundler = bundle2.bundle20(repo.ui)
876 except error.PushRaced, exc:
872 877 bundler.newpart('error:pushraced', [('message', str(exc))])
873 return streamres(bundler.getchunks())
874 else:
875 return pusherr(str(exc))
878 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now