##// END OF EJS Templates
wireproto: expose the list of getbundle arguments to extensions...
Pierre-Yves David -
r21615:3cb2da25 stable
parent child Browse files
Show More
@@ -1,837 +1,843 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
329 329 **kwargs):
330 330 self.requirecap('getbundle', _('look up remote changes'))
331 331 opts = {}
332 332 if heads is not None:
333 333 opts['heads'] = encodelist(heads)
334 334 if common is not None:
335 335 opts['common'] = encodelist(common)
336 336 if bundlecaps is not None:
337 337 opts['bundlecaps'] = ','.join(bundlecaps)
338 338 opts.update(kwargs)
339 339 f = self._callcompressable("getbundle", **opts)
340 340 if bundlecaps is not None and 'HG2X' in bundlecaps:
341 341 return bundle2.unbundle20(self.ui, f)
342 342 else:
343 343 return changegroupmod.unbundle10(f, 'UN')
344 344
345 345 def unbundle(self, cg, heads, source):
346 346 '''Send cg (a readable file-like object representing the
347 347 changegroup to push, typically a chunkbuffer object) to the
348 348 remote server as a bundle.
349 349
350 350 When pushing a bundle10 stream, return an integer indicating the
351 351 result of the push (see localrepository.addchangegroup()).
352 352
353 353 When pushing a bundle20 stream, return a bundle20 stream.'''
354 354
355 355 if heads != ['force'] and self.capable('unbundlehash'):
356 356 heads = encodelist(['hashed',
357 357 util.sha1(''.join(sorted(heads))).digest()])
358 358 else:
359 359 heads = encodelist(heads)
360 360
361 361 if util.safehasattr(cg, 'deltaheader'):
362 362 # this a bundle10, do the old style call sequence
363 363 ret, output = self._callpush("unbundle", cg, heads=heads)
364 364 if ret == "":
365 365 raise error.ResponseError(
366 366 _('push failed:'), output)
367 367 try:
368 368 ret = int(ret)
369 369 except ValueError:
370 370 raise error.ResponseError(
371 371 _('push failed (unexpected response):'), ret)
372 372
373 373 for l in output.splitlines(True):
374 374 self.ui.status(_('remote: '), l)
375 375 else:
376 376 # bundle2 push. Send a stream, fetch a stream.
377 377 stream = self._calltwowaystream('unbundle', cg, heads=heads)
378 378 ret = bundle2.unbundle20(self.ui, stream)
379 379 return ret
380 380
381 381 def debugwireargs(self, one, two, three=None, four=None, five=None):
382 382 # don't pass optional arguments left at their default value
383 383 opts = {}
384 384 if three is not None:
385 385 opts['three'] = three
386 386 if four is not None:
387 387 opts['four'] = four
388 388 return self._call('debugwireargs', one=one, two=two, **opts)
389 389
390 390 def _call(self, cmd, **args):
391 391 """execute <cmd> on the server
392 392
393 393 The command is expected to return a simple string.
394 394
395 395 returns the server reply as a string."""
396 396 raise NotImplementedError()
397 397
398 398 def _callstream(self, cmd, **args):
399 399 """execute <cmd> on the server
400 400
401 401 The command is expected to return a stream.
402 402
403 403 returns the server reply as a file like object."""
404 404 raise NotImplementedError()
405 405
406 406 def _callcompressable(self, cmd, **args):
407 407 """execute <cmd> on the server
408 408
409 409 The command is expected to return a stream.
410 410
411 411 The stream may have been compressed in some implementations. This
412 412 function takes care of the decompression. This is the only difference
413 413 with _callstream.
414 414
415 415 returns the server reply as a file like object.
416 416 """
417 417 raise NotImplementedError()
418 418
419 419 def _callpush(self, cmd, fp, **args):
420 420 """execute a <cmd> on server
421 421
422 422 The command is expected to be related to a push. Push has a special
423 423 return method.
424 424
425 425 returns the server reply as a (ret, output) tuple. ret is either
426 426 empty (error) or a stringified int.
427 427 """
428 428 raise NotImplementedError()
429 429
430 430 def _calltwowaystream(self, cmd, fp, **args):
431 431 """execute <cmd> on server
432 432
433 433 The command will send a stream to the server and get a stream in reply.
434 434 """
435 435 raise NotImplementedError()
436 436
437 437 def _abort(self, exception):
438 438 """clearly abort the wire protocol connection and raise the exception
439 439 """
440 440 raise NotImplementedError()
441 441
442 442 # server side
443 443
444 444 # wire protocol command can either return a string or one of these classes.
445 445 class streamres(object):
446 446 """wireproto reply: binary stream
447 447
448 448 The call was successful and the result is a stream.
449 449 Iterate on the `self.gen` attribute to retrieve chunks.
450 450 """
451 451 def __init__(self, gen):
452 452 self.gen = gen
453 453
454 454 class pushres(object):
455 455 """wireproto reply: success with simple integer return
456 456
457 457 The call was successful and returned an integer contained in `self.res`.
458 458 """
459 459 def __init__(self, res):
460 460 self.res = res
461 461
462 462 class pusherr(object):
463 463 """wireproto reply: failure
464 464
465 465 The call failed. The `self.res` attribute contains the error message.
466 466 """
467 467 def __init__(self, res):
468 468 self.res = res
469 469
470 470 class ooberror(object):
471 471 """wireproto reply: failure of a batch of operation
472 472
473 473 Something failed during a batch call. The error message is stored in
474 474 `self.message`.
475 475 """
476 476 def __init__(self, message):
477 477 self.message = message
478 478
479 479 def dispatch(repo, proto, command):
480 480 repo = repo.filtered("served")
481 481 func, spec = commands[command]
482 482 args = proto.getargs(spec)
483 483 return func(repo, proto, *args)
484 484
485 485 def options(cmd, keys, others):
486 486 opts = {}
487 487 for k in keys:
488 488 if k in others:
489 489 opts[k] = others[k]
490 490 del others[k]
491 491 if others:
492 492 sys.stderr.write("abort: %s got unexpected arguments %s\n"
493 493 % (cmd, ",".join(others)))
494 494 return opts
495 495
496 496 # list of commands
497 497 commands = {}
498 498
499 499 def wireprotocommand(name, args=''):
500 500 """decorator for wire protocol command"""
501 501 def register(func):
502 502 commands[name] = (func, args)
503 503 return func
504 504 return register
505 505
506 506 @wireprotocommand('batch', 'cmds *')
507 507 def batch(repo, proto, cmds, others):
508 508 repo = repo.filtered("served")
509 509 res = []
510 510 for pair in cmds.split(';'):
511 511 op, args = pair.split(' ', 1)
512 512 vals = {}
513 513 for a in args.split(','):
514 514 if a:
515 515 n, v = a.split('=')
516 516 vals[n] = unescapearg(v)
517 517 func, spec = commands[op]
518 518 if spec:
519 519 keys = spec.split()
520 520 data = {}
521 521 for k in keys:
522 522 if k == '*':
523 523 star = {}
524 524 for key in vals.keys():
525 525 if key not in keys:
526 526 star[key] = vals[key]
527 527 data['*'] = star
528 528 else:
529 529 data[k] = vals[k]
530 530 result = func(repo, proto, *[data[k] for k in keys])
531 531 else:
532 532 result = func(repo, proto)
533 533 if isinstance(result, ooberror):
534 534 return result
535 535 res.append(escapearg(result))
536 536 return ';'.join(res)
537 537
538 538 @wireprotocommand('between', 'pairs')
539 539 def between(repo, proto, pairs):
540 540 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
541 541 r = []
542 542 for b in repo.between(pairs):
543 543 r.append(encodelist(b) + "\n")
544 544 return "".join(r)
545 545
546 546 @wireprotocommand('branchmap')
547 547 def branchmap(repo, proto):
548 548 branchmap = repo.branchmap()
549 549 heads = []
550 550 for branch, nodes in branchmap.iteritems():
551 551 branchname = urllib.quote(encoding.fromlocal(branch))
552 552 branchnodes = encodelist(nodes)
553 553 heads.append('%s %s' % (branchname, branchnodes))
554 554 return '\n'.join(heads)
555 555
556 556 @wireprotocommand('branches', 'nodes')
557 557 def branches(repo, proto, nodes):
558 558 nodes = decodelist(nodes)
559 559 r = []
560 560 for b in repo.branches(nodes):
561 561 r.append(encodelist(b) + "\n")
562 562 return "".join(r)
563 563
564 564
565 565 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
566 566 'known', 'getbundle', 'unbundlehash', 'batch']
567 567
568 568 def _capabilities(repo, proto):
569 569 """return a list of capabilities for a repo
570 570
571 571 This function exists to allow extensions to easily wrap capabilities
572 572 computation
573 573
574 574 - returns a lists: easy to alter
575 575 - change done here will be propagated to both `capabilities` and `hello`
576 576 command without any other action needed.
577 577 """
578 578 # copy to prevent modification of the global list
579 579 caps = list(wireprotocaps)
580 580 if _allowstream(repo.ui):
581 581 if repo.ui.configbool('server', 'preferuncompressed', False):
582 582 caps.append('stream-preferred')
583 583 requiredformats = repo.requirements & repo.supportedformats
584 584 # if our local revlogs are just revlogv1, add 'stream' cap
585 585 if not requiredformats - set(('revlogv1',)):
586 586 caps.append('stream')
587 587 # otherwise, add 'streamreqs' detailing our local revlog format
588 588 else:
589 589 caps.append('streamreqs=%s' % ','.join(requiredformats))
590 590 if repo.ui.configbool('experimental', 'bundle2-exp', False):
591 591 capsblob = bundle2.encodecaps(repo.bundle2caps)
592 592 caps.append('bundle2-exp=' + urllib.quote(capsblob))
593 593 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
594 594 caps.append('httpheader=1024')
595 595 return caps
596 596
597 597 # If you are writing an extension and consider wrapping this function. Wrap
598 598 # `_capabilities` instead.
599 599 @wireprotocommand('capabilities')
600 600 def capabilities(repo, proto):
601 601 return ' '.join(_capabilities(repo, proto))
602 602
603 603 @wireprotocommand('changegroup', 'roots')
604 604 def changegroup(repo, proto, roots):
605 605 nodes = decodelist(roots)
606 606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
607 607 return streamres(proto.groupchunks(cg))
608 608
609 609 @wireprotocommand('changegroupsubset', 'bases heads')
610 610 def changegroupsubset(repo, proto, bases, heads):
611 611 bases = decodelist(bases)
612 612 heads = decodelist(heads)
613 613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
614 614 return streamres(proto.groupchunks(cg))
615 615
616 616 @wireprotocommand('debugwireargs', 'one two *')
617 617 def debugwireargs(repo, proto, one, two, others):
618 618 # only accept optional args from the known set
619 619 opts = options('debugwireargs', ['three', 'four'], others)
620 620 return repo.debugwireargs(one, two, **opts)
621 621
622 # List of options accepted by getbundle.
623 #
624 # Meant to be extended by extensions. It is the extension's responsibility to
625 # ensure such options are properly processed in exchange.getbundle.
626 gboptslist = ['heads', 'common', 'bundlecaps']
627
622 628 @wireprotocommand('getbundle', '*')
623 629 def getbundle(repo, proto, others):
624 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
630 opts = options('getbundle', gboptslist, others)
625 631 for k, v in opts.iteritems():
626 632 if k in ('heads', 'common'):
627 633 opts[k] = decodelist(v)
628 634 elif k == 'bundlecaps':
629 635 opts[k] = set(v.split(','))
630 636 cg = exchange.getbundle(repo, 'serve', **opts)
631 637 return streamres(proto.groupchunks(cg))
632 638
633 639 @wireprotocommand('heads')
634 640 def heads(repo, proto):
635 641 h = repo.heads()
636 642 return encodelist(h) + "\n"
637 643
638 644 @wireprotocommand('hello')
639 645 def hello(repo, proto):
640 646 '''the hello command returns a set of lines describing various
641 647 interesting things about the server, in an RFC822-like format.
642 648 Currently the only one defined is "capabilities", which
643 649 consists of a line in the form:
644 650
645 651 capabilities: space separated list of tokens
646 652 '''
647 653 return "capabilities: %s\n" % (capabilities(repo, proto))
648 654
649 655 @wireprotocommand('listkeys', 'namespace')
650 656 def listkeys(repo, proto, namespace):
651 657 d = repo.listkeys(encoding.tolocal(namespace)).items()
652 658 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
653 659 for k, v in d])
654 660 return t
655 661
656 662 @wireprotocommand('lookup', 'key')
657 663 def lookup(repo, proto, key):
658 664 try:
659 665 k = encoding.tolocal(key)
660 666 c = repo[k]
661 667 r = c.hex()
662 668 success = 1
663 669 except Exception, inst:
664 670 r = str(inst)
665 671 success = 0
666 672 return "%s %s\n" % (success, r)
667 673
668 674 @wireprotocommand('known', 'nodes *')
669 675 def known(repo, proto, nodes, others):
670 676 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671 677
672 678 @wireprotocommand('pushkey', 'namespace key old new')
673 679 def pushkey(repo, proto, namespace, key, old, new):
674 680 # compatibility with pre-1.8 clients which were accidentally
675 681 # sending raw binary nodes rather than utf-8-encoded hex
676 682 if len(new) == 20 and new.encode('string-escape') != new:
677 683 # looks like it could be a binary node
678 684 try:
679 685 new.decode('utf-8')
680 686 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 687 except UnicodeDecodeError:
682 688 pass # binary, leave unmodified
683 689 else:
684 690 new = encoding.tolocal(new) # normal path
685 691
686 692 if util.safehasattr(proto, 'restore'):
687 693
688 694 proto.redirect()
689 695
690 696 try:
691 697 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 698 encoding.tolocal(old), new) or False
693 699 except util.Abort:
694 700 r = False
695 701
696 702 output = proto.restore()
697 703
698 704 return '%s\n%s' % (int(r), output)
699 705
700 706 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 707 encoding.tolocal(old), new)
702 708 return '%s\n' % int(r)
703 709
704 710 def _allowstream(ui):
705 711 return ui.configbool('server', 'uncompressed', True, untrusted=True)
706 712
707 713 def _walkstreamfiles(repo):
708 714 # this is it's own function so extensions can override it
709 715 return repo.store.walk()
710 716
711 717 @wireprotocommand('stream_out')
712 718 def stream(repo, proto):
713 719 '''If the server supports streaming clone, it advertises the "stream"
714 720 capability with a value representing the version and flags of the repo
715 721 it is serving. Client checks to see if it understands the format.
716 722
717 723 The format is simple: the server writes out a line with the amount
718 724 of files, then the total amount of bytes to be transferred (separated
719 725 by a space). Then, for each file, the server first writes the filename
720 726 and file size (separated by the null character), then the file contents.
721 727 '''
722 728
723 729 if not _allowstream(repo.ui):
724 730 return '1\n'
725 731
726 732 entries = []
727 733 total_bytes = 0
728 734 try:
729 735 # get consistent snapshot of repo, lock during scan
730 736 lock = repo.lock()
731 737 try:
732 738 repo.ui.debug('scanning\n')
733 739 for name, ename, size in _walkstreamfiles(repo):
734 740 if size:
735 741 entries.append((name, size))
736 742 total_bytes += size
737 743 finally:
738 744 lock.release()
739 745 except error.LockError:
740 746 return '2\n' # error: 2
741 747
742 748 def streamer(repo, entries, total):
743 749 '''stream out all metadata files in repository.'''
744 750 yield '0\n' # success
745 751 repo.ui.debug('%d files, %d bytes to transfer\n' %
746 752 (len(entries), total_bytes))
747 753 yield '%d %d\n' % (len(entries), total_bytes)
748 754
749 755 sopener = repo.sopener
750 756 oldaudit = sopener.mustaudit
751 757 debugflag = repo.ui.debugflag
752 758 sopener.mustaudit = False
753 759
754 760 try:
755 761 for name, size in entries:
756 762 if debugflag:
757 763 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
758 764 # partially encode name over the wire for backwards compat
759 765 yield '%s\0%d\n' % (store.encodedir(name), size)
760 766 if size <= 65536:
761 767 fp = sopener(name)
762 768 try:
763 769 data = fp.read(size)
764 770 finally:
765 771 fp.close()
766 772 yield data
767 773 else:
768 774 for chunk in util.filechunkiter(sopener(name), limit=size):
769 775 yield chunk
770 776 # replace with "finally:" when support for python 2.4 has been dropped
771 777 except Exception:
772 778 sopener.mustaudit = oldaudit
773 779 raise
774 780 sopener.mustaudit = oldaudit
775 781
776 782 return streamres(streamer(repo, entries, total_bytes))
777 783
778 784 @wireprotocommand('unbundle', 'heads')
779 785 def unbundle(repo, proto, heads):
780 786 their_heads = decodelist(heads)
781 787
782 788 try:
783 789 proto.redirect()
784 790
785 791 exchange.check_heads(repo, their_heads, 'preparing changes')
786 792
787 793 # write bundle data to temporary file because it can be big
788 794 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
789 795 fp = os.fdopen(fd, 'wb+')
790 796 r = 0
791 797 try:
792 798 proto.getfile(fp)
793 799 fp.seek(0)
794 800 gen = exchange.readbundle(repo.ui, fp, None)
795 801 r = exchange.unbundle(repo, gen, their_heads, 'serve',
796 802 proto._client())
797 803 if util.safehasattr(r, 'addpart'):
798 804 # The return looks streameable, we are in the bundle2 case and
799 805 # should return a stream.
800 806 return streamres(r.getchunks())
801 807 return pushres(r)
802 808
803 809 finally:
804 810 fp.close()
805 811 os.unlink(tempname)
806 812 except bundle2.UnknownPartError, exc:
807 813 bundler = bundle2.bundle20(repo.ui)
808 814 part = bundle2.bundlepart('B2X:ERROR:UNKNOWNPART',
809 815 [('parttype', str(exc))])
810 816 bundler.addpart(part)
811 817 return streamres(bundler.getchunks())
812 818 except util.Abort, inst:
813 819 # The old code we moved used sys.stderr directly.
814 820 # We did not change it to minimise code change.
815 821 # This need to be moved to something proper.
816 822 # Feel free to do it.
817 823 if getattr(inst, 'duringunbundle2', False):
818 824 bundler = bundle2.bundle20(repo.ui)
819 825 manargs = [('message', str(inst))]
820 826 advargs = []
821 827 if inst.hint is not None:
822 828 advargs.append(('hint', inst.hint))
823 829 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
824 830 manargs, advargs))
825 831 return streamres(bundler.getchunks())
826 832 else:
827 833 sys.stderr.write("abort: %s\n" % inst)
828 834 return pushres(0)
829 835 except error.PushRaced, exc:
830 836 if getattr(exc, 'duringunbundle2', False):
831 837 bundler = bundle2.bundle20(repo.ui)
832 838 part = bundle2.bundlepart('B2X:ERROR:PUSHRACED',
833 839 [('message', str(exc))])
834 840 bundler.addpart(part)
835 841 return streamres(bundler.getchunks())
836 842 else:
837 843 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now