clone: don't include empty revlogs in stream
Mads Kiilerich
r18381:7ac4449f default
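
The change is a single guard in the server-side 'stream' command: store files with a size of zero are no longer added to the list of entries announced to a streaming-clone client, so empty revlogs are neither counted in the "<filecount> <totalbytes>" header nor listed on the wire. A minimal sketch of the effect (file names and sizes are invented for illustration):

    # before: repo.store.walk() results were taken as-is
    #   entries = [('data/a.i', 1234), ('data/empty.i', 0)]   # header: '2 1234'
    # after: zero-length revlogs are skipped
    #   entries = [('data/a.i', 1234)]                        # header: '1 1234'
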
@@ -1,653 +1,654 @@
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store
13 13
14 14 # abstract batching support
15 15
16 16 class future(object):
17 17 '''placeholder for a value to be set later'''
18 18 def set(self, value):
19 19 if util.safehasattr(self, 'value'):
20 20 raise error.RepoError("future is already set")
21 21 self.value = value
22 22
23 23 class batcher(object):
24 24 '''base class for batches of commands submittable in a single request
25 25
26 26 All methods invoked on instances of this class are simply queued and
27 27 return a a future for the result. Once you call submit(), all the queued
28 28 calls are performed and the results set in their respective futures.
29 29 '''
30 30 def __init__(self):
31 31 self.calls = []
32 32 def __getattr__(self, name):
33 33 def call(*args, **opts):
34 34 resref = future()
35 35 self.calls.append((name, args, opts, resref,))
36 36 return resref
37 37 return call
38 38 def submit(self):
39 39 pass
40 40
41 41 class localbatch(batcher):
42 42 '''performs the queued calls directly'''
43 43 def __init__(self, local):
44 44 batcher.__init__(self)
45 45 self.local = local
46 46 def submit(self):
47 47 for name, args, opts, resref in self.calls:
48 48 resref.set(getattr(self.local, name)(*args, **opts))
49 49
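# --- editor's sketch, not part of wireproto.py ---------------------------
# Minimal illustration of the batching classes above: calls made on a
# batcher are only queued and return futures; submit() performs them and
# fills in the values.  `target` is a hypothetical object with a heads()
# method (for a wirepeer one would use remote.batch() instead).
def _localbatchdemo(target):
    b = localbatch(target)
    f = b.heads()          # queued; f is a future, no call made yet
    b.submit()             # every queued call runs here
    return f.value         # available only after submit()
# --------------------------------------------------------------------------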
50 50 class remotebatch(batcher):
51 51 '''batches the queued calls; uses as few roundtrips as possible'''
52 52 def __init__(self, remote):
53 53 '''remote must support _submitbatch(encbatch) and
54 54 _submitone(op, encargs)'''
55 55 batcher.__init__(self)
56 56 self.remote = remote
57 57 def submit(self):
58 58 req, rsp = [], []
59 59 for name, args, opts, resref in self.calls:
60 60 mtd = getattr(self.remote, name)
61 61 batchablefn = getattr(mtd, 'batchable', None)
62 62 if batchablefn is not None:
63 63 batchable = batchablefn(mtd.im_self, *args, **opts)
64 64 encargsorres, encresref = batchable.next()
65 65 if encresref:
66 66 req.append((name, encargsorres,))
67 67 rsp.append((batchable, encresref, resref,))
68 68 else:
69 69 resref.set(encargsorres)
70 70 else:
71 71 if req:
72 72 self._submitreq(req, rsp)
73 73 req, rsp = [], []
74 74 resref.set(mtd(*args, **opts))
75 75 if req:
76 76 self._submitreq(req, rsp)
77 77 def _submitreq(self, req, rsp):
78 78 encresults = self.remote._submitbatch(req)
79 79 for encres, r in zip(encresults, rsp):
80 80 batchable, encresref, resref = r
81 81 encresref.set(encres)
82 82 resref.set(batchable.next())
83 83
84 84 def batchable(f):
85 85 '''annotation for batchable methods
86 86
87 87 Such methods must implement a coroutine as follows:
88 88
89 89 @batchable
90 90 def sample(self, one, two=None):
91 91 # Handle locally computable results first:
92 92 if not one:
93 93 yield "a local result", None
94 94 # Build list of encoded arguments suitable for your wire protocol:
95 95 encargs = [('one', encode(one),), ('two', encode(two),)]
96 96 # Create future for injection of encoded result:
97 97 encresref = future()
98 98 # Return encoded arguments and future:
99 99 yield encargs, encresref
100 100 # Assuming the future to be filled with the result from the batched
101 101 # request now. Decode it:
102 102 yield decode(encresref.value)
103 103
104 104 The decorator returns a function which wraps this coroutine as a plain
105 105 method, but adds the original method as an attribute called "batchable",
106 106 which is used by remotebatch to split the call into separate encoding and
107 107 decoding phases.
108 108 '''
109 109 def plain(*args, **opts):
110 110 batchable = f(*args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if not encresref:
113 113 return encargsorres # a local result in this case
114 114 self = args[0]
115 115 encresref.set(self._submitone(f.func_name, encargsorres))
116 116 return batchable.next()
117 117 setattr(plain, 'batchable', f)
118 118 return plain
119 119
120 120 # list of nodes encoding / decoding
121 121
122 122 def decodelist(l, sep=' '):
123 123 if l:
124 124 return map(bin, l.split(sep))
125 125 return []
126 126
127 127 def encodelist(l, sep=' '):
128 128 return sep.join(map(hex, l))
129 129
130 130 # batched call argument encoding
131 131
132 132 def escapearg(plain):
133 133 return (plain
134 134 .replace(':', '::')
135 135 .replace(',', ':,')
136 136 .replace(';', ':;')
137 137 .replace('=', ':='))
138 138
139 139 def unescapearg(escaped):
140 140 return (escaped
141 141 .replace(':=', '=')
142 142 .replace(':;', ';')
143 143 .replace(':,', ',')
144 144 .replace('::', ':'))
145 145
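# --- editor's sketch, not part of wireproto.py ---------------------------
# escapearg()/unescapearg() form a reversible quoting scheme so argument
# values may contain the separators used by the "batch" command
# (';' between commands, ',' between arguments, '=' between key and value).
def _escapedemo():
    raw = 'a=b;c,d:e'                      # made-up value
    quoted = escapearg(raw)                # -> 'a:=b:;c:,d::e'
    assert unescapearg(quoted) == raw      # round-trips cleanly
    return quoted
# --------------------------------------------------------------------------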
146 146 # client side
147 147
148 148 def todict(**args):
149 149 return args
150 150
151 151 class wirepeer(peer.peerrepository):
152 152
153 153 def batch(self):
154 154 return remotebatch(self)
155 155 def _submitbatch(self, req):
156 156 cmds = []
157 157 for op, argsdict in req:
158 158 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
159 159 cmds.append('%s %s' % (op, args))
160 160 rsp = self._call("batch", cmds=';'.join(cmds))
161 161 return rsp.split(';')
162 162 def _submitone(self, op, args):
163 163 return self._call(op, **args)
164 164
165 165 @batchable
166 166 def lookup(self, key):
167 167 self.requirecap('lookup', _('look up remote revision'))
168 168 f = future()
169 169 yield todict(key=encoding.fromlocal(key)), f
170 170 d = f.value
171 171 success, data = d[:-1].split(" ", 1)
172 172 if int(success):
173 173 yield bin(data)
174 174 self._abort(error.RepoError(data))
175 175
176 176 @batchable
177 177 def heads(self):
178 178 f = future()
179 179 yield {}, f
180 180 d = f.value
181 181 try:
182 182 yield decodelist(d[:-1])
183 183 except ValueError:
184 184 self._abort(error.ResponseError(_("unexpected response:"), d))
185 185
186 186 @batchable
187 187 def known(self, nodes):
188 188 f = future()
189 189 yield todict(nodes=encodelist(nodes)), f
190 190 d = f.value
191 191 try:
192 192 yield [bool(int(f)) for f in d]
193 193 except ValueError:
194 194 self._abort(error.ResponseError(_("unexpected response:"), d))
195 195
196 196 @batchable
197 197 def branchmap(self):
198 198 f = future()
199 199 yield {}, f
200 200 d = f.value
201 201 try:
202 202 branchmap = {}
203 203 for branchpart in d.splitlines():
204 204 branchname, branchheads = branchpart.split(' ', 1)
205 205 branchname = encoding.tolocal(urllib.unquote(branchname))
206 206 branchheads = decodelist(branchheads)
207 207 branchmap[branchname] = branchheads
208 208 yield branchmap
209 209 except TypeError:
210 210 self._abort(error.ResponseError(_("unexpected response:"), d))
211 211
212 212 def branches(self, nodes):
213 213 n = encodelist(nodes)
214 214 d = self._call("branches", nodes=n)
215 215 try:
216 216 br = [tuple(decodelist(b)) for b in d.splitlines()]
217 217 return br
218 218 except ValueError:
219 219 self._abort(error.ResponseError(_("unexpected response:"), d))
220 220
221 221 def between(self, pairs):
222 222 batch = 8 # avoid giant requests
223 223 r = []
224 224 for i in xrange(0, len(pairs), batch):
225 225 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
226 226 d = self._call("between", pairs=n)
227 227 try:
228 228 r.extend(l and decodelist(l) or [] for l in d.splitlines())
229 229 except ValueError:
230 230 self._abort(error.ResponseError(_("unexpected response:"), d))
231 231 return r
232 232
233 233 @batchable
234 234 def pushkey(self, namespace, key, old, new):
235 235 if not self.capable('pushkey'):
236 236 yield False, None
237 237 f = future()
238 238 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
239 239 yield todict(namespace=encoding.fromlocal(namespace),
240 240 key=encoding.fromlocal(key),
241 241 old=encoding.fromlocal(old),
242 242 new=encoding.fromlocal(new)), f
243 243 d = f.value
244 244 d, output = d.split('\n', 1)
245 245 try:
246 246 d = bool(int(d))
247 247 except ValueError:
248 248 raise error.ResponseError(
249 249 _('push failed (unexpected response):'), d)
250 250 for l in output.splitlines(True):
251 251 self.ui.status(_('remote: '), l)
252 252 yield d
253 253
254 254 @batchable
255 255 def listkeys(self, namespace):
256 256 if not self.capable('pushkey'):
257 257 yield {}, None
258 258 f = future()
259 259 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
260 260 yield todict(namespace=encoding.fromlocal(namespace)), f
261 261 d = f.value
262 262 r = {}
263 263 for l in d.splitlines():
264 264 k, v = l.split('\t')
265 265 r[encoding.tolocal(k)] = encoding.tolocal(v)
266 266 yield r
267 267
268 268 def stream_out(self):
269 269 return self._callstream('stream_out')
270 270
271 271 def changegroup(self, nodes, kind):
272 272 n = encodelist(nodes)
273 273 f = self._callstream("changegroup", roots=n)
274 274 return changegroupmod.unbundle10(self._decompress(f), 'UN')
275 275
276 276 def changegroupsubset(self, bases, heads, kind):
277 277 self.requirecap('changegroupsubset', _('look up remote changes'))
278 278 bases = encodelist(bases)
279 279 heads = encodelist(heads)
280 280 f = self._callstream("changegroupsubset",
281 281 bases=bases, heads=heads)
282 282 return changegroupmod.unbundle10(self._decompress(f), 'UN')
283 283
284 284 def getbundle(self, source, heads=None, common=None):
285 285 self.requirecap('getbundle', _('look up remote changes'))
286 286 opts = {}
287 287 if heads is not None:
288 288 opts['heads'] = encodelist(heads)
289 289 if common is not None:
290 290 opts['common'] = encodelist(common)
291 291 f = self._callstream("getbundle", **opts)
292 292 return changegroupmod.unbundle10(self._decompress(f), 'UN')
293 293
294 294 def unbundle(self, cg, heads, source):
295 295 '''Send cg (a readable file-like object representing the
296 296 changegroup to push, typically a chunkbuffer object) to the
297 297 remote server as a bundle. Return an integer indicating the
298 298 result of the push (see localrepository.addchangegroup()).'''
299 299
300 300 if heads != ['force'] and self.capable('unbundlehash'):
301 301 heads = encodelist(['hashed',
302 302 util.sha1(''.join(sorted(heads))).digest()])
303 303 else:
304 304 heads = encodelist(heads)
305 305
306 306 ret, output = self._callpush("unbundle", cg, heads=heads)
307 307 if ret == "":
308 308 raise error.ResponseError(
309 309 _('push failed:'), output)
310 310 try:
311 311 ret = int(ret)
312 312 except ValueError:
313 313 raise error.ResponseError(
314 314 _('push failed (unexpected response):'), ret)
315 315
316 316 for l in output.splitlines(True):
317 317 self.ui.status(_('remote: '), l)
318 318 return ret
319 319
320 320 def debugwireargs(self, one, two, three=None, four=None, five=None):
321 321 # don't pass optional arguments left at their default value
322 322 opts = {}
323 323 if three is not None:
324 324 opts['three'] = three
325 325 if four is not None:
326 326 opts['four'] = four
327 327 return self._call('debugwireargs', one=one, two=two, **opts)
328 328
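# --- editor's sketch, not part of wireproto.py ---------------------------
# Client-side batching with a wirepeer: lookup() and heads() are @batchable,
# so remotebatch can fold both into a single "batch" command on the wire.
# `remote` is assumed to be any concrete wirepeer (ssh or http peer).
def _wirepeerbatchdemo(remote):
    b = remote.batch()                 # a remotebatch instance
    tipfut = b.lookup('tip')           # queued, nothing sent yet
    headsfut = b.heads()
    b.submit()                         # one round trip for both commands
    return tipfut.value, headsfut.value
# --------------------------------------------------------------------------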
329 329 # server side
330 330
331 331 class streamres(object):
332 332 def __init__(self, gen):
333 333 self.gen = gen
334 334
335 335 class pushres(object):
336 336 def __init__(self, res):
337 337 self.res = res
338 338
339 339 class pusherr(object):
340 340 def __init__(self, res):
341 341 self.res = res
342 342
343 343 class ooberror(object):
344 344 def __init__(self, message):
345 345 self.message = message
346 346
347 347 def dispatch(repo, proto, command):
348 348 repo = repo.filtered("unserved")
349 349 func, spec = commands[command]
350 350 args = proto.getargs(spec)
351 351 return func(repo, proto, *args)
352 352
353 353 def options(cmd, keys, others):
354 354 opts = {}
355 355 for k in keys:
356 356 if k in others:
357 357 opts[k] = others[k]
358 358 del others[k]
359 359 if others:
360 360 sys.stderr.write("abort: %s got unexpected arguments %s\n"
361 361 % (cmd, ",".join(others)))
362 362 return opts
363 363
364 364 def batch(repo, proto, cmds, others):
365 365 repo = repo.filtered("unserved")
366 366 res = []
367 367 for pair in cmds.split(';'):
368 368 op, args = pair.split(' ', 1)
369 369 vals = {}
370 370 for a in args.split(','):
371 371 if a:
372 372 n, v = a.split('=')
373 373 vals[n] = unescapearg(v)
374 374 func, spec = commands[op]
375 375 if spec:
376 376 keys = spec.split()
377 377 data = {}
378 378 for k in keys:
379 379 if k == '*':
380 380 star = {}
381 381 for key in vals.keys():
382 382 if key not in keys:
383 383 star[key] = vals[key]
384 384 data['*'] = star
385 385 else:
386 386 data[k] = vals[k]
387 387 result = func(repo, proto, *[data[k] for k in keys])
388 388 else:
389 389 result = func(repo, proto)
390 390 if isinstance(result, ooberror):
391 391 return result
392 392 res.append(escapearg(result))
393 393 return ';'.join(res)
394 394
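# --- editor's sketch, not part of wireproto.py ---------------------------
# Shape of the request handled by batch() above, on made-up values: each
# command is "<op> <key>=<value>,...", commands are joined with ';', and
# values are unquoted with unescapearg() before dispatch.
def _parsebatchdemo():
    cmds = 'heads ;lookup key=tip'
    parsed = []
    for pair in cmds.split(';'):
        op, args = pair.split(' ', 1)
        vals = {}
        for a in args.split(','):
            if a:
                n, v = a.split('=')
                vals[n] = unescapearg(v)
        parsed.append((op, vals))
    return parsed     # [('heads', {}), ('lookup', {'key': 'tip'})]
# --------------------------------------------------------------------------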
395 395 def between(repo, proto, pairs):
396 396 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
397 397 r = []
398 398 for b in repo.between(pairs):
399 399 r.append(encodelist(b) + "\n")
400 400 return "".join(r)
401 401
402 402 def branchmap(repo, proto):
403 403 branchmap = repo.branchmap()
404 404 heads = []
405 405 for branch, nodes in branchmap.iteritems():
406 406 branchname = urllib.quote(encoding.fromlocal(branch))
407 407 branchnodes = encodelist(nodes)
408 408 heads.append('%s %s' % (branchname, branchnodes))
409 409 return '\n'.join(heads)
410 410
411 411 def branches(repo, proto, nodes):
412 412 nodes = decodelist(nodes)
413 413 r = []
414 414 for b in repo.branches(nodes):
415 415 r.append(encodelist(b) + "\n")
416 416 return "".join(r)
417 417
418 418 def capabilities(repo, proto):
419 419 caps = ('lookup changegroupsubset branchmap pushkey known getbundle '
420 420 'unbundlehash batch').split()
421 421 if _allowstream(repo.ui):
422 422 if repo.ui.configbool('server', 'preferuncompressed', False):
423 423 caps.append('stream-preferred')
424 424 requiredformats = repo.requirements & repo.supportedformats
425 425 # if our local revlogs are just revlogv1, add 'stream' cap
426 426 if not requiredformats - set(('revlogv1',)):
427 427 caps.append('stream')
428 428 # otherwise, add 'streamreqs' detailing our local revlog format
429 429 else:
430 430 caps.append('streamreqs=%s' % ','.join(requiredformats))
431 431 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
432 432 caps.append('httpheader=1024')
433 433 return ' '.join(caps)
434 434
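# --- editor's note, not part of wireproto.py ------------------------------
# Two hypothetical capability advertisements produced by capabilities():
#   revlogv1-only store:   '... batch stream unbundle=... httpheader=1024'
#   additional formats:    '... batch streamreqs=generaldelta,revlogv1 ...'
# In the second case a client may only attempt a streaming clone if it
# understands every requirement listed in streamreqs.
# --------------------------------------------------------------------------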
435 435 def changegroup(repo, proto, roots):
436 436 nodes = decodelist(roots)
437 437 cg = repo.changegroup(nodes, 'serve')
438 438 return streamres(proto.groupchunks(cg))
439 439
440 440 def changegroupsubset(repo, proto, bases, heads):
441 441 bases = decodelist(bases)
442 442 heads = decodelist(heads)
443 443 cg = repo.changegroupsubset(bases, heads, 'serve')
444 444 return streamres(proto.groupchunks(cg))
445 445
446 446 def debugwireargs(repo, proto, one, two, others):
447 447 # only accept optional args from the known set
448 448 opts = options('debugwireargs', ['three', 'four'], others)
449 449 return repo.debugwireargs(one, two, **opts)
450 450
451 451 def getbundle(repo, proto, others):
452 452 opts = options('getbundle', ['heads', 'common'], others)
453 453 for k, v in opts.iteritems():
454 454 opts[k] = decodelist(v)
455 455 cg = repo.getbundle('serve', **opts)
456 456 return streamres(proto.groupchunks(cg))
457 457
458 458 def heads(repo, proto):
459 459 h = repo.heads()
460 460 return encodelist(h) + "\n"
461 461
462 462 def hello(repo, proto):
463 463 '''the hello command returns a set of lines describing various
464 464 interesting things about the server, in an RFC822-like format.
465 465 Currently the only one defined is "capabilities", which
466 466 consists of a line in the form:
467 467
468 468 capabilities: space separated list of tokens
469 469 '''
470 470 return "capabilities: %s\n" % (capabilities(repo, proto))
471 471
472 472 def listkeys(repo, proto, namespace):
473 473 d = repo.listkeys(encoding.tolocal(namespace)).items()
474 474 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
475 475 for k, v in d])
476 476 return t
477 477
478 478 def lookup(repo, proto, key):
479 479 try:
480 480 k = encoding.tolocal(key)
481 481 c = repo[k]
482 482 r = c.hex()
483 483 success = 1
484 484 except Exception, inst:
485 485 r = str(inst)
486 486 success = 0
487 487 return "%s %s\n" % (success, r)
488 488
489 489 def known(repo, proto, nodes, others):
490 490 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
491 491
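# --- editor's note, not part of wireproto.py ------------------------------
# Example of the known() reply, with made-up input: for nodes="<hexA> <hexB>"
# the response is one character per node, e.g. '10' meaning the first node
# is present in the repository and the second is not.
# --------------------------------------------------------------------------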
492 492 def pushkey(repo, proto, namespace, key, old, new):
493 493 # compatibility with pre-1.8 clients which were accidentally
494 494 # sending raw binary nodes rather than utf-8-encoded hex
495 495 if len(new) == 20 and new.encode('string-escape') != new:
496 496 # looks like it could be a binary node
497 497 try:
498 498 new.decode('utf-8')
499 499 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
500 500 except UnicodeDecodeError:
501 501 pass # binary, leave unmodified
502 502 else:
503 503 new = encoding.tolocal(new) # normal path
504 504
505 505 if util.safehasattr(proto, 'restore'):
506 506
507 507 proto.redirect()
508 508
509 509 try:
510 510 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
511 511 encoding.tolocal(old), new) or False
512 512 except util.Abort:
513 513 r = False
514 514
515 515 output = proto.restore()
516 516
517 517 return '%s\n%s' % (int(r), output)
518 518
519 519 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
520 520 encoding.tolocal(old), new)
521 521 return '%s\n' % int(r)
522 522
523 523 def _allowstream(ui):
524 524 return ui.configbool('server', 'uncompressed', True, untrusted=True)
525 525
526 526 def stream(repo, proto):
527 527 '''If the server supports streaming clone, it advertises the "stream"
528 528 capability with a value representing the version and flags of the repo
529 529 it is serving. Client checks to see if it understands the format.
530 530
531 531 The format is simple: the server writes out a line with the amount
532 532 of files, then the total amount of bytes to be transferred (separated
533 533 by a space). Then, for each file, the server first writes the filename
534 534 and filesize (separated by the null character), then the file contents.
535 535 '''
536 536
537 537 if not _allowstream(repo.ui):
538 538 return '1\n'
539 539
540 540 entries = []
541 541 total_bytes = 0
542 542 try:
543 543 # get consistent snapshot of repo, lock during scan
544 544 lock = repo.lock()
545 545 try:
546 546 repo.ui.debug('scanning\n')
547 547 for name, ename, size in repo.store.walk():
548 if size:
548 549 entries.append((name, size))
549 550 total_bytes += size
550 551 finally:
551 552 lock.release()
552 553 except error.LockError:
553 554 return '2\n' # error: 2
554 555
555 556 def streamer(repo, entries, total):
556 557 '''stream out all metadata files in repository.'''
557 558 yield '0\n' # success
558 559 repo.ui.debug('%d files, %d bytes to transfer\n' %
559 560 (len(entries), total_bytes))
560 561 yield '%d %d\n' % (len(entries), total_bytes)
561 562
562 563 sopener = repo.sopener
563 564 oldaudit = sopener.mustaudit
564 565 debugflag = repo.ui.debugflag
565 566 sopener.mustaudit = False
566 567
567 568 try:
568 569 for name, size in entries:
569 570 if debugflag:
570 571 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
571 572 # partially encode name over the wire for backwards compat
572 573 yield '%s\0%d\n' % (store.encodedir(name), size)
573 574 if size <= 65536:
574 575 fp = sopener(name)
575 576 try:
576 577 data = fp.read(size)
577 578 finally:
578 579 fp.close()
579 580 yield data
580 581 else:
581 582 for chunk in util.filechunkiter(sopener(name), limit=size):
582 583 yield chunk
583 584 # replace with "finally:" when support for python 2.4 has been dropped
584 585 except Exception:
585 586 sopener.mustaudit = oldaudit
586 587 raise
587 588 sopener.mustaudit = oldaudit
588 589
589 590 return streamres(streamer(repo, entries, total_bytes))
590 591
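# --- editor's sketch, not part of wireproto.py ---------------------------
# Rough client-side view of the payload produced by streamer() above:
# a status line, a "<filecount> <totalbytes>" line, then for each file a
# "<name>\0<size>" line followed by exactly <size> bytes of raw data.
# `fp` is assumed to be a file-like object over the server response; the
# yielded names are still in the encoded store form.
def _readstreamdemo(fp):
    status = fp.readline()
    if status != '0\n':
        raise ValueError('stream failed: %r' % status)
    filecount, totalbytes = map(int, fp.readline().split(' ', 1))
    for i in xrange(filecount):
        name, size = fp.readline().split('\0', 1)
        yield name, fp.read(int(size))
# --------------------------------------------------------------------------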
591 592 def unbundle(repo, proto, heads):
592 593 their_heads = decodelist(heads)
593 594
594 595 def check_heads():
595 596 heads = repo.heads()
596 597 heads_hash = util.sha1(''.join(sorted(heads))).digest()
597 598 return (their_heads == ['force'] or their_heads == heads or
598 599 their_heads == ['hashed', heads_hash])
599 600
600 601 proto.redirect()
601 602
602 603 # fail early if possible
603 604 if not check_heads():
604 605 return pusherr('unsynced changes')
605 606
606 607 # write bundle data to temporary file because it can be big
607 608 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
608 609 fp = os.fdopen(fd, 'wb+')
609 610 r = 0
610 611 try:
611 612 proto.getfile(fp)
612 613 lock = repo.lock()
613 614 try:
614 615 if not check_heads():
615 616 # someone else committed/pushed/unbundled while we
616 617 # were transferring data
617 618 return pusherr('unsynced changes')
618 619
619 620 # push can proceed
620 621 fp.seek(0)
621 622 gen = changegroupmod.readbundle(fp, None)
622 623
623 624 try:
624 625 r = repo.addchangegroup(gen, 'serve', proto._client())
625 626 except util.Abort, inst:
626 627 sys.stderr.write("abort: %s\n" % inst)
627 628 finally:
628 629 lock.release()
629 630 return pushres(r)
630 631
631 632 finally:
632 633 fp.close()
633 634 os.unlink(tempname)
634 635
635 636 commands = {
636 637 'batch': (batch, 'cmds *'),
637 638 'between': (between, 'pairs'),
638 639 'branchmap': (branchmap, ''),
639 640 'branches': (branches, 'nodes'),
640 641 'capabilities': (capabilities, ''),
641 642 'changegroup': (changegroup, 'roots'),
642 643 'changegroupsubset': (changegroupsubset, 'bases heads'),
643 644 'debugwireargs': (debugwireargs, 'one two *'),
644 645 'getbundle': (getbundle, '*'),
645 646 'heads': (heads, ''),
646 647 'hello': (hello, ''),
647 648 'known': (known, 'nodes *'),
648 649 'listkeys': (listkeys, 'namespace'),
649 650 'lookup': (lookup, 'key'),
650 651 'pushkey': (pushkey, 'namespace key old new'),
651 652 'stream_out': (stream, ''),
652 653 'unbundle': (unbundle, 'heads'),
653 654 }