##// END OF EJS Templates
wireproto: perform chunking and compression at protocol layer (API)...
Gregory Szorc -
r30466:2add671b default
parent child Browse files
Show More
@@ -1,189 +1,189
1 1 # Copyright 2011 Fog Creek Software
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5 from __future__ import absolute_import
6 6
7 7 import os
8 8 import re
9 9
10 10 from mercurial.i18n import _
11 11
12 12 from mercurial import (
13 13 error,
14 14 httppeer,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 from . import (
20 20 lfutil,
21 21 )
22 22
23 23 urlerr = util.urlerr
24 24 urlreq = util.urlreq
25 25
26 26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 27 '\n\nPlease enable it in your Mercurial config '
28 28 'file.\n')
29 29
30 30 # these will all be replaced by largefiles.uisetup
31 31 capabilitiesorig = None
32 32 ssholdcallstream = None
33 33 httpoldcallstream = None
34 34
35 35 def putlfile(repo, proto, sha):
36 36 '''Server command for putting a largefile into a repository's local store
37 37 and into the user cache.'''
38 38 proto.redirect()
39 39
40 40 path = lfutil.storepath(repo, sha)
41 41 util.makedirs(os.path.dirname(path))
42 42 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
43 43
44 44 try:
45 45 proto.getfile(tmpfp)
46 46 tmpfp._fp.seek(0)
47 47 if sha != lfutil.hexsha1(tmpfp._fp):
48 48 raise IOError(0, _('largefile contents do not match hash'))
49 49 tmpfp.close()
50 50 lfutil.linktousercache(repo, sha)
51 51 except IOError as e:
52 52 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
53 53 (sha, e.strerror))
54 54 return wireproto.pushres(1)
55 55 finally:
56 56 tmpfp.discard()
57 57
58 58 return wireproto.pushres(0)
59 59
60 60 def getlfile(repo, proto, sha):
61 61 '''Server command for retrieving a largefile from the repository-local
62 62 cache or user cache.'''
63 63 filename = lfutil.findfile(repo, sha)
64 64 if not filename:
65 65 raise error.Abort(_('requested largefile %s not present in cache')
66 66 % sha)
67 67 f = open(filename, 'rb')
68 68 length = os.fstat(f.fileno())[6]
69 69
70 70 # Since we can't set an HTTP content-length header here, and
71 71 # Mercurial core provides no way to give the length of a streamres
72 72 # (and reading the entire file into RAM would be ill-advised), we
73 73 # just send the length on the first line of the response, like the
74 74 # ssh proto does for string responses.
75 75 def generator():
76 76 yield '%d\n' % length
77 77 for chunk in util.filechunkiter(f):
78 78 yield chunk
79 return wireproto.streamres(generator())
79 return wireproto.streamres(gen=generator())
80 80
81 81 def statlfile(repo, proto, sha):
82 82 '''Server command for checking if a largefile is present - returns '2\n' if
83 83 the largefile is missing, '0\n' if it seems to be in good condition.
84 84
85 85 The value 1 is reserved for mismatched checksum, but that is too expensive
86 86 to be verified on every stat and must be caught be running 'hg verify'
87 87 server side.'''
88 88 filename = lfutil.findfile(repo, sha)
89 89 if not filename:
90 90 return '2\n'
91 91 return '0\n'
92 92
93 93 def wirereposetup(ui, repo):
94 94 class lfileswirerepository(repo.__class__):
95 95 def putlfile(self, sha, fd):
96 96 # unfortunately, httprepository._callpush tries to convert its
97 97 # input file-like into a bundle before sending it, so we can't use
98 98 # it ...
99 99 if issubclass(self.__class__, httppeer.httppeer):
100 100 res = self._call('putlfile', data=fd, sha=sha,
101 101 headers={'content-type':'application/mercurial-0.1'})
102 102 try:
103 103 d, output = res.split('\n', 1)
104 104 for l in output.splitlines(True):
105 105 self.ui.warn(_('remote: '), l) # assume l ends with \n
106 106 return int(d)
107 107 except ValueError:
108 108 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
109 109 return 1
110 110 # ... but we can't use sshrepository._call because the data=
111 111 # argument won't get sent, and _callpush does exactly what we want
112 112 # in this case: send the data straight through
113 113 else:
114 114 try:
115 115 ret, output = self._callpush("putlfile", fd, sha=sha)
116 116 if ret == "":
117 117 raise error.ResponseError(_('putlfile failed:'),
118 118 output)
119 119 return int(ret)
120 120 except IOError:
121 121 return 1
122 122 except ValueError:
123 123 raise error.ResponseError(
124 124 _('putlfile failed (unexpected response):'), ret)
125 125
126 126 def getlfile(self, sha):
127 127 """returns an iterable with the chunks of the file with sha sha"""
128 128 stream = self._callstream("getlfile", sha=sha)
129 129 length = stream.readline()
130 130 try:
131 131 length = int(length)
132 132 except ValueError:
133 133 self._abort(error.ResponseError(_("unexpected response:"),
134 134 length))
135 135
136 136 # SSH streams will block if reading more than length
137 137 for chunk in util.filechunkiter(stream, limit=length):
138 138 yield chunk
139 139 # HTTP streams must hit the end to process the last empty
140 140 # chunk of Chunked-Encoding so the connection can be reused.
141 141 if issubclass(self.__class__, httppeer.httppeer):
142 142 chunk = stream.read(1)
143 143 if chunk:
144 144 self._abort(error.ResponseError(_("unexpected response:"),
145 145 chunk))
146 146
147 147 @wireproto.batchable
148 148 def statlfile(self, sha):
149 149 f = wireproto.future()
150 150 result = {'sha': sha}
151 151 yield result, f
152 152 try:
153 153 yield int(f.value)
154 154 except (ValueError, urlerr.httperror):
155 155 # If the server returns anything but an integer followed by a
156 156 # newline, newline, it's not speaking our language; if we get
157 157 # an HTTP error, we can't be sure the largefile is present;
158 158 # either way, consider it missing.
159 159 yield 2
160 160
161 161 repo.__class__ = lfileswirerepository
162 162
163 163 # advertise the largefiles=serve capability
164 164 def capabilities(repo, proto):
165 165 '''Wrap server command to announce largefile server capability'''
166 166 return capabilitiesorig(repo, proto) + ' largefiles=serve'
167 167
168 168 def heads(repo, proto):
169 169 '''Wrap server command - largefile capable clients will know to call
170 170 lheads instead'''
171 171 if lfutil.islfilesrepo(repo):
172 172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
173 173 return wireproto.heads(repo, proto)
174 174
175 175 def sshrepocallstream(self, cmd, **args):
176 176 if cmd == 'heads' and self.capable('largefiles'):
177 177 cmd = 'lheads'
178 178 if cmd == 'batch' and self.capable('largefiles'):
179 179 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
180 180 return ssholdcallstream(self, cmd, **args)
181 181
182 182 headsre = re.compile(r'(^|;)heads\b')
183 183
184 184 def httprepocallstream(self, cmd, **args):
185 185 if cmd == 'heads' and self.capable('largefiles'):
186 186 cmd = 'lheads'
187 187 if cmd == 'batch' and self.capable('largefiles'):
188 188 args['cmds'] = headsre.sub('lheads', args['cmds'])
189 189 return httpoldcallstream(self, cmd, **args)
@@ -1,126 +1,124
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import cgi
11 11
12 12 from .common import (
13 13 HTTP_OK,
14 14 )
15 15
16 16 from .. import (
17 17 util,
18 18 wireproto,
19 19 )
20 20 stringio = util.stringio
21 21
22 22 urlerr = util.urlerr
23 23 urlreq = util.urlreq
24 24
25 25 HGTYPE = 'application/mercurial-0.1'
26 26 HGERRTYPE = 'application/hg-error'
27 27
28 28 class webproto(wireproto.abstractserverproto):
29 29 def __init__(self, req, ui):
30 30 self.req = req
31 31 self.response = ''
32 32 self.ui = ui
33 33 def getargs(self, args):
34 34 knownargs = self._args()
35 35 data = {}
36 36 keys = args.split()
37 37 for k in keys:
38 38 if k == '*':
39 39 star = {}
40 40 for key in knownargs.keys():
41 41 if key != 'cmd' and key not in keys:
42 42 star[key] = knownargs[key][0]
43 43 data['*'] = star
44 44 else:
45 45 data[k] = knownargs[k][0]
46 46 return [data[k] for k in keys]
47 47 def _args(self):
48 48 args = self.req.form.copy()
49 49 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
50 50 if postlen:
51 51 args.update(cgi.parse_qs(
52 52 self.req.read(postlen), keep_blank_values=True))
53 53 return args
54 54 chunks = []
55 55 i = 1
56 56 while True:
57 57 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
58 58 if h is None:
59 59 break
60 60 chunks += [h]
61 61 i += 1
62 62 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
63 63 return args
64 64 def getfile(self, fp):
65 65 length = int(self.req.env['CONTENT_LENGTH'])
66 66 for s in util.filechunkiter(self.req, limit=length):
67 67 fp.write(s)
68 68 def redirect(self):
69 69 self.oldio = self.ui.fout, self.ui.ferr
70 70 self.ui.ferr = self.ui.fout = stringio()
71 71 def restore(self):
72 72 val = self.ui.fout.getvalue()
73 73 self.ui.ferr, self.ui.fout = self.oldio
74 74 return val
75 75
76 def groupchunks(self, fh):
77 def getchunks():
78 while True:
79 chunk = fh.read(32768)
80 if not chunk:
81 break
82 yield chunk
83
84 return self.compresschunks(getchunks())
85
86 76 def compresschunks(self, chunks):
87 77 # Don't allow untrusted settings because disabling compression or
88 78 # setting a very high compression level could lead to flooding
89 79 # the server's network or CPU.
90 80 opts = {'level': self.ui.configint('server', 'zliblevel', -1)}
91 81 return util.compengines['zlib'].compressstream(chunks, opts)
92 82
93 83 def _client(self):
94 84 return 'remote:%s:%s:%s' % (
95 85 self.req.env.get('wsgi.url_scheme') or 'http',
96 86 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
97 87 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
98 88
99 89 def iscmd(cmd):
100 90 return cmd in wireproto.commands
101 91
102 92 def call(repo, req, cmd):
103 93 p = webproto(req, repo.ui)
104 94 rsp = wireproto.dispatch(repo, p, cmd)
105 95 if isinstance(rsp, str):
106 96 req.respond(HTTP_OK, HGTYPE, body=rsp)
107 97 return []
108 98 elif isinstance(rsp, wireproto.streamres):
99 if rsp.reader:
100 gen = iter(lambda: rsp.reader.read(32768), '')
101 else:
102 gen = rsp.gen
103
104 if rsp.v1compressible:
105 gen = p.compresschunks(gen)
106
109 107 req.respond(HTTP_OK, HGTYPE)
110 return rsp.gen
108 return gen
111 109 elif isinstance(rsp, wireproto.pushres):
112 110 val = p.restore()
113 111 rsp = '%d\n%s' % (rsp.res, val)
114 112 req.respond(HTTP_OK, HGTYPE, body=rsp)
115 113 return []
116 114 elif isinstance(rsp, wireproto.pusherr):
117 115 # drain the incoming bundle
118 116 req.drain()
119 117 p.restore()
120 118 rsp = '0\n%s\n' % rsp.res
121 119 req.respond(HTTP_OK, HGTYPE, body=rsp)
122 120 return []
123 121 elif isinstance(rsp, wireproto.ooberror):
124 122 rsp = rsp.message
125 123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
126 124 return []
@@ -1,135 +1,134
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import os
12 12 import sys
13 13
14 14 from .i18n import _
15 15 from . import (
16 16 error,
17 17 hook,
18 18 util,
19 19 wireproto,
20 20 )
21 21
22 22 class sshserver(wireproto.abstractserverproto):
23 23 def __init__(self, ui, repo):
24 24 self.ui = ui
25 25 self.repo = repo
26 26 self.lock = None
27 27 self.fin = ui.fin
28 28 self.fout = ui.fout
29 29
30 30 hook.redirect(True)
31 31 ui.fout = repo.ui.fout = ui.ferr
32 32
33 33 # Prevent insertion/deletion of CRs
34 34 util.setbinary(self.fin)
35 35 util.setbinary(self.fout)
36 36
37 37 def getargs(self, args):
38 38 data = {}
39 39 keys = args.split()
40 40 for n in xrange(len(keys)):
41 41 argline = self.fin.readline()[:-1]
42 42 arg, l = argline.split()
43 43 if arg not in keys:
44 44 raise error.Abort(_("unexpected parameter %r") % arg)
45 45 if arg == '*':
46 46 star = {}
47 47 for k in xrange(int(l)):
48 48 argline = self.fin.readline()[:-1]
49 49 arg, l = argline.split()
50 50 val = self.fin.read(int(l))
51 51 star[arg] = val
52 52 data['*'] = star
53 53 else:
54 54 val = self.fin.read(int(l))
55 55 data[arg] = val
56 56 return [data[k] for k in keys]
57 57
58 58 def getarg(self, name):
59 59 return self.getargs(name)[0]
60 60
61 61 def getfile(self, fpout):
62 62 self.sendresponse('')
63 63 count = int(self.fin.readline())
64 64 while count:
65 65 fpout.write(self.fin.read(count))
66 66 count = int(self.fin.readline())
67 67
68 68 def redirect(self):
69 69 pass
70 70
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
78 71 def sendresponse(self, v):
79 72 self.fout.write("%d\n" % len(v))
80 73 self.fout.write(v)
81 74 self.fout.flush()
82 75
83 76 def sendstream(self, source):
84 77 write = self.fout.write
85 for chunk in source.gen:
78
79 if source.reader:
80 gen = iter(lambda: source.reader.read(4096), '')
81 else:
82 gen = source.gen
83
84 for chunk in gen:
86 85 write(chunk)
87 86 self.fout.flush()
88 87
89 88 def sendpushresponse(self, rsp):
90 89 self.sendresponse('')
91 90 self.sendresponse(str(rsp.res))
92 91
93 92 def sendpusherror(self, rsp):
94 93 self.sendresponse(rsp.res)
95 94
96 95 def sendooberror(self, rsp):
97 96 self.ui.ferr.write('%s\n-\n' % rsp.message)
98 97 self.ui.ferr.flush()
99 98 self.fout.write('\n')
100 99 self.fout.flush()
101 100
102 101 def serve_forever(self):
103 102 try:
104 103 while self.serve_one():
105 104 pass
106 105 finally:
107 106 if self.lock is not None:
108 107 self.lock.release()
109 108 sys.exit(0)
110 109
111 110 handlers = {
112 111 str: sendresponse,
113 112 wireproto.streamres: sendstream,
114 113 wireproto.pushres: sendpushresponse,
115 114 wireproto.pusherr: sendpusherror,
116 115 wireproto.ooberror: sendooberror,
117 116 }
118 117
119 118 def serve_one(self):
120 119 cmd = self.fin.readline()[:-1]
121 120 if cmd and cmd in wireproto.commands:
122 121 rsp = wireproto.dispatch(self.repo, self, cmd)
123 122 self.handlers[rsp.__class__](self, rsp)
124 123 elif cmd:
125 124 impl = getattr(self, 'do_' + cmd, None)
126 125 if impl:
127 126 r = impl()
128 127 if r is not None:
129 128 self.sendresponse(r)
130 129 else: self.sendresponse("")
131 130 return cmd != ''
132 131
133 132 def _client(self):
134 133 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
135 134 return 'remote:ssh:' + client
@@ -1,965 +1,959
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import itertools
12 12 import os
13 13 import sys
14 14 import tempfile
15 15
16 16 from .i18n import _
17 17 from .node import (
18 18 bin,
19 19 hex,
20 20 )
21 21
22 22 from . import (
23 23 bundle2,
24 24 changegroup as changegroupmod,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 streamclone,
31 31 util,
32 32 )
33 33
34 34 urlerr = util.urlerr
35 35 urlreq = util.urlreq
36 36
37 37 bundle2required = _(
38 38 'incompatible Mercurial client; bundle2 required\n'
39 39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40 40
41 41 class abstractserverproto(object):
42 42 """abstract class that summarizes the protocol API
43 43
44 44 Used as reference and documentation.
45 45 """
46 46
47 47 def getargs(self, args):
48 48 """return the value for arguments in <args>
49 49
50 50 returns a list of values (same order as <args>)"""
51 51 raise NotImplementedError()
52 52
53 53 def getfile(self, fp):
54 54 """write the whole content of a file into a file like object
55 55
56 56 The file is in the form::
57 57
58 58 (<chunk-size>\n<chunk>)+0\n
59 59
60 60 chunk size is the ascii version of the int.
61 61 """
62 62 raise NotImplementedError()
63 63
64 64 def redirect(self):
65 65 """may setup interception for stdout and stderr
66 66
67 67 See also the `restore` method."""
68 68 raise NotImplementedError()
69 69
70 70 # If the `redirect` function does install interception, the `restore`
71 71 # function MUST be defined. If interception is not used, this function
72 72 # MUST NOT be defined.
73 73 #
74 74 # left commented here on purpose
75 75 #
76 76 #def restore(self):
77 77 # """reinstall previous stdout and stderr and return intercepted stdout
78 78 # """
79 79 # raise NotImplementedError()
80 80
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
83
84 Some protocols may have compressed the contents.
85 """
86 raise NotImplementedError()
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
96 81 class remotebatch(peer.batcher):
97 82 '''batches the queued calls; uses as few roundtrips as possible'''
98 83 def __init__(self, remote):
99 84 '''remote must support _submitbatch(encbatch) and
100 85 _submitone(op, encargs)'''
101 86 peer.batcher.__init__(self)
102 87 self.remote = remote
103 88 def submit(self):
104 89 req, rsp = [], []
105 90 for name, args, opts, resref in self.calls:
106 91 mtd = getattr(self.remote, name)
107 92 batchablefn = getattr(mtd, 'batchable', None)
108 93 if batchablefn is not None:
109 94 batchable = batchablefn(mtd.im_self, *args, **opts)
110 95 encargsorres, encresref = next(batchable)
111 96 if encresref:
112 97 req.append((name, encargsorres,))
113 98 rsp.append((batchable, encresref, resref,))
114 99 else:
115 100 resref.set(encargsorres)
116 101 else:
117 102 if req:
118 103 self._submitreq(req, rsp)
119 104 req, rsp = [], []
120 105 resref.set(mtd(*args, **opts))
121 106 if req:
122 107 self._submitreq(req, rsp)
123 108 def _submitreq(self, req, rsp):
124 109 encresults = self.remote._submitbatch(req)
125 110 for encres, r in zip(encresults, rsp):
126 111 batchable, encresref, resref = r
127 112 encresref.set(encres)
128 113 resref.set(next(batchable))
129 114
130 115 class remoteiterbatcher(peer.iterbatcher):
131 116 def __init__(self, remote):
132 117 super(remoteiterbatcher, self).__init__()
133 118 self._remote = remote
134 119
135 120 def __getattr__(self, name):
136 121 if not getattr(self._remote, name, False):
137 122 raise AttributeError(
138 123 'Attempted to iterbatch non-batchable call to %r' % name)
139 124 return super(remoteiterbatcher, self).__getattr__(name)
140 125
141 126 def submit(self):
142 127 """Break the batch request into many patch calls and pipeline them.
143 128
144 129 This is mostly valuable over http where request sizes can be
145 130 limited, but can be used in other places as well.
146 131 """
147 132 req, rsp = [], []
148 133 for name, args, opts, resref in self.calls:
149 134 mtd = getattr(self._remote, name)
150 135 batchable = mtd.batchable(mtd.im_self, *args, **opts)
151 136 encargsorres, encresref = next(batchable)
152 137 assert encresref
153 138 req.append((name, encargsorres))
154 139 rsp.append((batchable, encresref))
155 140 if req:
156 141 self._resultiter = self._remote._submitbatch(req)
157 142 self._rsp = rsp
158 143
159 144 def results(self):
160 145 for (batchable, encresref), encres in itertools.izip(
161 146 self._rsp, self._resultiter):
162 147 encresref.set(encres)
163 148 yield next(batchable)
164 149
165 150 # Forward a couple of names from peer to make wireproto interactions
166 151 # slightly more sensible.
167 152 batchable = peer.batchable
168 153 future = peer.future
169 154
170 155 # list of nodes encoding / decoding
171 156
172 157 def decodelist(l, sep=' '):
173 158 if l:
174 159 return map(bin, l.split(sep))
175 160 return []
176 161
177 162 def encodelist(l, sep=' '):
178 163 try:
179 164 return sep.join(map(hex, l))
180 165 except TypeError:
181 166 raise
182 167
183 168 # batched call argument encoding
184 169
185 170 def escapearg(plain):
186 171 return (plain
187 172 .replace(':', ':c')
188 173 .replace(',', ':o')
189 174 .replace(';', ':s')
190 175 .replace('=', ':e'))
191 176
192 177 def unescapearg(escaped):
193 178 return (escaped
194 179 .replace(':e', '=')
195 180 .replace(':s', ';')
196 181 .replace(':o', ',')
197 182 .replace(':c', ':'))
198 183
199 184 def encodebatchcmds(req):
200 185 """Return a ``cmds`` argument value for the ``batch`` command."""
201 186 cmds = []
202 187 for op, argsdict in req:
203 188 # Old servers didn't properly unescape argument names. So prevent
204 189 # the sending of argument names that may not be decoded properly by
205 190 # servers.
206 191 assert all(escapearg(k) == k for k in argsdict)
207 192
208 193 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
209 194 for k, v in argsdict.iteritems())
210 195 cmds.append('%s %s' % (op, args))
211 196
212 197 return ';'.join(cmds)
213 198
214 199 # mapping of options accepted by getbundle and their types
215 200 #
216 201 # Meant to be extended by extensions. It is extensions responsibility to ensure
217 202 # such options are properly processed in exchange.getbundle.
218 203 #
219 204 # supported types are:
220 205 #
221 206 # :nodes: list of binary nodes
222 207 # :csv: list of comma-separated values
223 208 # :scsv: list of comma-separated values return as set
224 209 # :plain: string with no transformation needed.
225 210 gboptsmap = {'heads': 'nodes',
226 211 'common': 'nodes',
227 212 'obsmarkers': 'boolean',
228 213 'bundlecaps': 'scsv',
229 214 'listkeys': 'csv',
230 215 'cg': 'boolean',
231 216 'cbattempted': 'boolean'}
232 217
233 218 # client side
234 219
235 220 class wirepeer(peer.peerrepository):
236 221 """Client-side interface for communicating with a peer repository.
237 222
238 223 Methods commonly call wire protocol commands of the same name.
239 224
240 225 See also httppeer.py and sshpeer.py for protocol-specific
241 226 implementations of this interface.
242 227 """
243 228 def batch(self):
244 229 if self.capable('batch'):
245 230 return remotebatch(self)
246 231 else:
247 232 return peer.localbatch(self)
248 233 def _submitbatch(self, req):
249 234 """run batch request <req> on the server
250 235
251 236 Returns an iterator of the raw responses from the server.
252 237 """
253 238 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
254 239 chunk = rsp.read(1024)
255 240 work = [chunk]
256 241 while chunk:
257 242 while ';' not in chunk and chunk:
258 243 chunk = rsp.read(1024)
259 244 work.append(chunk)
260 245 merged = ''.join(work)
261 246 while ';' in merged:
262 247 one, merged = merged.split(';', 1)
263 248 yield unescapearg(one)
264 249 chunk = rsp.read(1024)
265 250 work = [merged, chunk]
266 251 yield unescapearg(''.join(work))
267 252
268 253 def _submitone(self, op, args):
269 254 return self._call(op, **args)
270 255
271 256 def iterbatch(self):
272 257 return remoteiterbatcher(self)
273 258
274 259 @batchable
275 260 def lookup(self, key):
276 261 self.requirecap('lookup', _('look up remote revision'))
277 262 f = future()
278 263 yield {'key': encoding.fromlocal(key)}, f
279 264 d = f.value
280 265 success, data = d[:-1].split(" ", 1)
281 266 if int(success):
282 267 yield bin(data)
283 268 self._abort(error.RepoError(data))
284 269
285 270 @batchable
286 271 def heads(self):
287 272 f = future()
288 273 yield {}, f
289 274 d = f.value
290 275 try:
291 276 yield decodelist(d[:-1])
292 277 except ValueError:
293 278 self._abort(error.ResponseError(_("unexpected response:"), d))
294 279
295 280 @batchable
296 281 def known(self, nodes):
297 282 f = future()
298 283 yield {'nodes': encodelist(nodes)}, f
299 284 d = f.value
300 285 try:
301 286 yield [bool(int(b)) for b in d]
302 287 except ValueError:
303 288 self._abort(error.ResponseError(_("unexpected response:"), d))
304 289
305 290 @batchable
306 291 def branchmap(self):
307 292 f = future()
308 293 yield {}, f
309 294 d = f.value
310 295 try:
311 296 branchmap = {}
312 297 for branchpart in d.splitlines():
313 298 branchname, branchheads = branchpart.split(' ', 1)
314 299 branchname = encoding.tolocal(urlreq.unquote(branchname))
315 300 branchheads = decodelist(branchheads)
316 301 branchmap[branchname] = branchheads
317 302 yield branchmap
318 303 except TypeError:
319 304 self._abort(error.ResponseError(_("unexpected response:"), d))
320 305
321 306 def branches(self, nodes):
322 307 n = encodelist(nodes)
323 308 d = self._call("branches", nodes=n)
324 309 try:
325 310 br = [tuple(decodelist(b)) for b in d.splitlines()]
326 311 return br
327 312 except ValueError:
328 313 self._abort(error.ResponseError(_("unexpected response:"), d))
329 314
330 315 def between(self, pairs):
331 316 batch = 8 # avoid giant requests
332 317 r = []
333 318 for i in xrange(0, len(pairs), batch):
334 319 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
335 320 d = self._call("between", pairs=n)
336 321 try:
337 322 r.extend(l and decodelist(l) or [] for l in d.splitlines())
338 323 except ValueError:
339 324 self._abort(error.ResponseError(_("unexpected response:"), d))
340 325 return r
341 326
342 327 @batchable
343 328 def pushkey(self, namespace, key, old, new):
344 329 if not self.capable('pushkey'):
345 330 yield False, None
346 331 f = future()
347 332 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
348 333 yield {'namespace': encoding.fromlocal(namespace),
349 334 'key': encoding.fromlocal(key),
350 335 'old': encoding.fromlocal(old),
351 336 'new': encoding.fromlocal(new)}, f
352 337 d = f.value
353 338 d, output = d.split('\n', 1)
354 339 try:
355 340 d = bool(int(d))
356 341 except ValueError:
357 342 raise error.ResponseError(
358 343 _('push failed (unexpected response):'), d)
359 344 for l in output.splitlines(True):
360 345 self.ui.status(_('remote: '), l)
361 346 yield d
362 347
363 348 @batchable
364 349 def listkeys(self, namespace):
365 350 if not self.capable('pushkey'):
366 351 yield {}, None
367 352 f = future()
368 353 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
369 354 yield {'namespace': encoding.fromlocal(namespace)}, f
370 355 d = f.value
371 356 self.ui.debug('received listkey for "%s": %i bytes\n'
372 357 % (namespace, len(d)))
373 358 yield pushkeymod.decodekeys(d)
374 359
375 360 def stream_out(self):
376 361 return self._callstream('stream_out')
377 362
378 363 def changegroup(self, nodes, kind):
379 364 n = encodelist(nodes)
380 365 f = self._callcompressable("changegroup", roots=n)
381 366 return changegroupmod.cg1unpacker(f, 'UN')
382 367
383 368 def changegroupsubset(self, bases, heads, kind):
384 369 self.requirecap('changegroupsubset', _('look up remote changes'))
385 370 bases = encodelist(bases)
386 371 heads = encodelist(heads)
387 372 f = self._callcompressable("changegroupsubset",
388 373 bases=bases, heads=heads)
389 374 return changegroupmod.cg1unpacker(f, 'UN')
390 375
391 376 def getbundle(self, source, **kwargs):
392 377 self.requirecap('getbundle', _('look up remote changes'))
393 378 opts = {}
394 379 bundlecaps = kwargs.get('bundlecaps')
395 380 if bundlecaps is not None:
396 381 kwargs['bundlecaps'] = sorted(bundlecaps)
397 382 else:
398 383 bundlecaps = () # kwargs could have it to None
399 384 for key, value in kwargs.iteritems():
400 385 if value is None:
401 386 continue
402 387 keytype = gboptsmap.get(key)
403 388 if keytype is None:
404 389 assert False, 'unexpected'
405 390 elif keytype == 'nodes':
406 391 value = encodelist(value)
407 392 elif keytype in ('csv', 'scsv'):
408 393 value = ','.join(value)
409 394 elif keytype == 'boolean':
410 395 value = '%i' % bool(value)
411 396 elif keytype != 'plain':
412 397 raise KeyError('unknown getbundle option type %s'
413 398 % keytype)
414 399 opts[key] = value
415 400 f = self._callcompressable("getbundle", **opts)
416 401 if any((cap.startswith('HG2') for cap in bundlecaps)):
417 402 return bundle2.getunbundler(self.ui, f)
418 403 else:
419 404 return changegroupmod.cg1unpacker(f, 'UN')
420 405
421 406 def unbundle(self, cg, heads, url):
422 407 '''Send cg (a readable file-like object representing the
423 408 changegroup to push, typically a chunkbuffer object) to the
424 409 remote server as a bundle.
425 410
426 411 When pushing a bundle10 stream, return an integer indicating the
427 412 result of the push (see localrepository.addchangegroup()).
428 413
429 414 When pushing a bundle20 stream, return a bundle20 stream.
430 415
431 416 `url` is the url the client thinks it's pushing to, which is
432 417 visible to hooks.
433 418 '''
434 419
435 420 if heads != ['force'] and self.capable('unbundlehash'):
436 421 heads = encodelist(['hashed',
437 422 hashlib.sha1(''.join(sorted(heads))).digest()])
438 423 else:
439 424 heads = encodelist(heads)
440 425
441 426 if util.safehasattr(cg, 'deltaheader'):
442 427 # this a bundle10, do the old style call sequence
443 428 ret, output = self._callpush("unbundle", cg, heads=heads)
444 429 if ret == "":
445 430 raise error.ResponseError(
446 431 _('push failed:'), output)
447 432 try:
448 433 ret = int(ret)
449 434 except ValueError:
450 435 raise error.ResponseError(
451 436 _('push failed (unexpected response):'), ret)
452 437
453 438 for l in output.splitlines(True):
454 439 self.ui.status(_('remote: '), l)
455 440 else:
456 441 # bundle2 push. Send a stream, fetch a stream.
457 442 stream = self._calltwowaystream('unbundle', cg, heads=heads)
458 443 ret = bundle2.getunbundler(self.ui, stream)
459 444 return ret
460 445
461 446 def debugwireargs(self, one, two, three=None, four=None, five=None):
462 447 # don't pass optional arguments left at their default value
463 448 opts = {}
464 449 if three is not None:
465 450 opts['three'] = three
466 451 if four is not None:
467 452 opts['four'] = four
468 453 return self._call('debugwireargs', one=one, two=two, **opts)
469 454
470 455 def _call(self, cmd, **args):
471 456 """execute <cmd> on the server
472 457
473 458 The command is expected to return a simple string.
474 459
475 460 returns the server reply as a string."""
476 461 raise NotImplementedError()
477 462
478 463 def _callstream(self, cmd, **args):
479 464 """execute <cmd> on the server
480 465
481 466 The command is expected to return a stream. Note that if the
482 467 command doesn't return a stream, _callstream behaves
483 468 differently for ssh and http peers.
484 469
485 470 returns the server reply as a file like object.
486 471 """
487 472 raise NotImplementedError()
488 473
489 474 def _callcompressable(self, cmd, **args):
490 475 """execute <cmd> on the server
491 476
492 477 The command is expected to return a stream.
493 478
494 479 The stream may have been compressed in some implementations. This
495 480 function takes care of the decompression. This is the only difference
496 481 with _callstream.
497 482
498 483 returns the server reply as a file like object.
499 484 """
500 485 raise NotImplementedError()
501 486
502 487 def _callpush(self, cmd, fp, **args):
503 488 """execute a <cmd> on server
504 489
505 490 The command is expected to be related to a push. Push has a special
506 491 return method.
507 492
508 493 returns the server reply as a (ret, output) tuple. ret is either
509 494 empty (error) or a stringified int.
510 495 """
511 496 raise NotImplementedError()
512 497
513 498 def _calltwowaystream(self, cmd, fp, **args):
514 499 """execute <cmd> on server
515 500
516 501 The command will send a stream to the server and get a stream in reply.
517 502 """
518 503 raise NotImplementedError()
519 504
520 505 def _abort(self, exception):
521 506 """clearly abort the wire protocol connection and raise the exception
522 507 """
523 508 raise NotImplementedError()
524 509
525 510 # server side
526 511
527 512 # wire protocol command can either return a string or one of these classes.
528 513 class streamres(object):
529 514 """wireproto reply: binary stream
530 515
531 516 The call was successful and the result is a stream.
532 Iterate on the `self.gen` attribute to retrieve chunks.
517
518 Accepts either a generator or an object with a ``read(size)`` method.
519
520 ``v1compressible`` indicates whether this data can be compressed to
521 "version 1" clients (technically: HTTP peers using
522 application/mercurial-0.1 media type). This flag should NOT be used on
523 new commands because new clients should support a more modern compression
524 mechanism.
533 525 """
534 def __init__(self, gen):
526 def __init__(self, gen=None, reader=None, v1compressible=False):
535 527 self.gen = gen
528 self.reader = reader
529 self.v1compressible = v1compressible
536 530
537 531 class pushres(object):
538 532 """wireproto reply: success with simple integer return
539 533
540 534 The call was successful and returned an integer contained in `self.res`.
541 535 """
542 536 def __init__(self, res):
543 537 self.res = res
544 538
545 539 class pusherr(object):
546 540 """wireproto reply: failure
547 541
548 542 The call failed. The `self.res` attribute contains the error message.
549 543 """
550 544 def __init__(self, res):
551 545 self.res = res
552 546
553 547 class ooberror(object):
554 548 """wireproto reply: failure of a batch of operation
555 549
556 550 Something failed during a batch call. The error message is stored in
557 551 `self.message`.
558 552 """
559 553 def __init__(self, message):
560 554 self.message = message
561 555
562 556 def getdispatchrepo(repo, proto, command):
563 557 """Obtain the repo used for processing wire protocol commands.
564 558
565 559 The intent of this function is to serve as a monkeypatch point for
566 560 extensions that need commands to operate on different repo views under
567 561 specialized circumstances.
568 562 """
569 563 return repo.filtered('served')
570 564
571 565 def dispatch(repo, proto, command):
572 566 repo = getdispatchrepo(repo, proto, command)
573 567 func, spec = commands[command]
574 568 args = proto.getargs(spec)
575 569 return func(repo, proto, *args)
576 570
577 571 def options(cmd, keys, others):
578 572 opts = {}
579 573 for k in keys:
580 574 if k in others:
581 575 opts[k] = others[k]
582 576 del others[k]
583 577 if others:
584 578 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
585 579 % (cmd, ",".join(others)))
586 580 return opts
587 581
588 582 def bundle1allowed(repo, action):
589 583 """Whether a bundle1 operation is allowed from the server.
590 584
591 585 Priority is:
592 586
593 587 1. server.bundle1gd.<action> (if generaldelta active)
594 588 2. server.bundle1.<action>
595 589 3. server.bundle1gd (if generaldelta active)
596 590 4. server.bundle1
597 591 """
598 592 ui = repo.ui
599 593 gd = 'generaldelta' in repo.requirements
600 594
601 595 if gd:
602 596 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
603 597 if v is not None:
604 598 return v
605 599
606 600 v = ui.configbool('server', 'bundle1.%s' % action, None)
607 601 if v is not None:
608 602 return v
609 603
610 604 if gd:
611 605 v = ui.configbool('server', 'bundle1gd', None)
612 606 if v is not None:
613 607 return v
614 608
615 609 return ui.configbool('server', 'bundle1', True)
616 610
617 611 # list of commands
618 612 commands = {}
619 613
620 614 def wireprotocommand(name, args=''):
621 615 """decorator for wire protocol command"""
622 616 def register(func):
623 617 commands[name] = (func, args)
624 618 return func
625 619 return register
626 620
627 621 @wireprotocommand('batch', 'cmds *')
628 622 def batch(repo, proto, cmds, others):
629 623 repo = repo.filtered("served")
630 624 res = []
631 625 for pair in cmds.split(';'):
632 626 op, args = pair.split(' ', 1)
633 627 vals = {}
634 628 for a in args.split(','):
635 629 if a:
636 630 n, v = a.split('=')
637 631 vals[unescapearg(n)] = unescapearg(v)
638 632 func, spec = commands[op]
639 633 if spec:
640 634 keys = spec.split()
641 635 data = {}
642 636 for k in keys:
643 637 if k == '*':
644 638 star = {}
645 639 for key in vals.keys():
646 640 if key not in keys:
647 641 star[key] = vals[key]
648 642 data['*'] = star
649 643 else:
650 644 data[k] = vals[k]
651 645 result = func(repo, proto, *[data[k] for k in keys])
652 646 else:
653 647 result = func(repo, proto)
654 648 if isinstance(result, ooberror):
655 649 return result
656 650 res.append(escapearg(result))
657 651 return ';'.join(res)
658 652
659 653 @wireprotocommand('between', 'pairs')
660 654 def between(repo, proto, pairs):
661 655 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
662 656 r = []
663 657 for b in repo.between(pairs):
664 658 r.append(encodelist(b) + "\n")
665 659 return "".join(r)
666 660
667 661 @wireprotocommand('branchmap')
668 662 def branchmap(repo, proto):
669 663 branchmap = repo.branchmap()
670 664 heads = []
671 665 for branch, nodes in branchmap.iteritems():
672 666 branchname = urlreq.quote(encoding.fromlocal(branch))
673 667 branchnodes = encodelist(nodes)
674 668 heads.append('%s %s' % (branchname, branchnodes))
675 669 return '\n'.join(heads)
676 670
677 671 @wireprotocommand('branches', 'nodes')
678 672 def branches(repo, proto, nodes):
679 673 nodes = decodelist(nodes)
680 674 r = []
681 675 for b in repo.branches(nodes):
682 676 r.append(encodelist(b) + "\n")
683 677 return "".join(r)
684 678
685 679 @wireprotocommand('clonebundles', '')
686 680 def clonebundles(repo, proto):
687 681 """Server command for returning info for available bundles to seed clones.
688 682
689 683 Clients will parse this response and determine what bundle to fetch.
690 684
691 685 Extensions may wrap this command to filter or dynamically emit data
692 686 depending on the request. e.g. you could advertise URLs for the closest
693 687 data center given the client's IP address.
694 688 """
695 689 return repo.opener.tryread('clonebundles.manifest')
696 690
697 691 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
698 692 'known', 'getbundle', 'unbundlehash', 'batch']
699 693
700 694 def _capabilities(repo, proto):
701 695 """return a list of capabilities for a repo
702 696
703 697 This function exists to allow extensions to easily wrap capabilities
704 698 computation
705 699
706 700 - returns a lists: easy to alter
707 701 - change done here will be propagated to both `capabilities` and `hello`
708 702 command without any other action needed.
709 703 """
710 704 # copy to prevent modification of the global list
711 705 caps = list(wireprotocaps)
712 706 if streamclone.allowservergeneration(repo.ui):
713 707 if repo.ui.configbool('server', 'preferuncompressed', False):
714 708 caps.append('stream-preferred')
715 709 requiredformats = repo.requirements & repo.supportedformats
716 710 # if our local revlogs are just revlogv1, add 'stream' cap
717 711 if not requiredformats - set(('revlogv1',)):
718 712 caps.append('stream')
719 713 # otherwise, add 'streamreqs' detailing our local revlog format
720 714 else:
721 715 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
722 716 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
723 717 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
724 718 caps.append('bundle2=' + urlreq.quote(capsblob))
725 719 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
726 720 caps.append(
727 721 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
728 722 if repo.ui.configbool('experimental', 'httppostargs', False):
729 723 caps.append('httppostargs')
730 724 return caps
731 725
732 726 # If you are writing an extension and consider wrapping this function. Wrap
733 727 # `_capabilities` instead.
734 728 @wireprotocommand('capabilities')
735 729 def capabilities(repo, proto):
736 730 return ' '.join(_capabilities(repo, proto))
737 731
738 732 @wireprotocommand('changegroup', 'roots')
739 733 def changegroup(repo, proto, roots):
740 734 nodes = decodelist(roots)
741 735 cg = changegroupmod.changegroup(repo, nodes, 'serve')
742 return streamres(proto.groupchunks(cg))
736 return streamres(reader=cg, v1compressible=True)
743 737
744 738 @wireprotocommand('changegroupsubset', 'bases heads')
745 739 def changegroupsubset(repo, proto, bases, heads):
746 740 bases = decodelist(bases)
747 741 heads = decodelist(heads)
748 742 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
749 return streamres(proto.groupchunks(cg))
743 return streamres(reader=cg, v1compressible=True)
750 744
751 745 @wireprotocommand('debugwireargs', 'one two *')
752 746 def debugwireargs(repo, proto, one, two, others):
753 747 # only accept optional args from the known set
754 748 opts = options('debugwireargs', ['three', 'four'], others)
755 749 return repo.debugwireargs(one, two, **opts)
756 750
757 751 @wireprotocommand('getbundle', '*')
758 752 def getbundle(repo, proto, others):
759 753 opts = options('getbundle', gboptsmap.keys(), others)
760 754 for k, v in opts.iteritems():
761 755 keytype = gboptsmap[k]
762 756 if keytype == 'nodes':
763 757 opts[k] = decodelist(v)
764 758 elif keytype == 'csv':
765 759 opts[k] = list(v.split(','))
766 760 elif keytype == 'scsv':
767 761 opts[k] = set(v.split(','))
768 762 elif keytype == 'boolean':
769 763 # Client should serialize False as '0', which is a non-empty string
770 764 # so it evaluates as a True bool.
771 765 if v == '0':
772 766 opts[k] = False
773 767 else:
774 768 opts[k] = bool(v)
775 769 elif keytype != 'plain':
776 770 raise KeyError('unknown getbundle option type %s'
777 771 % keytype)
778 772
779 773 if not bundle1allowed(repo, 'pull'):
780 774 if not exchange.bundle2requested(opts.get('bundlecaps')):
781 775 return ooberror(bundle2required)
782 776
783 777 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
784 return streamres(proto.compresschunks(chunks))
778 return streamres(gen=chunks, v1compressible=True)
785 779
786 780 @wireprotocommand('heads')
787 781 def heads(repo, proto):
788 782 h = repo.heads()
789 783 return encodelist(h) + "\n"
790 784
791 785 @wireprotocommand('hello')
792 786 def hello(repo, proto):
793 787 '''the hello command returns a set of lines describing various
794 788 interesting things about the server, in an RFC822-like format.
795 789 Currently the only one defined is "capabilities", which
796 790 consists of a line in the form:
797 791
798 792 capabilities: space separated list of tokens
799 793 '''
800 794 return "capabilities: %s\n" % (capabilities(repo, proto))
801 795
802 796 @wireprotocommand('listkeys', 'namespace')
803 797 def listkeys(repo, proto, namespace):
804 798 d = repo.listkeys(encoding.tolocal(namespace)).items()
805 799 return pushkeymod.encodekeys(d)
806 800
807 801 @wireprotocommand('lookup', 'key')
808 802 def lookup(repo, proto, key):
809 803 try:
810 804 k = encoding.tolocal(key)
811 805 c = repo[k]
812 806 r = c.hex()
813 807 success = 1
814 808 except Exception as inst:
815 809 r = str(inst)
816 810 success = 0
817 811 return "%s %s\n" % (success, r)
818 812
819 813 @wireprotocommand('known', 'nodes *')
820 814 def known(repo, proto, nodes, others):
821 815 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
822 816
823 817 @wireprotocommand('pushkey', 'namespace key old new')
824 818 def pushkey(repo, proto, namespace, key, old, new):
825 819 # compatibility with pre-1.8 clients which were accidentally
826 820 # sending raw binary nodes rather than utf-8-encoded hex
827 821 if len(new) == 20 and new.encode('string-escape') != new:
828 822 # looks like it could be a binary node
829 823 try:
830 824 new.decode('utf-8')
831 825 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
832 826 except UnicodeDecodeError:
833 827 pass # binary, leave unmodified
834 828 else:
835 829 new = encoding.tolocal(new) # normal path
836 830
837 831 if util.safehasattr(proto, 'restore'):
838 832
839 833 proto.redirect()
840 834
841 835 try:
842 836 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 837 encoding.tolocal(old), new) or False
844 838 except error.Abort:
845 839 r = False
846 840
847 841 output = proto.restore()
848 842
849 843 return '%s\n%s' % (int(r), output)
850 844
851 845 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
852 846 encoding.tolocal(old), new)
853 847 return '%s\n' % int(r)
854 848
855 849 @wireprotocommand('stream_out')
856 850 def stream(repo, proto):
857 851 '''If the server supports streaming clone, it advertises the "stream"
858 852 capability with a value representing the version and flags of the repo
859 853 it is serving. Client checks to see if it understands the format.
860 854 '''
861 855 if not streamclone.allowservergeneration(repo.ui):
862 856 return '1\n'
863 857
864 858 def getstream(it):
865 859 yield '0\n'
866 860 for chunk in it:
867 861 yield chunk
868 862
869 863 try:
870 864 # LockError may be raised before the first result is yielded. Don't
871 865 # emit output until we're sure we got the lock successfully.
872 866 it = streamclone.generatev1wireproto(repo)
873 return streamres(getstream(it))
867 return streamres(gen=getstream(it))
874 868 except error.LockError:
875 869 return '2\n'
876 870
877 871 @wireprotocommand('unbundle', 'heads')
878 872 def unbundle(repo, proto, heads):
879 873 their_heads = decodelist(heads)
880 874
881 875 try:
882 876 proto.redirect()
883 877
884 878 exchange.check_heads(repo, their_heads, 'preparing changes')
885 879
886 880 # write bundle data to temporary file because it can be big
887 881 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
888 882 fp = os.fdopen(fd, 'wb+')
889 883 r = 0
890 884 try:
891 885 proto.getfile(fp)
892 886 fp.seek(0)
893 887 gen = exchange.readbundle(repo.ui, fp, None)
894 888 if (isinstance(gen, changegroupmod.cg1unpacker)
895 889 and not bundle1allowed(repo, 'push')):
896 890 return ooberror(bundle2required)
897 891
898 892 r = exchange.unbundle(repo, gen, their_heads, 'serve',
899 893 proto._client())
900 894 if util.safehasattr(r, 'addpart'):
901 895 # The return looks streamable, we are in the bundle2 case and
902 896 # should return a stream.
903 return streamres(r.getchunks())
897 return streamres(gen=r.getchunks())
904 898 return pushres(r)
905 899
906 900 finally:
907 901 fp.close()
908 902 os.unlink(tempname)
909 903
910 904 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
911 905 # handle non-bundle2 case first
912 906 if not getattr(exc, 'duringunbundle2', False):
913 907 try:
914 908 raise
915 909 except error.Abort:
916 910 # The old code we moved used sys.stderr directly.
917 911 # We did not change it to minimise code change.
918 912 # This need to be moved to something proper.
919 913 # Feel free to do it.
920 914 sys.stderr.write("abort: %s\n" % exc)
921 915 return pushres(0)
922 916 except error.PushRaced:
923 917 return pusherr(str(exc))
924 918
925 919 bundler = bundle2.bundle20(repo.ui)
926 920 for out in getattr(exc, '_bundle2salvagedoutput', ()):
927 921 bundler.addpart(out)
928 922 try:
929 923 try:
930 924 raise
931 925 except error.PushkeyFailed as exc:
932 926 # check client caps
933 927 remotecaps = getattr(exc, '_replycaps', None)
934 928 if (remotecaps is not None
935 929 and 'pushkey' not in remotecaps.get('error', ())):
936 930 # no support remote side, fallback to Abort handler.
937 931 raise
938 932 part = bundler.newpart('error:pushkey')
939 933 part.addparam('in-reply-to', exc.partid)
940 934 if exc.namespace is not None:
941 935 part.addparam('namespace', exc.namespace, mandatory=False)
942 936 if exc.key is not None:
943 937 part.addparam('key', exc.key, mandatory=False)
944 938 if exc.new is not None:
945 939 part.addparam('new', exc.new, mandatory=False)
946 940 if exc.old is not None:
947 941 part.addparam('old', exc.old, mandatory=False)
948 942 if exc.ret is not None:
949 943 part.addparam('ret', exc.ret, mandatory=False)
950 944 except error.BundleValueError as exc:
951 945 errpart = bundler.newpart('error:unsupportedcontent')
952 946 if exc.parttype is not None:
953 947 errpart.addparam('parttype', exc.parttype)
954 948 if exc.params:
955 949 errpart.addparam('params', '\0'.join(exc.params))
956 950 except error.Abort as exc:
957 951 manargs = [('message', str(exc))]
958 952 advargs = []
959 953 if exc.hint is not None:
960 954 advargs.append(('hint', exc.hint))
961 955 bundler.addpart(bundle2.bundlepart('error:abort',
962 956 manargs, advargs))
963 957 except error.PushRaced as exc:
964 958 bundler.newpart('error:pushraced', [('message', str(exc))])
965 return streamres(bundler.getchunks())
959 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now