##// END OF EJS Templates
wireproto: add decorator for wire protocol command...
Pierre-Yves David -
r20906:7a634b34 default
parent child Browse files
Show More
@@ -1,791 +1,801 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 337 f = self._callcompressable("getbundle", **opts)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def unbundle(self, cg, heads, source):
341 341 '''Send cg (a readable file-like object representing the
342 342 changegroup to push, typically a chunkbuffer object) to the
343 343 remote server as a bundle. Return an integer indicating the
344 344 result of the push (see localrepository.addchangegroup()).'''
345 345
346 346 if heads != ['force'] and self.capable('unbundlehash'):
347 347 heads = encodelist(['hashed',
348 348 util.sha1(''.join(sorted(heads))).digest()])
349 349 else:
350 350 heads = encodelist(heads)
351 351
352 352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 353 if ret == "":
354 354 raise error.ResponseError(
355 355 _('push failed:'), output)
356 356 try:
357 357 ret = int(ret)
358 358 except ValueError:
359 359 raise error.ResponseError(
360 360 _('push failed (unexpected response):'), ret)
361 361
362 362 for l in output.splitlines(True):
363 363 self.ui.status(_('remote: '), l)
364 364 return ret
365 365
366 366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 367 # don't pass optional arguments left at their default value
368 368 opts = {}
369 369 if three is not None:
370 370 opts['three'] = three
371 371 if four is not None:
372 372 opts['four'] = four
373 373 return self._call('debugwireargs', one=one, two=two, **opts)
374 374
375 375 def _call(self, cmd, **args):
376 376 """execute <cmd> on the server
377 377
378 378 The command is expected to return a simple string.
379 379
380 380 returns the server reply as a string."""
381 381 raise NotImplementedError()
382 382
383 383 def _callstream(self, cmd, **args):
384 384 """execute <cmd> on the server
385 385
386 386 The command is expected to return a stream.
387 387
388 388 returns the server reply as a file like object."""
389 389 raise NotImplementedError()
390 390
391 391 def _callcompressable(self, cmd, **args):
392 392 """execute <cmd> on the server
393 393
394 394 The command is expected to return a stream.
395 395
396 396 The stream may have been compressed in some implementaitons. This
397 397 function takes care of the decompression. This is the only difference
398 398 with _callstream.
399 399
400 400 returns the server reply as a file like object.
401 401 """
402 402 raise NotImplementedError()
403 403
404 404 def _callpush(self, cmd, fp, **args):
405 405 """execute a <cmd> on server
406 406
407 407 The command is expected to be related to a push. Push has a special
408 408 return method.
409 409
410 410 returns the server reply as a (ret, output) tuple. ret is either
411 411 empty (error) or a stringified int.
412 412 """
413 413 raise NotImplementedError()
414 414
415 415 def _abort(self, exception):
416 416 """clearly abort the wire protocol connection and raise the exception
417 417 """
418 418 raise NotImplementedError()
419 419
420 420 # server side
421 421
422 422 # wire protocol command can either return a string or one of these classes.
423 423 class streamres(object):
424 424 """wireproto reply: binary stream
425 425
426 426 The call was successful and the result is a stream.
427 427 Iterate on the `self.gen` attribute to retrieve chunks.
428 428 """
429 429 def __init__(self, gen):
430 430 self.gen = gen
431 431
432 432 class pushres(object):
433 433 """wireproto reply: success with simple integer return
434 434
435 435 The call was successful and returned an integer contained in `self.res`.
436 436 """
437 437 def __init__(self, res):
438 438 self.res = res
439 439
440 440 class pusherr(object):
441 441 """wireproto reply: failure
442 442
443 443 The call failed. The `self.res` attribute contains the error message.
444 444 """
445 445 def __init__(self, res):
446 446 self.res = res
447 447
448 448 class ooberror(object):
449 449 """wireproto reply: failure of a batch of operation
450 450
451 451 Something failed during a batch call. The error message is stored in
452 452 `self.message`.
453 453 """
454 454 def __init__(self, message):
455 455 self.message = message
456 456
457 457 def dispatch(repo, proto, command):
458 458 repo = repo.filtered("served")
459 459 func, spec = commands[command]
460 460 args = proto.getargs(spec)
461 461 return func(repo, proto, *args)
462 462
463 463 def options(cmd, keys, others):
464 464 opts = {}
465 465 for k in keys:
466 466 if k in others:
467 467 opts[k] = others[k]
468 468 del others[k]
469 469 if others:
470 470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 471 % (cmd, ",".join(others)))
472 472 return opts
473 473
474 # list of commands
475 commands = {}
476
477 def wireprotocommand(name, args=''):
478 """decorator for wireprotocol command"""
479 def register(func):
480 commands[name] = (func, args)
481 return func
482 return register
483
474 484 def batch(repo, proto, cmds, others):
475 485 repo = repo.filtered("served")
476 486 res = []
477 487 for pair in cmds.split(';'):
478 488 op, args = pair.split(' ', 1)
479 489 vals = {}
480 490 for a in args.split(','):
481 491 if a:
482 492 n, v = a.split('=')
483 493 vals[n] = unescapearg(v)
484 494 func, spec = commands[op]
485 495 if spec:
486 496 keys = spec.split()
487 497 data = {}
488 498 for k in keys:
489 499 if k == '*':
490 500 star = {}
491 501 for key in vals.keys():
492 502 if key not in keys:
493 503 star[key] = vals[key]
494 504 data['*'] = star
495 505 else:
496 506 data[k] = vals[k]
497 507 result = func(repo, proto, *[data[k] for k in keys])
498 508 else:
499 509 result = func(repo, proto)
500 510 if isinstance(result, ooberror):
501 511 return result
502 512 res.append(escapearg(result))
503 513 return ';'.join(res)
504 514
505 515 def between(repo, proto, pairs):
506 516 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
507 517 r = []
508 518 for b in repo.between(pairs):
509 519 r.append(encodelist(b) + "\n")
510 520 return "".join(r)
511 521
512 522 def branchmap(repo, proto):
513 523 branchmap = repo.branchmap()
514 524 heads = []
515 525 for branch, nodes in branchmap.iteritems():
516 526 branchname = urllib.quote(encoding.fromlocal(branch))
517 527 branchnodes = encodelist(nodes)
518 528 heads.append('%s %s' % (branchname, branchnodes))
519 529 return '\n'.join(heads)
520 530
521 531 def branches(repo, proto, nodes):
522 532 nodes = decodelist(nodes)
523 533 r = []
524 534 for b in repo.branches(nodes):
525 535 r.append(encodelist(b) + "\n")
526 536 return "".join(r)
527 537
528 538
529 539 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
530 540 'known', 'getbundle', 'unbundlehash', 'batch']
531 541
532 542 def _capabilities(repo, proto):
533 543 """return a list of capabilities for a repo
534 544
535 545 This function exists to allow extensions to easily wrap capabilities
536 546 computation
537 547
538 548 - returns a lists: easy to alter
539 549 - change done here will be propagated to both `capabilities` and `hello`
540 550 command without any other effort. without any other action needed.
541 551 """
542 552 # copy to prevent modification of the global list
543 553 caps = list(wireprotocaps)
544 554 if _allowstream(repo.ui):
545 555 if repo.ui.configbool('server', 'preferuncompressed', False):
546 556 caps.append('stream-preferred')
547 557 requiredformats = repo.requirements & repo.supportedformats
548 558 # if our local revlogs are just revlogv1, add 'stream' cap
549 559 if not requiredformats - set(('revlogv1',)):
550 560 caps.append('stream')
551 561 # otherwise, add 'streamreqs' detailing our local revlog format
552 562 else:
553 563 caps.append('streamreqs=%s' % ','.join(requiredformats))
554 564 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
555 565 caps.append('httpheader=1024')
556 566 return caps
557 567
558 568 # If you are writting and extension and consider wrapping this function. Wrap
559 569 # `_capabilities` instead.
560 570 def capabilities(repo, proto):
561 571 return ' '.join(_capabilities(repo, proto))
562 572
563 573 def changegroup(repo, proto, roots):
564 574 nodes = decodelist(roots)
565 575 cg = repo.changegroup(nodes, 'serve')
566 576 return streamres(proto.groupchunks(cg))
567 577
568 578 def changegroupsubset(repo, proto, bases, heads):
569 579 bases = decodelist(bases)
570 580 heads = decodelist(heads)
571 581 cg = repo.changegroupsubset(bases, heads, 'serve')
572 582 return streamres(proto.groupchunks(cg))
573 583
574 584 def debugwireargs(repo, proto, one, two, others):
575 585 # only accept optional args from the known set
576 586 opts = options('debugwireargs', ['three', 'four'], others)
577 587 return repo.debugwireargs(one, two, **opts)
578 588
579 589 def getbundle(repo, proto, others):
580 590 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
581 591 for k, v in opts.iteritems():
582 592 if k in ('heads', 'common'):
583 593 opts[k] = decodelist(v)
584 594 elif k == 'bundlecaps':
585 595 opts[k] = set(v.split(','))
586 596 cg = repo.getbundle('serve', **opts)
587 597 return streamres(proto.groupchunks(cg))
588 598
589 599 def heads(repo, proto):
590 600 h = repo.heads()
591 601 return encodelist(h) + "\n"
592 602
593 603 def hello(repo, proto):
594 604 '''the hello command returns a set of lines describing various
595 605 interesting things about the server, in an RFC822-like format.
596 606 Currently the only one defined is "capabilities", which
597 607 consists of a line in the form:
598 608
599 609 capabilities: space separated list of tokens
600 610 '''
601 611 return "capabilities: %s\n" % (capabilities(repo, proto))
602 612
603 613 def listkeys(repo, proto, namespace):
604 614 d = repo.listkeys(encoding.tolocal(namespace)).items()
605 615 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
606 616 for k, v in d])
607 617 return t
608 618
609 619 def lookup(repo, proto, key):
610 620 try:
611 621 k = encoding.tolocal(key)
612 622 c = repo[k]
613 623 r = c.hex()
614 624 success = 1
615 625 except Exception, inst:
616 626 r = str(inst)
617 627 success = 0
618 628 return "%s %s\n" % (success, r)
619 629
620 630 def known(repo, proto, nodes, others):
621 631 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
622 632
623 633 def pushkey(repo, proto, namespace, key, old, new):
624 634 # compatibility with pre-1.8 clients which were accidentally
625 635 # sending raw binary nodes rather than utf-8-encoded hex
626 636 if len(new) == 20 and new.encode('string-escape') != new:
627 637 # looks like it could be a binary node
628 638 try:
629 639 new.decode('utf-8')
630 640 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
631 641 except UnicodeDecodeError:
632 642 pass # binary, leave unmodified
633 643 else:
634 644 new = encoding.tolocal(new) # normal path
635 645
636 646 if util.safehasattr(proto, 'restore'):
637 647
638 648 proto.redirect()
639 649
640 650 try:
641 651 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
642 652 encoding.tolocal(old), new) or False
643 653 except util.Abort:
644 654 r = False
645 655
646 656 output = proto.restore()
647 657
648 658 return '%s\n%s' % (int(r), output)
649 659
650 660 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
651 661 encoding.tolocal(old), new)
652 662 return '%s\n' % int(r)
653 663
654 664 def _allowstream(ui):
655 665 return ui.configbool('server', 'uncompressed', True, untrusted=True)
656 666
657 667 def _walkstreamfiles(repo):
658 668 # this is it's own function so extensions can override it
659 669 return repo.store.walk()
660 670
661 671 def stream(repo, proto):
662 672 '''If the server supports streaming clone, it advertises the "stream"
663 673 capability with a value representing the version and flags of the repo
664 674 it is serving. Client checks to see if it understands the format.
665 675
666 676 The format is simple: the server writes out a line with the amount
667 677 of files, then the total amount of bytes to be transferred (separated
668 678 by a space). Then, for each file, the server first writes the filename
669 679 and filesize (separated by the null character), then the file contents.
670 680 '''
671 681
672 682 if not _allowstream(repo.ui):
673 683 return '1\n'
674 684
675 685 entries = []
676 686 total_bytes = 0
677 687 try:
678 688 # get consistent snapshot of repo, lock during scan
679 689 lock = repo.lock()
680 690 try:
681 691 repo.ui.debug('scanning\n')
682 692 for name, ename, size in _walkstreamfiles(repo):
683 693 if size:
684 694 entries.append((name, size))
685 695 total_bytes += size
686 696 finally:
687 697 lock.release()
688 698 except error.LockError:
689 699 return '2\n' # error: 2
690 700
691 701 def streamer(repo, entries, total):
692 702 '''stream out all metadata files in repository.'''
693 703 yield '0\n' # success
694 704 repo.ui.debug('%d files, %d bytes to transfer\n' %
695 705 (len(entries), total_bytes))
696 706 yield '%d %d\n' % (len(entries), total_bytes)
697 707
698 708 sopener = repo.sopener
699 709 oldaudit = sopener.mustaudit
700 710 debugflag = repo.ui.debugflag
701 711 sopener.mustaudit = False
702 712
703 713 try:
704 714 for name, size in entries:
705 715 if debugflag:
706 716 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
707 717 # partially encode name over the wire for backwards compat
708 718 yield '%s\0%d\n' % (store.encodedir(name), size)
709 719 if size <= 65536:
710 720 fp = sopener(name)
711 721 try:
712 722 data = fp.read(size)
713 723 finally:
714 724 fp.close()
715 725 yield data
716 726 else:
717 727 for chunk in util.filechunkiter(sopener(name), limit=size):
718 728 yield chunk
719 729 # replace with "finally:" when support for python 2.4 has been dropped
720 730 except Exception:
721 731 sopener.mustaudit = oldaudit
722 732 raise
723 733 sopener.mustaudit = oldaudit
724 734
725 735 return streamres(streamer(repo, entries, total_bytes))
726 736
727 737 def unbundle(repo, proto, heads):
728 738 their_heads = decodelist(heads)
729 739
730 740 def check_heads():
731 741 heads = repo.heads()
732 742 heads_hash = util.sha1(''.join(sorted(heads))).digest()
733 743 return (their_heads == ['force'] or their_heads == heads or
734 744 their_heads == ['hashed', heads_hash])
735 745
736 746 proto.redirect()
737 747
738 748 # fail early if possible
739 749 if not check_heads():
740 750 return pusherr('repository changed while preparing changes - '
741 751 'please try again')
742 752
743 753 # write bundle data to temporary file because it can be big
744 754 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
745 755 fp = os.fdopen(fd, 'wb+')
746 756 r = 0
747 757 try:
748 758 proto.getfile(fp)
749 759 lock = repo.lock()
750 760 try:
751 761 if not check_heads():
752 762 # someone else committed/pushed/unbundled while we
753 763 # were transferring data
754 764 return pusherr('repository changed while uploading changes - '
755 765 'please try again')
756 766
757 767 # push can proceed
758 768 fp.seek(0)
759 769 gen = changegroupmod.readbundle(fp, None)
760 770
761 771 try:
762 772 r = repo.addchangegroup(gen, 'serve', proto._client())
763 773 except util.Abort, inst:
764 774 sys.stderr.write("abort: %s\n" % inst)
765 775 finally:
766 776 lock.release()
767 777 return pushres(r)
768 778
769 779 finally:
770 780 fp.close()
771 781 os.unlink(tempname)
772 782
773 commands = {
783 commands.update({
774 784 'batch': (batch, 'cmds *'),
775 785 'between': (between, 'pairs'),
776 786 'branchmap': (branchmap, ''),
777 787 'branches': (branches, 'nodes'),
778 788 'capabilities': (capabilities, ''),
779 789 'changegroup': (changegroup, 'roots'),
780 790 'changegroupsubset': (changegroupsubset, 'bases heads'),
781 791 'debugwireargs': (debugwireargs, 'one two *'),
782 792 'getbundle': (getbundle, '*'),
783 793 'heads': (heads, ''),
784 794 'hello': (hello, ''),
785 795 'known': (known, 'nodes *'),
786 796 'listkeys': (listkeys, 'namespace'),
787 797 'lookup': (lookup, 'key'),
788 798 'pushkey': (pushkey, 'namespace key old new'),
789 799 'stream_out': (stream, ''),
790 800 'unbundle': (unbundle, 'heads'),
791 }
801 })
General Comments 0
You need to be logged in to leave comments. Login now