##// END OF EJS Templates
wireproto: add a _calltwowaystream method on wirepeer...
Pierre-Yves David -
r21072:0879352d default
parent child Browse files
Show More
@@ -1,790 +1,797 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2
12 12 import peer, error, encoding, util, store, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 337 f = self._callcompressable("getbundle", **opts)
338 338 if bundlecaps is not None and 'HG20' in bundlecaps:
339 339 return bundle2.unbundle20(self.ui, f)
340 340 else:
341 341 return changegroupmod.unbundle10(f, 'UN')
342 342
343 343 def unbundle(self, cg, heads, source):
344 344 '''Send cg (a readable file-like object representing the
345 345 changegroup to push, typically a chunkbuffer object) to the
346 346 remote server as a bundle. Return an integer indicating the
347 347 result of the push (see localrepository.addchangegroup()).'''
348 348
349 349 if heads != ['force'] and self.capable('unbundlehash'):
350 350 heads = encodelist(['hashed',
351 351 util.sha1(''.join(sorted(heads))).digest()])
352 352 else:
353 353 heads = encodelist(heads)
354 354
355 355 ret, output = self._callpush("unbundle", cg, heads=heads)
356 356 if ret == "":
357 357 raise error.ResponseError(
358 358 _('push failed:'), output)
359 359 try:
360 360 ret = int(ret)
361 361 except ValueError:
362 362 raise error.ResponseError(
363 363 _('push failed (unexpected response):'), ret)
364 364
365 365 for l in output.splitlines(True):
366 366 self.ui.status(_('remote: '), l)
367 367 return ret
368 368
369 369 def debugwireargs(self, one, two, three=None, four=None, five=None):
370 370 # don't pass optional arguments left at their default value
371 371 opts = {}
372 372 if three is not None:
373 373 opts['three'] = three
374 374 if four is not None:
375 375 opts['four'] = four
376 376 return self._call('debugwireargs', one=one, two=two, **opts)
377 377
378 378 def _call(self, cmd, **args):
379 379 """execute <cmd> on the server
380 380
381 381 The command is expected to return a simple string.
382 382
383 383 returns the server reply as a string."""
384 384 raise NotImplementedError()
385 385
386 386 def _callstream(self, cmd, **args):
387 387 """execute <cmd> on the server
388 388
389 389 The command is expected to return a stream.
390 390
391 391 returns the server reply as a file like object."""
392 392 raise NotImplementedError()
393 393
394 394 def _callcompressable(self, cmd, **args):
395 395 """execute <cmd> on the server
396 396
397 397 The command is expected to return a stream.
398 398
399 399 The stream may have been compressed in some implementations. This
400 400 function takes care of the decompression. This is the only difference
401 401 with _callstream.
402 402
403 403 returns the server reply as a file like object.
404 404 """
405 405 raise NotImplementedError()
406 406
407 407 def _callpush(self, cmd, fp, **args):
408 408 """execute a <cmd> on server
409 409
410 410 The command is expected to be related to a push. Push has a special
411 411 return method.
412 412
413 413 returns the server reply as a (ret, output) tuple. ret is either
414 414 empty (error) or a stringified int.
415 415 """
416 416 raise NotImplementedError()
417 417
418 def _calltwowaystream(self, cmd, fp, **args):
419 """execute <cmd> on server
420
421 The command will send a stream to the server and get a stream in reply.
422 """
423 raise NotImplementedError()
424
418 425 def _abort(self, exception):
419 426 """clearly abort the wire protocol connection and raise the exception
420 427 """
421 428 raise NotImplementedError()
422 429
423 430 # server side
424 431
425 432 # wire protocol command can either return a string or one of these classes.
426 433 class streamres(object):
427 434 """wireproto reply: binary stream
428 435
429 436 The call was successful and the result is a stream.
430 437 Iterate on the `self.gen` attribute to retrieve chunks.
431 438 """
432 439 def __init__(self, gen):
433 440 self.gen = gen
434 441
435 442 class pushres(object):
436 443 """wireproto reply: success with simple integer return
437 444
438 445 The call was successful and returned an integer contained in `self.res`.
439 446 """
440 447 def __init__(self, res):
441 448 self.res = res
442 449
443 450 class pusherr(object):
444 451 """wireproto reply: failure
445 452
446 453 The call failed. The `self.res` attribute contains the error message.
447 454 """
448 455 def __init__(self, res):
449 456 self.res = res
450 457
451 458 class ooberror(object):
452 459 """wireproto reply: failure of a batch of operation
453 460
454 461 Something failed during a batch call. The error message is stored in
455 462 `self.message`.
456 463 """
457 464 def __init__(self, message):
458 465 self.message = message
459 466
460 467 def dispatch(repo, proto, command):
461 468 repo = repo.filtered("served")
462 469 func, spec = commands[command]
463 470 args = proto.getargs(spec)
464 471 return func(repo, proto, *args)
465 472
466 473 def options(cmd, keys, others):
467 474 opts = {}
468 475 for k in keys:
469 476 if k in others:
470 477 opts[k] = others[k]
471 478 del others[k]
472 479 if others:
473 480 sys.stderr.write("abort: %s got unexpected arguments %s\n"
474 481 % (cmd, ",".join(others)))
475 482 return opts
476 483
477 484 # list of commands
478 485 commands = {}
479 486
480 487 def wireprotocommand(name, args=''):
481 488 """decorator for wire protocol command"""
482 489 def register(func):
483 490 commands[name] = (func, args)
484 491 return func
485 492 return register
486 493
487 494 @wireprotocommand('batch', 'cmds *')
488 495 def batch(repo, proto, cmds, others):
489 496 repo = repo.filtered("served")
490 497 res = []
491 498 for pair in cmds.split(';'):
492 499 op, args = pair.split(' ', 1)
493 500 vals = {}
494 501 for a in args.split(','):
495 502 if a:
496 503 n, v = a.split('=')
497 504 vals[n] = unescapearg(v)
498 505 func, spec = commands[op]
499 506 if spec:
500 507 keys = spec.split()
501 508 data = {}
502 509 for k in keys:
503 510 if k == '*':
504 511 star = {}
505 512 for key in vals.keys():
506 513 if key not in keys:
507 514 star[key] = vals[key]
508 515 data['*'] = star
509 516 else:
510 517 data[k] = vals[k]
511 518 result = func(repo, proto, *[data[k] for k in keys])
512 519 else:
513 520 result = func(repo, proto)
514 521 if isinstance(result, ooberror):
515 522 return result
516 523 res.append(escapearg(result))
517 524 return ';'.join(res)
518 525
519 526 @wireprotocommand('between', 'pairs')
520 527 def between(repo, proto, pairs):
521 528 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
522 529 r = []
523 530 for b in repo.between(pairs):
524 531 r.append(encodelist(b) + "\n")
525 532 return "".join(r)
526 533
527 534 @wireprotocommand('branchmap')
528 535 def branchmap(repo, proto):
529 536 branchmap = repo.branchmap()
530 537 heads = []
531 538 for branch, nodes in branchmap.iteritems():
532 539 branchname = urllib.quote(encoding.fromlocal(branch))
533 540 branchnodes = encodelist(nodes)
534 541 heads.append('%s %s' % (branchname, branchnodes))
535 542 return '\n'.join(heads)
536 543
537 544 @wireprotocommand('branches', 'nodes')
538 545 def branches(repo, proto, nodes):
539 546 nodes = decodelist(nodes)
540 547 r = []
541 548 for b in repo.branches(nodes):
542 549 r.append(encodelist(b) + "\n")
543 550 return "".join(r)
544 551
545 552
546 553 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
547 554 'known', 'getbundle', 'unbundlehash', 'batch']
548 555
549 556 def _capabilities(repo, proto):
550 557 """return a list of capabilities for a repo
551 558
552 559 This function exists to allow extensions to easily wrap capabilities
553 560 computation
554 561
555 562 - returns a lists: easy to alter
556 563 - change done here will be propagated to both `capabilities` and `hello`
557 564 command without any other action needed.
558 565 """
559 566 # copy to prevent modification of the global list
560 567 caps = list(wireprotocaps)
561 568 if _allowstream(repo.ui):
562 569 if repo.ui.configbool('server', 'preferuncompressed', False):
563 570 caps.append('stream-preferred')
564 571 requiredformats = repo.requirements & repo.supportedformats
565 572 # if our local revlogs are just revlogv1, add 'stream' cap
566 573 if not requiredformats - set(('revlogv1',)):
567 574 caps.append('stream')
568 575 # otherwise, add 'streamreqs' detailing our local revlog format
569 576 else:
570 577 caps.append('streamreqs=%s' % ','.join(requiredformats))
571 578 if repo.ui.configbool('server', 'bundle2', False):
572 579 caps.append('bundle2')
573 580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
574 581 caps.append('httpheader=1024')
575 582 return caps
576 583
577 584 # If you are writing an extension and consider wrapping this function. Wrap
578 585 # `_capabilities` instead.
579 586 @wireprotocommand('capabilities')
580 587 def capabilities(repo, proto):
581 588 return ' '.join(_capabilities(repo, proto))
582 589
583 590 @wireprotocommand('changegroup', 'roots')
584 591 def changegroup(repo, proto, roots):
585 592 nodes = decodelist(roots)
586 593 cg = changegroupmod.changegroup(repo, nodes, 'serve')
587 594 return streamres(proto.groupchunks(cg))
588 595
589 596 @wireprotocommand('changegroupsubset', 'bases heads')
590 597 def changegroupsubset(repo, proto, bases, heads):
591 598 bases = decodelist(bases)
592 599 heads = decodelist(heads)
593 600 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
594 601 return streamres(proto.groupchunks(cg))
595 602
596 603 @wireprotocommand('debugwireargs', 'one two *')
597 604 def debugwireargs(repo, proto, one, two, others):
598 605 # only accept optional args from the known set
599 606 opts = options('debugwireargs', ['three', 'four'], others)
600 607 return repo.debugwireargs(one, two, **opts)
601 608
602 609 @wireprotocommand('getbundle', '*')
603 610 def getbundle(repo, proto, others):
604 611 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
605 612 for k, v in opts.iteritems():
606 613 if k in ('heads', 'common'):
607 614 opts[k] = decodelist(v)
608 615 elif k == 'bundlecaps':
609 616 opts[k] = set(v.split(','))
610 617 cg = exchange.getbundle(repo, 'serve', **opts)
611 618 return streamres(proto.groupchunks(cg))
612 619
613 620 @wireprotocommand('heads')
614 621 def heads(repo, proto):
615 622 h = repo.heads()
616 623 return encodelist(h) + "\n"
617 624
618 625 @wireprotocommand('hello')
619 626 def hello(repo, proto):
620 627 '''the hello command returns a set of lines describing various
621 628 interesting things about the server, in an RFC822-like format.
622 629 Currently the only one defined is "capabilities", which
623 630 consists of a line in the form:
624 631
625 632 capabilities: space separated list of tokens
626 633 '''
627 634 return "capabilities: %s\n" % (capabilities(repo, proto))
628 635
629 636 @wireprotocommand('listkeys', 'namespace')
630 637 def listkeys(repo, proto, namespace):
631 638 d = repo.listkeys(encoding.tolocal(namespace)).items()
632 639 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
633 640 for k, v in d])
634 641 return t
635 642
636 643 @wireprotocommand('lookup', 'key')
637 644 def lookup(repo, proto, key):
638 645 try:
639 646 k = encoding.tolocal(key)
640 647 c = repo[k]
641 648 r = c.hex()
642 649 success = 1
643 650 except Exception, inst:
644 651 r = str(inst)
645 652 success = 0
646 653 return "%s %s\n" % (success, r)
647 654
648 655 @wireprotocommand('known', 'nodes *')
649 656 def known(repo, proto, nodes, others):
650 657 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
651 658
652 659 @wireprotocommand('pushkey', 'namespace key old new')
653 660 def pushkey(repo, proto, namespace, key, old, new):
654 661 # compatibility with pre-1.8 clients which were accidentally
655 662 # sending raw binary nodes rather than utf-8-encoded hex
656 663 if len(new) == 20 and new.encode('string-escape') != new:
657 664 # looks like it could be a binary node
658 665 try:
659 666 new.decode('utf-8')
660 667 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
661 668 except UnicodeDecodeError:
662 669 pass # binary, leave unmodified
663 670 else:
664 671 new = encoding.tolocal(new) # normal path
665 672
666 673 if util.safehasattr(proto, 'restore'):
667 674
668 675 proto.redirect()
669 676
670 677 try:
671 678 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
672 679 encoding.tolocal(old), new) or False
673 680 except util.Abort:
674 681 r = False
675 682
676 683 output = proto.restore()
677 684
678 685 return '%s\n%s' % (int(r), output)
679 686
680 687 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
681 688 encoding.tolocal(old), new)
682 689 return '%s\n' % int(r)
683 690
684 691 def _allowstream(ui):
685 692 return ui.configbool('server', 'uncompressed', True, untrusted=True)
686 693
687 694 def _walkstreamfiles(repo):
688 695 # this is it's own function so extensions can override it
689 696 return repo.store.walk()
690 697
691 698 @wireprotocommand('stream_out')
692 699 def stream(repo, proto):
693 700 '''If the server supports streaming clone, it advertises the "stream"
694 701 capability with a value representing the version and flags of the repo
695 702 it is serving. Client checks to see if it understands the format.
696 703
697 704 The format is simple: the server writes out a line with the amount
698 705 of files, then the total amount of bytes to be transferred (separated
699 706 by a space). Then, for each file, the server first writes the filename
700 707 and file size (separated by the null character), then the file contents.
701 708 '''
702 709
703 710 if not _allowstream(repo.ui):
704 711 return '1\n'
705 712
706 713 entries = []
707 714 total_bytes = 0
708 715 try:
709 716 # get consistent snapshot of repo, lock during scan
710 717 lock = repo.lock()
711 718 try:
712 719 repo.ui.debug('scanning\n')
713 720 for name, ename, size in _walkstreamfiles(repo):
714 721 if size:
715 722 entries.append((name, size))
716 723 total_bytes += size
717 724 finally:
718 725 lock.release()
719 726 except error.LockError:
720 727 return '2\n' # error: 2
721 728
722 729 def streamer(repo, entries, total):
723 730 '''stream out all metadata files in repository.'''
724 731 yield '0\n' # success
725 732 repo.ui.debug('%d files, %d bytes to transfer\n' %
726 733 (len(entries), total_bytes))
727 734 yield '%d %d\n' % (len(entries), total_bytes)
728 735
729 736 sopener = repo.sopener
730 737 oldaudit = sopener.mustaudit
731 738 debugflag = repo.ui.debugflag
732 739 sopener.mustaudit = False
733 740
734 741 try:
735 742 for name, size in entries:
736 743 if debugflag:
737 744 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
738 745 # partially encode name over the wire for backwards compat
739 746 yield '%s\0%d\n' % (store.encodedir(name), size)
740 747 if size <= 65536:
741 748 fp = sopener(name)
742 749 try:
743 750 data = fp.read(size)
744 751 finally:
745 752 fp.close()
746 753 yield data
747 754 else:
748 755 for chunk in util.filechunkiter(sopener(name), limit=size):
749 756 yield chunk
750 757 # replace with "finally:" when support for python 2.4 has been dropped
751 758 except Exception:
752 759 sopener.mustaudit = oldaudit
753 760 raise
754 761 sopener.mustaudit = oldaudit
755 762
756 763 return streamres(streamer(repo, entries, total_bytes))
757 764
758 765 @wireprotocommand('unbundle', 'heads')
759 766 def unbundle(repo, proto, heads):
760 767 their_heads = decodelist(heads)
761 768
762 769 try:
763 770 proto.redirect()
764 771
765 772 exchange.check_heads(repo, their_heads, 'preparing changes')
766 773
767 774 # write bundle data to temporary file because it can be big
768 775 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
769 776 fp = os.fdopen(fd, 'wb+')
770 777 r = 0
771 778 try:
772 779 proto.getfile(fp)
773 780 fp.seek(0)
774 781 gen = exchange.readbundle(repo.ui, fp, None)
775 782 r = exchange.unbundle(repo, gen, their_heads, 'serve',
776 783 proto._client())
777 784 return pushres(r)
778 785
779 786 finally:
780 787 fp.close()
781 788 os.unlink(tempname)
782 789 except util.Abort, inst:
783 790 # The old code we moved used sys.stderr directly.
784 791 # We did not change it to minimise code change.
785 792 # This need to be moved to something proper.
786 793 # Feel free to do it.
787 794 sys.stderr.write("abort: %s\n" % inst)
788 795 return pushres(0)
789 796 except exchange.PushRaced, exc:
790 797 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now