##// END OF EJS Templates
wireproto: use decorator for the changegroupsubset command
Pierre-Yves David -
r20913:fc7219ec default
parent child Browse files
Show More
@@ -1,801 +1,801 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 317 f = self._callcompressable("changegroup", roots=n)
318 318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 337 f = self._callcompressable("getbundle", **opts)
338 338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def unbundle(self, cg, heads, source):
341 341 '''Send cg (a readable file-like object representing the
342 342 changegroup to push, typically a chunkbuffer object) to the
343 343 remote server as a bundle. Return an integer indicating the
344 344 result of the push (see localrepository.addchangegroup()).'''
345 345
346 346 if heads != ['force'] and self.capable('unbundlehash'):
347 347 heads = encodelist(['hashed',
348 348 util.sha1(''.join(sorted(heads))).digest()])
349 349 else:
350 350 heads = encodelist(heads)
351 351
352 352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 353 if ret == "":
354 354 raise error.ResponseError(
355 355 _('push failed:'), output)
356 356 try:
357 357 ret = int(ret)
358 358 except ValueError:
359 359 raise error.ResponseError(
360 360 _('push failed (unexpected response):'), ret)
361 361
362 362 for l in output.splitlines(True):
363 363 self.ui.status(_('remote: '), l)
364 364 return ret
365 365
366 366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 367 # don't pass optional arguments left at their default value
368 368 opts = {}
369 369 if three is not None:
370 370 opts['three'] = three
371 371 if four is not None:
372 372 opts['four'] = four
373 373 return self._call('debugwireargs', one=one, two=two, **opts)
374 374
375 375 def _call(self, cmd, **args):
376 376 """execute <cmd> on the server
377 377
378 378 The command is expected to return a simple string.
379 379
380 380 returns the server reply as a string."""
381 381 raise NotImplementedError()
382 382
383 383 def _callstream(self, cmd, **args):
384 384 """execute <cmd> on the server
385 385
386 386 The command is expected to return a stream.
387 387
388 388 returns the server reply as a file like object."""
389 389 raise NotImplementedError()
390 390
391 391 def _callcompressable(self, cmd, **args):
392 392 """execute <cmd> on the server
393 393
394 394 The command is expected to return a stream.
395 395
396 396 The stream may have been compressed in some implementaitons. This
397 397 function takes care of the decompression. This is the only difference
398 398 with _callstream.
399 399
400 400 returns the server reply as a file like object.
401 401 """
402 402 raise NotImplementedError()
403 403
404 404 def _callpush(self, cmd, fp, **args):
405 405 """execute a <cmd> on server
406 406
407 407 The command is expected to be related to a push. Push has a special
408 408 return method.
409 409
410 410 returns the server reply as a (ret, output) tuple. ret is either
411 411 empty (error) or a stringified int.
412 412 """
413 413 raise NotImplementedError()
414 414
415 415 def _abort(self, exception):
416 416 """clearly abort the wire protocol connection and raise the exception
417 417 """
418 418 raise NotImplementedError()
419 419
420 420 # server side
421 421
422 422 # wire protocol command can either return a string or one of these classes.
423 423 class streamres(object):
424 424 """wireproto reply: binary stream
425 425
426 426 The call was successful and the result is a stream.
427 427 Iterate on the `self.gen` attribute to retrieve chunks.
428 428 """
429 429 def __init__(self, gen):
430 430 self.gen = gen
431 431
432 432 class pushres(object):
433 433 """wireproto reply: success with simple integer return
434 434
435 435 The call was successful and returned an integer contained in `self.res`.
436 436 """
437 437 def __init__(self, res):
438 438 self.res = res
439 439
440 440 class pusherr(object):
441 441 """wireproto reply: failure
442 442
443 443 The call failed. The `self.res` attribute contains the error message.
444 444 """
445 445 def __init__(self, res):
446 446 self.res = res
447 447
448 448 class ooberror(object):
449 449 """wireproto reply: failure of a batch of operation
450 450
451 451 Something failed during a batch call. The error message is stored in
452 452 `self.message`.
453 453 """
454 454 def __init__(self, message):
455 455 self.message = message
456 456
457 457 def dispatch(repo, proto, command):
458 458 repo = repo.filtered("served")
459 459 func, spec = commands[command]
460 460 args = proto.getargs(spec)
461 461 return func(repo, proto, *args)
462 462
463 463 def options(cmd, keys, others):
464 464 opts = {}
465 465 for k in keys:
466 466 if k in others:
467 467 opts[k] = others[k]
468 468 del others[k]
469 469 if others:
470 470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 471 % (cmd, ",".join(others)))
472 472 return opts
473 473
474 474 # list of commands
475 475 commands = {}
476 476
477 477 def wireprotocommand(name, args=''):
478 478 """decorator for wireprotocol command"""
479 479 def register(func):
480 480 commands[name] = (func, args)
481 481 return func
482 482 return register
483 483
484 484 @wireprotocommand('batch', 'cmds *')
485 485 def batch(repo, proto, cmds, others):
486 486 repo = repo.filtered("served")
487 487 res = []
488 488 for pair in cmds.split(';'):
489 489 op, args = pair.split(' ', 1)
490 490 vals = {}
491 491 for a in args.split(','):
492 492 if a:
493 493 n, v = a.split('=')
494 494 vals[n] = unescapearg(v)
495 495 func, spec = commands[op]
496 496 if spec:
497 497 keys = spec.split()
498 498 data = {}
499 499 for k in keys:
500 500 if k == '*':
501 501 star = {}
502 502 for key in vals.keys():
503 503 if key not in keys:
504 504 star[key] = vals[key]
505 505 data['*'] = star
506 506 else:
507 507 data[k] = vals[k]
508 508 result = func(repo, proto, *[data[k] for k in keys])
509 509 else:
510 510 result = func(repo, proto)
511 511 if isinstance(result, ooberror):
512 512 return result
513 513 res.append(escapearg(result))
514 514 return ';'.join(res)
515 515
516 516 @wireprotocommand('between', 'pairs')
517 517 def between(repo, proto, pairs):
518 518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
519 519 r = []
520 520 for b in repo.between(pairs):
521 521 r.append(encodelist(b) + "\n")
522 522 return "".join(r)
523 523
524 524 @wireprotocommand('branchmap')
525 525 def branchmap(repo, proto):
526 526 branchmap = repo.branchmap()
527 527 heads = []
528 528 for branch, nodes in branchmap.iteritems():
529 529 branchname = urllib.quote(encoding.fromlocal(branch))
530 530 branchnodes = encodelist(nodes)
531 531 heads.append('%s %s' % (branchname, branchnodes))
532 532 return '\n'.join(heads)
533 533
534 534 @wireprotocommand('branches', 'nodes')
535 535 def branches(repo, proto, nodes):
536 536 nodes = decodelist(nodes)
537 537 r = []
538 538 for b in repo.branches(nodes):
539 539 r.append(encodelist(b) + "\n")
540 540 return "".join(r)
541 541
542 542
543 543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
544 544 'known', 'getbundle', 'unbundlehash', 'batch']
545 545
546 546 def _capabilities(repo, proto):
547 547 """return a list of capabilities for a repo
548 548
549 549 This function exists to allow extensions to easily wrap capabilities
550 550 computation
551 551
552 552 - returns a lists: easy to alter
553 553 - change done here will be propagated to both `capabilities` and `hello`
554 554 command without any other effort. without any other action needed.
555 555 """
556 556 # copy to prevent modification of the global list
557 557 caps = list(wireprotocaps)
558 558 if _allowstream(repo.ui):
559 559 if repo.ui.configbool('server', 'preferuncompressed', False):
560 560 caps.append('stream-preferred')
561 561 requiredformats = repo.requirements & repo.supportedformats
562 562 # if our local revlogs are just revlogv1, add 'stream' cap
563 563 if not requiredformats - set(('revlogv1',)):
564 564 caps.append('stream')
565 565 # otherwise, add 'streamreqs' detailing our local revlog format
566 566 else:
567 567 caps.append('streamreqs=%s' % ','.join(requiredformats))
568 568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
569 569 caps.append('httpheader=1024')
570 570 return caps
571 571
572 572 # If you are writting and extension and consider wrapping this function. Wrap
573 573 # `_capabilities` instead.
574 574 @wireprotocommand('capabilities')
575 575 def capabilities(repo, proto):
576 576 return ' '.join(_capabilities(repo, proto))
577 577
578 578 @wireprotocommand('changegroup', 'roots')
579 579 def changegroup(repo, proto, roots):
580 580 nodes = decodelist(roots)
581 581 cg = repo.changegroup(nodes, 'serve')
582 582 return streamres(proto.groupchunks(cg))
583 583
584 @wireprotocommand('changegroupsubset', 'bases heads')
584 585 def changegroupsubset(repo, proto, bases, heads):
585 586 bases = decodelist(bases)
586 587 heads = decodelist(heads)
587 588 cg = repo.changegroupsubset(bases, heads, 'serve')
588 589 return streamres(proto.groupchunks(cg))
589 590
590 591 def debugwireargs(repo, proto, one, two, others):
591 592 # only accept optional args from the known set
592 593 opts = options('debugwireargs', ['three', 'four'], others)
593 594 return repo.debugwireargs(one, two, **opts)
594 595
595 596 def getbundle(repo, proto, others):
596 597 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
597 598 for k, v in opts.iteritems():
598 599 if k in ('heads', 'common'):
599 600 opts[k] = decodelist(v)
600 601 elif k == 'bundlecaps':
601 602 opts[k] = set(v.split(','))
602 603 cg = repo.getbundle('serve', **opts)
603 604 return streamres(proto.groupchunks(cg))
604 605
605 606 def heads(repo, proto):
606 607 h = repo.heads()
607 608 return encodelist(h) + "\n"
608 609
609 610 def hello(repo, proto):
610 611 '''the hello command returns a set of lines describing various
611 612 interesting things about the server, in an RFC822-like format.
612 613 Currently the only one defined is "capabilities", which
613 614 consists of a line in the form:
614 615
615 616 capabilities: space separated list of tokens
616 617 '''
617 618 return "capabilities: %s\n" % (capabilities(repo, proto))
618 619
619 620 def listkeys(repo, proto, namespace):
620 621 d = repo.listkeys(encoding.tolocal(namespace)).items()
621 622 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
622 623 for k, v in d])
623 624 return t
624 625
625 626 def lookup(repo, proto, key):
626 627 try:
627 628 k = encoding.tolocal(key)
628 629 c = repo[k]
629 630 r = c.hex()
630 631 success = 1
631 632 except Exception, inst:
632 633 r = str(inst)
633 634 success = 0
634 635 return "%s %s\n" % (success, r)
635 636
636 637 def known(repo, proto, nodes, others):
637 638 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
638 639
639 640 def pushkey(repo, proto, namespace, key, old, new):
640 641 # compatibility with pre-1.8 clients which were accidentally
641 642 # sending raw binary nodes rather than utf-8-encoded hex
642 643 if len(new) == 20 and new.encode('string-escape') != new:
643 644 # looks like it could be a binary node
644 645 try:
645 646 new.decode('utf-8')
646 647 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
647 648 except UnicodeDecodeError:
648 649 pass # binary, leave unmodified
649 650 else:
650 651 new = encoding.tolocal(new) # normal path
651 652
652 653 if util.safehasattr(proto, 'restore'):
653 654
654 655 proto.redirect()
655 656
656 657 try:
657 658 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
658 659 encoding.tolocal(old), new) or False
659 660 except util.Abort:
660 661 r = False
661 662
662 663 output = proto.restore()
663 664
664 665 return '%s\n%s' % (int(r), output)
665 666
666 667 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
667 668 encoding.tolocal(old), new)
668 669 return '%s\n' % int(r)
669 670
670 671 def _allowstream(ui):
671 672 return ui.configbool('server', 'uncompressed', True, untrusted=True)
672 673
673 674 def _walkstreamfiles(repo):
674 675 # this is it's own function so extensions can override it
675 676 return repo.store.walk()
676 677
677 678 def stream(repo, proto):
678 679 '''If the server supports streaming clone, it advertises the "stream"
679 680 capability with a value representing the version and flags of the repo
680 681 it is serving. Client checks to see if it understands the format.
681 682
682 683 The format is simple: the server writes out a line with the amount
683 684 of files, then the total amount of bytes to be transferred (separated
684 685 by a space). Then, for each file, the server first writes the filename
685 686 and filesize (separated by the null character), then the file contents.
686 687 '''
687 688
688 689 if not _allowstream(repo.ui):
689 690 return '1\n'
690 691
691 692 entries = []
692 693 total_bytes = 0
693 694 try:
694 695 # get consistent snapshot of repo, lock during scan
695 696 lock = repo.lock()
696 697 try:
697 698 repo.ui.debug('scanning\n')
698 699 for name, ename, size in _walkstreamfiles(repo):
699 700 if size:
700 701 entries.append((name, size))
701 702 total_bytes += size
702 703 finally:
703 704 lock.release()
704 705 except error.LockError:
705 706 return '2\n' # error: 2
706 707
707 708 def streamer(repo, entries, total):
708 709 '''stream out all metadata files in repository.'''
709 710 yield '0\n' # success
710 711 repo.ui.debug('%d files, %d bytes to transfer\n' %
711 712 (len(entries), total_bytes))
712 713 yield '%d %d\n' % (len(entries), total_bytes)
713 714
714 715 sopener = repo.sopener
715 716 oldaudit = sopener.mustaudit
716 717 debugflag = repo.ui.debugflag
717 718 sopener.mustaudit = False
718 719
719 720 try:
720 721 for name, size in entries:
721 722 if debugflag:
722 723 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
723 724 # partially encode name over the wire for backwards compat
724 725 yield '%s\0%d\n' % (store.encodedir(name), size)
725 726 if size <= 65536:
726 727 fp = sopener(name)
727 728 try:
728 729 data = fp.read(size)
729 730 finally:
730 731 fp.close()
731 732 yield data
732 733 else:
733 734 for chunk in util.filechunkiter(sopener(name), limit=size):
734 735 yield chunk
735 736 # replace with "finally:" when support for python 2.4 has been dropped
736 737 except Exception:
737 738 sopener.mustaudit = oldaudit
738 739 raise
739 740 sopener.mustaudit = oldaudit
740 741
741 742 return streamres(streamer(repo, entries, total_bytes))
742 743
743 744 def unbundle(repo, proto, heads):
744 745 their_heads = decodelist(heads)
745 746
746 747 def check_heads():
747 748 heads = repo.heads()
748 749 heads_hash = util.sha1(''.join(sorted(heads))).digest()
749 750 return (their_heads == ['force'] or their_heads == heads or
750 751 their_heads == ['hashed', heads_hash])
751 752
752 753 proto.redirect()
753 754
754 755 # fail early if possible
755 756 if not check_heads():
756 757 return pusherr('repository changed while preparing changes - '
757 758 'please try again')
758 759
759 760 # write bundle data to temporary file because it can be big
760 761 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
761 762 fp = os.fdopen(fd, 'wb+')
762 763 r = 0
763 764 try:
764 765 proto.getfile(fp)
765 766 lock = repo.lock()
766 767 try:
767 768 if not check_heads():
768 769 # someone else committed/pushed/unbundled while we
769 770 # were transferring data
770 771 return pusherr('repository changed while uploading changes - '
771 772 'please try again')
772 773
773 774 # push can proceed
774 775 fp.seek(0)
775 776 gen = changegroupmod.readbundle(fp, None)
776 777
777 778 try:
778 779 r = repo.addchangegroup(gen, 'serve', proto._client())
779 780 except util.Abort, inst:
780 781 sys.stderr.write("abort: %s\n" % inst)
781 782 finally:
782 783 lock.release()
783 784 return pushres(r)
784 785
785 786 finally:
786 787 fp.close()
787 788 os.unlink(tempname)
788 789
789 790 commands.update({
790 'changegroupsubset': (changegroupsubset, 'bases heads'),
791 791 'debugwireargs': (debugwireargs, 'one two *'),
792 792 'getbundle': (getbundle, '*'),
793 793 'heads': (heads, ''),
794 794 'hello': (hello, ''),
795 795 'known': (known, 'nodes *'),
796 796 'listkeys': (listkeys, 'namespace'),
797 797 'lookup': (lookup, 'key'),
798 798 'pushkey': (pushkey, 'namespace key old new'),
799 799 'stream_out': (stream, ''),
800 800 'unbundle': (unbundle, 'heads'),
801 801 })
General Comments 0
You need to be logged in to leave comments. Login now