##// END OF EJS Templates
wireproto: use decorator for the getbundle command
Pierre-Yves David -
r20915:6aae815f default
parent child Browse files
Show More
@@ -1,801 +1,801 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 337 f = self._callcompressable("getbundle", **opts)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def unbundle(self, cg, heads, source):
341 341 '''Send cg (a readable file-like object representing the
342 342 changegroup to push, typically a chunkbuffer object) to the
343 343 remote server as a bundle. Return an integer indicating the
344 344 result of the push (see localrepository.addchangegroup()).'''
345 345
346 346 if heads != ['force'] and self.capable('unbundlehash'):
347 347 heads = encodelist(['hashed',
348 348 util.sha1(''.join(sorted(heads))).digest()])
349 349 else:
350 350 heads = encodelist(heads)
351 351
352 352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 353 if ret == "":
354 354 raise error.ResponseError(
355 355 _('push failed:'), output)
356 356 try:
357 357 ret = int(ret)
358 358 except ValueError:
359 359 raise error.ResponseError(
360 360 _('push failed (unexpected response):'), ret)
361 361
362 362 for l in output.splitlines(True):
363 363 self.ui.status(_('remote: '), l)
364 364 return ret
365 365
366 366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 367 # don't pass optional arguments left at their default value
368 368 opts = {}
369 369 if three is not None:
370 370 opts['three'] = three
371 371 if four is not None:
372 372 opts['four'] = four
373 373 return self._call('debugwireargs', one=one, two=two, **opts)
374 374
375 375 def _call(self, cmd, **args):
376 376 """execute <cmd> on the server
377 377
378 378 The command is expected to return a simple string.
379 379
380 380 returns the server reply as a string."""
381 381 raise NotImplementedError()
382 382
383 383 def _callstream(self, cmd, **args):
384 384 """execute <cmd> on the server
385 385
386 386 The command is expected to return a stream.
387 387
388 388 returns the server reply as a file like object."""
389 389 raise NotImplementedError()
390 390
391 391 def _callcompressable(self, cmd, **args):
392 392 """execute <cmd> on the server
393 393
394 394 The command is expected to return a stream.
395 395
396 396 The stream may have been compressed in some implementaitons. This
397 397 function takes care of the decompression. This is the only difference
398 398 with _callstream.
399 399
400 400 returns the server reply as a file like object.
401 401 """
402 402 raise NotImplementedError()
403 403
404 404 def _callpush(self, cmd, fp, **args):
405 405 """execute a <cmd> on server
406 406
407 407 The command is expected to be related to a push. Push has a special
408 408 return method.
409 409
410 410 returns the server reply as a (ret, output) tuple. ret is either
411 411 empty (error) or a stringified int.
412 412 """
413 413 raise NotImplementedError()
414 414
415 415 def _abort(self, exception):
416 416 """clearly abort the wire protocol connection and raise the exception
417 417 """
418 418 raise NotImplementedError()
419 419
420 420 # server side
421 421
422 422 # wire protocol command can either return a string or one of these classes.
423 423 class streamres(object):
424 424 """wireproto reply: binary stream
425 425
426 426 The call was successful and the result is a stream.
427 427 Iterate on the `self.gen` attribute to retrieve chunks.
428 428 """
429 429 def __init__(self, gen):
430 430 self.gen = gen
431 431
432 432 class pushres(object):
433 433 """wireproto reply: success with simple integer return
434 434
435 435 The call was successful and returned an integer contained in `self.res`.
436 436 """
437 437 def __init__(self, res):
438 438 self.res = res
439 439
440 440 class pusherr(object):
441 441 """wireproto reply: failure
442 442
443 443 The call failed. The `self.res` attribute contains the error message.
444 444 """
445 445 def __init__(self, res):
446 446 self.res = res
447 447
448 448 class ooberror(object):
449 449 """wireproto reply: failure of a batch of operation
450 450
451 451 Something failed during a batch call. The error message is stored in
452 452 `self.message`.
453 453 """
454 454 def __init__(self, message):
455 455 self.message = message
456 456
457 457 def dispatch(repo, proto, command):
458 458 repo = repo.filtered("served")
459 459 func, spec = commands[command]
460 460 args = proto.getargs(spec)
461 461 return func(repo, proto, *args)
462 462
463 463 def options(cmd, keys, others):
464 464 opts = {}
465 465 for k in keys:
466 466 if k in others:
467 467 opts[k] = others[k]
468 468 del others[k]
469 469 if others:
470 470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 471 % (cmd, ",".join(others)))
472 472 return opts
473 473
474 474 # list of commands
475 475 commands = {}
476 476
477 477 def wireprotocommand(name, args=''):
478 478 """decorator for wireprotocol command"""
479 479 def register(func):
480 480 commands[name] = (func, args)
481 481 return func
482 482 return register
483 483
484 484 @wireprotocommand('batch', 'cmds *')
485 485 def batch(repo, proto, cmds, others):
486 486 repo = repo.filtered("served")
487 487 res = []
488 488 for pair in cmds.split(';'):
489 489 op, args = pair.split(' ', 1)
490 490 vals = {}
491 491 for a in args.split(','):
492 492 if a:
493 493 n, v = a.split('=')
494 494 vals[n] = unescapearg(v)
495 495 func, spec = commands[op]
496 496 if spec:
497 497 keys = spec.split()
498 498 data = {}
499 499 for k in keys:
500 500 if k == '*':
501 501 star = {}
502 502 for key in vals.keys():
503 503 if key not in keys:
504 504 star[key] = vals[key]
505 505 data['*'] = star
506 506 else:
507 507 data[k] = vals[k]
508 508 result = func(repo, proto, *[data[k] for k in keys])
509 509 else:
510 510 result = func(repo, proto)
511 511 if isinstance(result, ooberror):
512 512 return result
513 513 res.append(escapearg(result))
514 514 return ';'.join(res)
515 515
516 516 @wireprotocommand('between', 'pairs')
517 517 def between(repo, proto, pairs):
518 518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
519 519 r = []
520 520 for b in repo.between(pairs):
521 521 r.append(encodelist(b) + "\n")
522 522 return "".join(r)
523 523
524 524 @wireprotocommand('branchmap')
525 525 def branchmap(repo, proto):
526 526 branchmap = repo.branchmap()
527 527 heads = []
528 528 for branch, nodes in branchmap.iteritems():
529 529 branchname = urllib.quote(encoding.fromlocal(branch))
530 530 branchnodes = encodelist(nodes)
531 531 heads.append('%s %s' % (branchname, branchnodes))
532 532 return '\n'.join(heads)
533 533
534 534 @wireprotocommand('branches', 'nodes')
535 535 def branches(repo, proto, nodes):
536 536 nodes = decodelist(nodes)
537 537 r = []
538 538 for b in repo.branches(nodes):
539 539 r.append(encodelist(b) + "\n")
540 540 return "".join(r)
541 541
542 542
543 543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
544 544 'known', 'getbundle', 'unbundlehash', 'batch']
545 545
546 546 def _capabilities(repo, proto):
547 547 """return a list of capabilities for a repo
548 548
549 549 This function exists to allow extensions to easily wrap capabilities
550 550 computation
551 551
552 552 - returns a lists: easy to alter
553 553 - change done here will be propagated to both `capabilities` and `hello`
554 554 command without any other effort. without any other action needed.
555 555 """
556 556 # copy to prevent modification of the global list
557 557 caps = list(wireprotocaps)
558 558 if _allowstream(repo.ui):
559 559 if repo.ui.configbool('server', 'preferuncompressed', False):
560 560 caps.append('stream-preferred')
561 561 requiredformats = repo.requirements & repo.supportedformats
562 562 # if our local revlogs are just revlogv1, add 'stream' cap
563 563 if not requiredformats - set(('revlogv1',)):
564 564 caps.append('stream')
565 565 # otherwise, add 'streamreqs' detailing our local revlog format
566 566 else:
567 567 caps.append('streamreqs=%s' % ','.join(requiredformats))
568 568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
569 569 caps.append('httpheader=1024')
570 570 return caps
571 571
572 572 # If you are writting and extension and consider wrapping this function. Wrap
573 573 # `_capabilities` instead.
574 574 @wireprotocommand('capabilities')
575 575 def capabilities(repo, proto):
576 576 return ' '.join(_capabilities(repo, proto))
577 577
578 578 @wireprotocommand('changegroup', 'roots')
579 579 def changegroup(repo, proto, roots):
580 580 nodes = decodelist(roots)
581 581 cg = repo.changegroup(nodes, 'serve')
582 582 return streamres(proto.groupchunks(cg))
583 583
584 584 @wireprotocommand('changegroupsubset', 'bases heads')
585 585 def changegroupsubset(repo, proto, bases, heads):
586 586 bases = decodelist(bases)
587 587 heads = decodelist(heads)
588 588 cg = repo.changegroupsubset(bases, heads, 'serve')
589 589 return streamres(proto.groupchunks(cg))
590 590
591 591 @wireprotocommand('debugwireargs', 'one two *')
592 592 def debugwireargs(repo, proto, one, two, others):
593 593 # only accept optional args from the known set
594 594 opts = options('debugwireargs', ['three', 'four'], others)
595 595 return repo.debugwireargs(one, two, **opts)
596 596
597 @wireprotocommand('getbundle', '*')
597 598 def getbundle(repo, proto, others):
598 599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
599 600 for k, v in opts.iteritems():
600 601 if k in ('heads', 'common'):
601 602 opts[k] = decodelist(v)
602 603 elif k == 'bundlecaps':
603 604 opts[k] = set(v.split(','))
604 605 cg = repo.getbundle('serve', **opts)
605 606 return streamres(proto.groupchunks(cg))
606 607
607 608 def heads(repo, proto):
608 609 h = repo.heads()
609 610 return encodelist(h) + "\n"
610 611
611 612 def hello(repo, proto):
612 613 '''the hello command returns a set of lines describing various
613 614 interesting things about the server, in an RFC822-like format.
614 615 Currently the only one defined is "capabilities", which
615 616 consists of a line in the form:
616 617
617 618 capabilities: space separated list of tokens
618 619 '''
619 620 return "capabilities: %s\n" % (capabilities(repo, proto))
620 621
621 622 def listkeys(repo, proto, namespace):
622 623 d = repo.listkeys(encoding.tolocal(namespace)).items()
623 624 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
624 625 for k, v in d])
625 626 return t
626 627
627 628 def lookup(repo, proto, key):
628 629 try:
629 630 k = encoding.tolocal(key)
630 631 c = repo[k]
631 632 r = c.hex()
632 633 success = 1
633 634 except Exception, inst:
634 635 r = str(inst)
635 636 success = 0
636 637 return "%s %s\n" % (success, r)
637 638
638 639 def known(repo, proto, nodes, others):
639 640 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
640 641
641 642 def pushkey(repo, proto, namespace, key, old, new):
642 643 # compatibility with pre-1.8 clients which were accidentally
643 644 # sending raw binary nodes rather than utf-8-encoded hex
644 645 if len(new) == 20 and new.encode('string-escape') != new:
645 646 # looks like it could be a binary node
646 647 try:
647 648 new.decode('utf-8')
648 649 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
649 650 except UnicodeDecodeError:
650 651 pass # binary, leave unmodified
651 652 else:
652 653 new = encoding.tolocal(new) # normal path
653 654
654 655 if util.safehasattr(proto, 'restore'):
655 656
656 657 proto.redirect()
657 658
658 659 try:
659 660 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
660 661 encoding.tolocal(old), new) or False
661 662 except util.Abort:
662 663 r = False
663 664
664 665 output = proto.restore()
665 666
666 667 return '%s\n%s' % (int(r), output)
667 668
668 669 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
669 670 encoding.tolocal(old), new)
670 671 return '%s\n' % int(r)
671 672
672 673 def _allowstream(ui):
673 674 return ui.configbool('server', 'uncompressed', True, untrusted=True)
674 675
675 676 def _walkstreamfiles(repo):
676 677 # this is it's own function so extensions can override it
677 678 return repo.store.walk()
678 679
679 680 def stream(repo, proto):
680 681 '''If the server supports streaming clone, it advertises the "stream"
681 682 capability with a value representing the version and flags of the repo
682 683 it is serving. Client checks to see if it understands the format.
683 684
684 685 The format is simple: the server writes out a line with the amount
685 686 of files, then the total amount of bytes to be transferred (separated
686 687 by a space). Then, for each file, the server first writes the filename
687 688 and filesize (separated by the null character), then the file contents.
688 689 '''
689 690
690 691 if not _allowstream(repo.ui):
691 692 return '1\n'
692 693
693 694 entries = []
694 695 total_bytes = 0
695 696 try:
696 697 # get consistent snapshot of repo, lock during scan
697 698 lock = repo.lock()
698 699 try:
699 700 repo.ui.debug('scanning\n')
700 701 for name, ename, size in _walkstreamfiles(repo):
701 702 if size:
702 703 entries.append((name, size))
703 704 total_bytes += size
704 705 finally:
705 706 lock.release()
706 707 except error.LockError:
707 708 return '2\n' # error: 2
708 709
709 710 def streamer(repo, entries, total):
710 711 '''stream out all metadata files in repository.'''
711 712 yield '0\n' # success
712 713 repo.ui.debug('%d files, %d bytes to transfer\n' %
713 714 (len(entries), total_bytes))
714 715 yield '%d %d\n' % (len(entries), total_bytes)
715 716
716 717 sopener = repo.sopener
717 718 oldaudit = sopener.mustaudit
718 719 debugflag = repo.ui.debugflag
719 720 sopener.mustaudit = False
720 721
721 722 try:
722 723 for name, size in entries:
723 724 if debugflag:
724 725 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
725 726 # partially encode name over the wire for backwards compat
726 727 yield '%s\0%d\n' % (store.encodedir(name), size)
727 728 if size <= 65536:
728 729 fp = sopener(name)
729 730 try:
730 731 data = fp.read(size)
731 732 finally:
732 733 fp.close()
733 734 yield data
734 735 else:
735 736 for chunk in util.filechunkiter(sopener(name), limit=size):
736 737 yield chunk
737 738 # replace with "finally:" when support for python 2.4 has been dropped
738 739 except Exception:
739 740 sopener.mustaudit = oldaudit
740 741 raise
741 742 sopener.mustaudit = oldaudit
742 743
743 744 return streamres(streamer(repo, entries, total_bytes))
744 745
745 746 def unbundle(repo, proto, heads):
746 747 their_heads = decodelist(heads)
747 748
748 749 def check_heads():
749 750 heads = repo.heads()
750 751 heads_hash = util.sha1(''.join(sorted(heads))).digest()
751 752 return (their_heads == ['force'] or their_heads == heads or
752 753 their_heads == ['hashed', heads_hash])
753 754
754 755 proto.redirect()
755 756
756 757 # fail early if possible
757 758 if not check_heads():
758 759 return pusherr('repository changed while preparing changes - '
759 760 'please try again')
760 761
761 762 # write bundle data to temporary file because it can be big
762 763 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
763 764 fp = os.fdopen(fd, 'wb+')
764 765 r = 0
765 766 try:
766 767 proto.getfile(fp)
767 768 lock = repo.lock()
768 769 try:
769 770 if not check_heads():
770 771 # someone else committed/pushed/unbundled while we
771 772 # were transferring data
772 773 return pusherr('repository changed while uploading changes - '
773 774 'please try again')
774 775
775 776 # push can proceed
776 777 fp.seek(0)
777 778 gen = changegroupmod.readbundle(fp, None)
778 779
779 780 try:
780 781 r = repo.addchangegroup(gen, 'serve', proto._client())
781 782 except util.Abort, inst:
782 783 sys.stderr.write("abort: %s\n" % inst)
783 784 finally:
784 785 lock.release()
785 786 return pushres(r)
786 787
787 788 finally:
788 789 fp.close()
789 790 os.unlink(tempname)
790 791
791 792 commands.update({
792 'getbundle': (getbundle, '*'),
793 793 'heads': (heads, ''),
794 794 'hello': (hello, ''),
795 795 'known': (known, 'nodes *'),
796 796 'listkeys': (listkeys, 'namespace'),
797 797 'lookup': (lookup, 'key'),
798 798 'pushkey': (pushkey, 'namespace key old new'),
799 799 'stream_out': (stream, ''),
800 800 'unbundle': (unbundle, 'heads'),
801 801 })
General Comments 0
You need to be logged in to leave comments. Login now