##// END OF EJS Templates
wireproto: drop the _decompress method in favor a new call type...
Pierre-Yves David -
r20905:167047ba default
parent child Browse files
Show More
@@ -1,249 +1,247
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from node import nullid
10 10 from i18n import _
11 11 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
12 12 import os, urllib, urllib2, zlib, httplib
13 13 import errno, socket
14 14
15 15 def zgenerator(f):
16 16 zd = zlib.decompressobj()
17 17 try:
18 18 for chunk in util.filechunkiter(f):
19 19 while chunk:
20 20 yield zd.decompress(chunk, 2**18)
21 21 chunk = zd.unconsumed_tail
22 22 except httplib.HTTPException:
23 23 raise IOError(None, _('connection ended unexpectedly'))
24 24 yield zd.flush()
25 25
26 26 class httppeer(wireproto.wirepeer):
27 27 def __init__(self, ui, path):
28 28 self.path = path
29 29 self.caps = None
30 30 self.handler = None
31 31 self.urlopener = None
32 32 u = util.url(path)
33 33 if u.query or u.fragment:
34 34 raise util.Abort(_('unsupported URL component: "%s"') %
35 35 (u.query or u.fragment))
36 36
37 37 # urllib cannot handle URLs with embedded user or passwd
38 38 self._url, authinfo = u.authinfo()
39 39
40 40 self.ui = ui
41 41 self.ui.debug('using %s\n' % self._url)
42 42
43 43 self.urlopener = url.opener(ui, authinfo)
44 44
45 45 def __del__(self):
46 46 if self.urlopener:
47 47 for h in self.urlopener.handlers:
48 48 h.close()
49 49 getattr(h, "close_all", lambda : None)()
50 50
51 51 def url(self):
52 52 return self.path
53 53
54 54 # look up capabilities only when needed
55 55
56 56 def _fetchcaps(self):
57 57 self.caps = set(self._call('capabilities').split())
58 58
59 59 def _capabilities(self):
60 60 if self.caps is None:
61 61 try:
62 62 self._fetchcaps()
63 63 except error.RepoError:
64 64 self.caps = set()
65 65 self.ui.debug('capabilities: %s\n' %
66 66 (' '.join(self.caps or ['none'])))
67 67 return self.caps
68 68
69 69 def lock(self):
70 70 raise util.Abort(_('operation not supported over http'))
71 71
72 72 def _callstream(self, cmd, **args):
73 73 if cmd == 'pushkey':
74 74 args['data'] = ''
75 75 data = args.pop('data', None)
76 76 size = 0
77 77 if util.safehasattr(data, 'length'):
78 78 size = data.length
79 79 elif data is not None:
80 80 size = len(data)
81 81 headers = args.pop('headers', {})
82 82 if data is not None and 'Content-Type' not in headers:
83 83 headers['Content-Type'] = 'application/mercurial-0.1'
84 84
85 85
86 86 if size and self.ui.configbool('ui', 'usehttp2', False):
87 87 headers['Expect'] = '100-Continue'
88 88 headers['X-HgHttp2'] = '1'
89 89
90 90 self.ui.debug("sending %s command\n" % cmd)
91 91 q = [('cmd', cmd)]
92 92 headersize = 0
93 93 if len(args) > 0:
94 94 httpheader = self.capable('httpheader')
95 95 if httpheader:
96 96 headersize = int(httpheader.split(',')[0])
97 97 if headersize > 0:
98 98 # The headers can typically carry more data than the URL.
99 99 encargs = urllib.urlencode(sorted(args.items()))
100 100 headerfmt = 'X-HgArg-%s'
101 101 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
102 102 headernum = 0
103 103 for i in xrange(0, len(encargs), contentlen):
104 104 headernum += 1
105 105 header = headerfmt % str(headernum)
106 106 headers[header] = encargs[i:i + contentlen]
107 107 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
108 108 headers['Vary'] = ','.join(varyheaders)
109 109 else:
110 110 q += sorted(args.items())
111 111 qs = '?%s' % urllib.urlencode(q)
112 112 cu = "%s%s" % (self._url, qs)
113 113 req = urllib2.Request(cu, data, headers)
114 114 if data is not None:
115 115 self.ui.debug("sending %s bytes\n" % size)
116 116 req.add_unredirected_header('Content-Length', '%d' % size)
117 117 try:
118 118 resp = self.urlopener.open(req)
119 119 except urllib2.HTTPError, inst:
120 120 if inst.code == 401:
121 121 raise util.Abort(_('authorization failed'))
122 122 raise
123 123 except httplib.HTTPException, inst:
124 124 self.ui.debug('http error while sending %s command\n' % cmd)
125 125 self.ui.traceback()
126 126 raise IOError(None, inst)
127 127 except IndexError:
128 128 # this only happens with Python 2.3, later versions raise URLError
129 129 raise util.Abort(_('http error, possibly caused by proxy setting'))
130 130 # record the url we got redirected to
131 131 resp_url = resp.geturl()
132 132 if resp_url.endswith(qs):
133 133 resp_url = resp_url[:-len(qs)]
134 134 if self._url.rstrip('/') != resp_url.rstrip('/'):
135 135 if not self.ui.quiet:
136 136 self.ui.warn(_('real URL is %s\n') % resp_url)
137 137 self._url = resp_url
138 138 try:
139 139 proto = resp.getheader('content-type')
140 140 except AttributeError:
141 141 proto = resp.headers.get('content-type', '')
142 142
143 143 safeurl = util.hidepassword(self._url)
144 144 if proto.startswith('application/hg-error'):
145 145 raise error.OutOfBandError(resp.read())
146 146 # accept old "text/plain" and "application/hg-changegroup" for now
147 147 if not (proto.startswith('application/mercurial-') or
148 148 (proto.startswith('text/plain')
149 149 and not resp.headers.get('content-length')) or
150 150 proto.startswith('application/hg-changegroup')):
151 151 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
152 152 raise error.RepoError(
153 153 _("'%s' does not appear to be an hg repository:\n"
154 154 "---%%<--- (%s)\n%s\n---%%<---\n")
155 155 % (safeurl, proto or 'no content-type', resp.read(1024)))
156 156
157 157 if proto.startswith('application/mercurial-'):
158 158 try:
159 159 version = proto.split('-', 1)[1]
160 160 version_info = tuple([int(n) for n in version.split('.')])
161 161 except ValueError:
162 162 raise error.RepoError(_("'%s' sent a broken Content-Type "
163 163 "header (%s)") % (safeurl, proto))
164 164 if version_info > (0, 1):
165 165 raise error.RepoError(_("'%s' uses newer protocol %s") %
166 166 (safeurl, version))
167 167
168 168 return resp
169 169
170 170 def _call(self, cmd, **args):
171 171 fp = self._callstream(cmd, **args)
172 172 try:
173 173 return fp.read()
174 174 finally:
175 175 # if using keepalive, allow connection to be reused
176 176 fp.close()
177 177
178 178 def _callpush(self, cmd, cg, **args):
179 179 # have to stream bundle to a temp file because we do not have
180 180 # http 1.1 chunked transfer.
181 181
182 182 types = self.capable('unbundle')
183 183 try:
184 184 types = types.split(',')
185 185 except AttributeError:
186 186 # servers older than d1b16a746db6 will send 'unbundle' as a
187 187 # boolean capability. They only support headerless/uncompressed
188 188 # bundles.
189 189 types = [""]
190 190 for x in types:
191 191 if x in changegroup.bundletypes:
192 192 type = x
193 193 break
194 194
195 195 tempname = changegroup.writebundle(cg, None, type)
196 196 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
197 197 headers = {'Content-Type': 'application/mercurial-0.1'}
198 198
199 199 try:
200 200 try:
201 201 r = self._call(cmd, data=fp, headers=headers, **args)
202 202 vals = r.split('\n', 1)
203 203 if len(vals) < 2:
204 204 raise error.ResponseError(_("unexpected response:"), r)
205 205 return vals
206 206 except socket.error, err:
207 207 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
208 208 raise util.Abort(_('push failed: %s') % err.args[1])
209 209 raise util.Abort(err.args[1])
210 210 finally:
211 211 fp.close()
212 212 os.unlink(tempname)
213 213
214 def _abort(self, exception):
215 raise exception
216
217 def _decompress(self, stream):
214 def _callcompressable(self, cmd, **args):
215 stream = self._callstream(cmd, **args)
218 216 return util.chunkbuffer(zgenerator(stream))
219 217
220 218 class httpspeer(httppeer):
221 219 def __init__(self, ui, path):
222 220 if not url.has_https:
223 221 raise util.Abort(_('Python support for SSL and HTTPS '
224 222 'is not installed'))
225 223 httppeer.__init__(self, ui, path)
226 224
227 225 def instance(ui, path, create):
228 226 if create:
229 227 raise util.Abort(_('cannot create new http repository'))
230 228 try:
231 229 if path.startswith('https:'):
232 230 inst = httpspeer(ui, path)
233 231 else:
234 232 inst = httppeer(ui, path)
235 233 try:
236 234 # Try to do useful work when checking compatibility.
237 235 # Usually saves a roundtrip since we want the caps anyway.
238 236 inst._fetchcaps()
239 237 except error.RepoError:
240 238 # No luck, try older compatibility check.
241 239 inst.between([(nullid, nullid)])
242 240 return inst
243 241 except error.RepoError, httpexception:
244 242 try:
245 243 r = statichttprepo.instance(ui, "static-" + path, create)
246 244 ui.note('(falling back to static-http)\n')
247 245 return r
248 246 except error.RepoError:
249 247 raise httpexception # use the original http RepoError instead
@@ -1,242 +1,243
1 1 # sshpeer.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import re
9 9 from i18n import _
10 10 import util, error, wireproto
11 11
12 12 class remotelock(object):
13 13 def __init__(self, repo):
14 14 self.repo = repo
15 15 def release(self):
16 16 self.repo.unlock()
17 17 self.repo = None
18 18 def __del__(self):
19 19 if self.repo:
20 20 self.release()
21 21
22 22 def _serverquote(s):
23 23 '''quote a string for the remote shell ... which we assume is sh'''
24 24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 25 return s
26 26 return "'%s'" % s.replace("'", "'\\''")
27 27
28 28 class sshpeer(wireproto.wirepeer):
29 29 def __init__(self, ui, path, create=False):
30 30 self._url = path
31 31 self.ui = ui
32 32 self.pipeo = self.pipei = self.pipee = None
33 33
34 34 u = util.url(path, parsequery=False, parsefragment=False)
35 35 if u.scheme != 'ssh' or not u.host or u.path is None:
36 36 self._abort(error.RepoError(_("couldn't parse location %s") % path))
37 37
38 38 self.user = u.user
39 39 if u.passwd is not None:
40 40 self._abort(error.RepoError(_("password in URL not supported")))
41 41 self.host = u.host
42 42 self.port = u.port
43 43 self.path = u.path or "."
44 44
45 45 sshcmd = self.ui.config("ui", "ssh", "ssh")
46 46 remotecmd = self.ui.config("ui", "remotecmd", "hg")
47 47
48 48 args = util.sshargs(sshcmd, self.host, self.user, self.port)
49 49
50 50 if create:
51 51 cmd = '%s %s %s' % (sshcmd, args,
52 52 util.shellquote("%s init %s" %
53 53 (_serverquote(remotecmd), _serverquote(self.path))))
54 54 ui.debug('running %s\n' % cmd)
55 55 res = util.system(cmd)
56 56 if res != 0:
57 57 self._abort(error.RepoError(_("could not create remote repo")))
58 58
59 59 self._validaterepo(sshcmd, args, remotecmd)
60 60
61 61 def url(self):
62 62 return self._url
63 63
64 64 def _validaterepo(self, sshcmd, args, remotecmd):
65 65 # cleanup up previous run
66 66 self.cleanup()
67 67
68 68 cmd = '%s %s %s' % (sshcmd, args,
69 69 util.shellquote("%s -R %s serve --stdio" %
70 70 (_serverquote(remotecmd), _serverquote(self.path))))
71 71 self.ui.debug('running %s\n' % cmd)
72 72 cmd = util.quotecommand(cmd)
73 73
74 74 # while self.subprocess isn't used, having it allows the subprocess to
75 75 # to clean up correctly later
76 76 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
77 77
78 78 # skip any noise generated by remote shell
79 79 self._callstream("hello")
80 80 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
81 81 lines = ["", "dummy"]
82 82 max_noise = 500
83 83 while lines[-1] and max_noise:
84 84 l = r.readline()
85 85 self.readerr()
86 86 if lines[-1] == "1\n" and l == "\n":
87 87 break
88 88 if l:
89 89 self.ui.debug("remote: ", l)
90 90 lines.append(l)
91 91 max_noise -= 1
92 92 else:
93 93 self._abort(error.RepoError(_('no suitable response from '
94 94 'remote hg')))
95 95
96 96 self._caps = set()
97 97 for l in reversed(lines):
98 98 if l.startswith("capabilities:"):
99 99 self._caps.update(l[:-1].split(":")[1].split())
100 100 break
101 101
102 102 def _capabilities(self):
103 103 return self._caps
104 104
105 105 def readerr(self):
106 106 while True:
107 107 size = util.fstat(self.pipee).st_size
108 108 if size == 0:
109 109 break
110 110 s = self.pipee.read(size)
111 111 if not s:
112 112 break
113 113 for l in s.splitlines():
114 114 self.ui.status(_("remote: "), l, '\n')
115 115
116 116 def _abort(self, exception):
117 117 self.cleanup()
118 118 raise exception
119 119
120 120 def cleanup(self):
121 121 if self.pipeo is None:
122 122 return
123 123 self.pipeo.close()
124 124 self.pipei.close()
125 125 try:
126 126 # read the error descriptor until EOF
127 127 for l in self.pipee:
128 128 self.ui.status(_("remote: "), l)
129 129 except (IOError, ValueError):
130 130 pass
131 131 self.pipee.close()
132 132
133 133 __del__ = cleanup
134 134
135 135 def _callstream(self, cmd, **args):
136 136 self.ui.debug("sending %s command\n" % cmd)
137 137 self.pipeo.write("%s\n" % cmd)
138 138 _func, names = wireproto.commands[cmd]
139 139 keys = names.split()
140 140 wireargs = {}
141 141 for k in keys:
142 142 if k == '*':
143 143 wireargs['*'] = args
144 144 break
145 145 else:
146 146 wireargs[k] = args[k]
147 147 del args[k]
148 148 for k, v in sorted(wireargs.iteritems()):
149 149 self.pipeo.write("%s %d\n" % (k, len(v)))
150 150 if isinstance(v, dict):
151 151 for dk, dv in v.iteritems():
152 152 self.pipeo.write("%s %d\n" % (dk, len(dv)))
153 153 self.pipeo.write(dv)
154 154 else:
155 155 self.pipeo.write(v)
156 156 self.pipeo.flush()
157 157
158 158 return self.pipei
159 159
160 def _callcompressable(self, cmd, **args):
161 return self._callstream(cmd, **args)
162
160 163 def _call(self, cmd, **args):
161 164 self._callstream(cmd, **args)
162 165 return self._recv()
163 166
164 167 def _callpush(self, cmd, fp, **args):
165 168 r = self._call(cmd, **args)
166 169 if r:
167 170 return '', r
168 171 while True:
169 172 d = fp.read(4096)
170 173 if not d:
171 174 break
172 175 self._send(d)
173 176 self._send("", flush=True)
174 177 r = self._recv()
175 178 if r:
176 179 return '', r
177 180 return self._recv(), ''
178 181
179 def _decompress(self, stream):
180 return stream
181 182
182 183 def _recv(self):
183 184 l = self.pipei.readline()
184 185 if l == '\n':
185 186 err = []
186 187 while True:
187 188 line = self.pipee.readline()
188 189 if line == '-\n':
189 190 break
190 191 err.extend([line])
191 192 if len(err) > 0:
192 193 # strip the trailing newline added to the last line server-side
193 194 err[-1] = err[-1][:-1]
194 195 self._abort(error.OutOfBandError(*err))
195 196 self.readerr()
196 197 try:
197 198 l = int(l)
198 199 except ValueError:
199 200 self._abort(error.ResponseError(_("unexpected response:"), l))
200 201 return self.pipei.read(l)
201 202
202 203 def _send(self, data, flush=False):
203 204 self.pipeo.write("%d\n" % len(data))
204 205 if data:
205 206 self.pipeo.write(data)
206 207 if flush:
207 208 self.pipeo.flush()
208 209 self.readerr()
209 210
210 211 def lock(self):
211 212 self._call("lock")
212 213 return remotelock(self)
213 214
214 215 def unlock(self):
215 216 self._call("unlock")
216 217
217 218 def addchangegroup(self, cg, source, url, lock=None):
218 219 '''Send a changegroup to the remote server. Return an integer
219 220 similar to unbundle(). DEPRECATED, since it requires locking the
220 221 remote.'''
221 222 d = self._call("addchangegroup")
222 223 if d:
223 224 self._abort(error.RepoError(_("push refused: %s") % d))
224 225 while True:
225 226 d = cg.read(4096)
226 227 if not d:
227 228 break
228 229 self.pipeo.write(d)
229 230 self.readerr()
230 231
231 232 self.pipeo.flush()
232 233
233 234 self.readerr()
234 235 r = self._recv()
235 236 if not r:
236 237 return 1
237 238 try:
238 239 return int(r)
239 240 except ValueError:
240 241 self._abort(error.ResponseError(_("unexpected response:"), r))
241 242
242 243 instance = sshpeer
@@ -1,784 +1,791
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod
12 12 import peer, error, encoding, util, store
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 # abstract batching support
62 62
63 63 class future(object):
64 64 '''placeholder for a value to be set later'''
65 65 def set(self, value):
66 66 if util.safehasattr(self, 'value'):
67 67 raise error.RepoError("future is already set")
68 68 self.value = value
69 69
70 70 class batcher(object):
71 71 '''base class for batches of commands submittable in a single request
72 72
73 73 All methods invoked on instances of this class are simply queued and
74 74 return a a future for the result. Once you call submit(), all the queued
75 75 calls are performed and the results set in their respective futures.
76 76 '''
77 77 def __init__(self):
78 78 self.calls = []
79 79 def __getattr__(self, name):
80 80 def call(*args, **opts):
81 81 resref = future()
82 82 self.calls.append((name, args, opts, resref,))
83 83 return resref
84 84 return call
85 85 def submit(self):
86 86 pass
87 87
88 88 class localbatch(batcher):
89 89 '''performs the queued calls directly'''
90 90 def __init__(self, local):
91 91 batcher.__init__(self)
92 92 self.local = local
93 93 def submit(self):
94 94 for name, args, opts, resref in self.calls:
95 95 resref.set(getattr(self.local, name)(*args, **opts))
96 96
97 97 class remotebatch(batcher):
98 98 '''batches the queued calls; uses as few roundtrips as possible'''
99 99 def __init__(self, remote):
100 100 '''remote must support _submitbatch(encbatch) and
101 101 _submitone(op, encargs)'''
102 102 batcher.__init__(self)
103 103 self.remote = remote
104 104 def submit(self):
105 105 req, rsp = [], []
106 106 for name, args, opts, resref in self.calls:
107 107 mtd = getattr(self.remote, name)
108 108 batchablefn = getattr(mtd, 'batchable', None)
109 109 if batchablefn is not None:
110 110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 111 encargsorres, encresref = batchable.next()
112 112 if encresref:
113 113 req.append((name, encargsorres,))
114 114 rsp.append((batchable, encresref, resref,))
115 115 else:
116 116 resref.set(encargsorres)
117 117 else:
118 118 if req:
119 119 self._submitreq(req, rsp)
120 120 req, rsp = [], []
121 121 resref.set(mtd(*args, **opts))
122 122 if req:
123 123 self._submitreq(req, rsp)
124 124 def _submitreq(self, req, rsp):
125 125 encresults = self.remote._submitbatch(req)
126 126 for encres, r in zip(encresults, rsp):
127 127 batchable, encresref, resref = r
128 128 encresref.set(encres)
129 129 resref.set(batchable.next())
130 130
131 131 def batchable(f):
132 132 '''annotation for batchable methods
133 133
134 134 Such methods must implement a coroutine as follows:
135 135
136 136 @batchable
137 137 def sample(self, one, two=None):
138 138 # Handle locally computable results first:
139 139 if not one:
140 140 yield "a local result", None
141 141 # Build list of encoded arguments suitable for your wire protocol:
142 142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 143 # Create future for injection of encoded result:
144 144 encresref = future()
145 145 # Return encoded arguments and future:
146 146 yield encargs, encresref
147 147 # Assuming the future to be filled with the result from the batched
148 148 # request now. Decode it:
149 149 yield decode(encresref.value)
150 150
151 151 The decorator returns a function which wraps this coroutine as a plain
152 152 method, but adds the original method as an attribute called "batchable",
153 153 which is used by remotebatch to split the call into separate encoding and
154 154 decoding phases.
155 155 '''
156 156 def plain(*args, **opts):
157 157 batchable = f(*args, **opts)
158 158 encargsorres, encresref = batchable.next()
159 159 if not encresref:
160 160 return encargsorres # a local result in this case
161 161 self = args[0]
162 162 encresref.set(self._submitone(f.func_name, encargsorres))
163 163 return batchable.next()
164 164 setattr(plain, 'batchable', f)
165 165 return plain
166 166
167 167 # list of nodes encoding / decoding
168 168
169 169 def decodelist(l, sep=' '):
170 170 if l:
171 171 return map(bin, l.split(sep))
172 172 return []
173 173
174 174 def encodelist(l, sep=' '):
175 175 return sep.join(map(hex, l))
176 176
177 177 # batched call argument encoding
178 178
179 179 def escapearg(plain):
180 180 return (plain
181 181 .replace(':', '::')
182 182 .replace(',', ':,')
183 183 .replace(';', ':;')
184 184 .replace('=', ':='))
185 185
186 186 def unescapearg(escaped):
187 187 return (escaped
188 188 .replace(':=', '=')
189 189 .replace(':;', ';')
190 190 .replace(':,', ',')
191 191 .replace('::', ':'))
192 192
193 193 # client side
194 194
195 195 class wirepeer(peer.peerrepository):
196 196
197 197 def batch(self):
198 198 return remotebatch(self)
199 199 def _submitbatch(self, req):
200 200 cmds = []
201 201 for op, argsdict in req:
202 202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 203 cmds.append('%s %s' % (op, args))
204 204 rsp = self._call("batch", cmds=';'.join(cmds))
205 205 return rsp.split(';')
206 206 def _submitone(self, op, args):
207 207 return self._call(op, **args)
208 208
209 209 @batchable
210 210 def lookup(self, key):
211 211 self.requirecap('lookup', _('look up remote revision'))
212 212 f = future()
213 213 yield {'key': encoding.fromlocal(key)}, f
214 214 d = f.value
215 215 success, data = d[:-1].split(" ", 1)
216 216 if int(success):
217 217 yield bin(data)
218 218 self._abort(error.RepoError(data))
219 219
220 220 @batchable
221 221 def heads(self):
222 222 f = future()
223 223 yield {}, f
224 224 d = f.value
225 225 try:
226 226 yield decodelist(d[:-1])
227 227 except ValueError:
228 228 self._abort(error.ResponseError(_("unexpected response:"), d))
229 229
230 230 @batchable
231 231 def known(self, nodes):
232 232 f = future()
233 233 yield {'nodes': encodelist(nodes)}, f
234 234 d = f.value
235 235 try:
236 236 yield [bool(int(f)) for f in d]
237 237 except ValueError:
238 238 self._abort(error.ResponseError(_("unexpected response:"), d))
239 239
240 240 @batchable
241 241 def branchmap(self):
242 242 f = future()
243 243 yield {}, f
244 244 d = f.value
245 245 try:
246 246 branchmap = {}
247 247 for branchpart in d.splitlines():
248 248 branchname, branchheads = branchpart.split(' ', 1)
249 249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 250 branchheads = decodelist(branchheads)
251 251 branchmap[branchname] = branchheads
252 252 yield branchmap
253 253 except TypeError:
254 254 self._abort(error.ResponseError(_("unexpected response:"), d))
255 255
256 256 def branches(self, nodes):
257 257 n = encodelist(nodes)
258 258 d = self._call("branches", nodes=n)
259 259 try:
260 260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 261 return br
262 262 except ValueError:
263 263 self._abort(error.ResponseError(_("unexpected response:"), d))
264 264
265 265 def between(self, pairs):
266 266 batch = 8 # avoid giant requests
267 267 r = []
268 268 for i in xrange(0, len(pairs), batch):
269 269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 270 d = self._call("between", pairs=n)
271 271 try:
272 272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 273 except ValueError:
274 274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 275 return r
276 276
277 277 @batchable
278 278 def pushkey(self, namespace, key, old, new):
279 279 if not self.capable('pushkey'):
280 280 yield False, None
281 281 f = future()
282 282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 283 yield {'namespace': encoding.fromlocal(namespace),
284 284 'key': encoding.fromlocal(key),
285 285 'old': encoding.fromlocal(old),
286 286 'new': encoding.fromlocal(new)}, f
287 287 d = f.value
288 288 d, output = d.split('\n', 1)
289 289 try:
290 290 d = bool(int(d))
291 291 except ValueError:
292 292 raise error.ResponseError(
293 293 _('push failed (unexpected response):'), d)
294 294 for l in output.splitlines(True):
295 295 self.ui.status(_('remote: '), l)
296 296 yield d
297 297
298 298 @batchable
299 299 def listkeys(self, namespace):
300 300 if not self.capable('pushkey'):
301 301 yield {}, None
302 302 f = future()
303 303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 305 d = f.value
306 306 r = {}
307 307 for l in d.splitlines():
308 308 k, v = l.split('\t')
309 309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 310 yield r
311 311
312 312 def stream_out(self):
313 313 return self._callstream('stream_out')
314 314
315 315 def changegroup(self, nodes, kind):
316 316 n = encodelist(nodes)
317 f = self._callstream("changegroup", roots=n)
318 return changegroupmod.unbundle10(self._decompress(f), 'UN')
317 f = self._callcompressable("changegroup", roots=n)
318 return changegroupmod.unbundle10(f, 'UN')
319 319
320 320 def changegroupsubset(self, bases, heads, kind):
321 321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 322 bases = encodelist(bases)
323 323 heads = encodelist(heads)
324 f = self._callstream("changegroupsubset",
324 f = self._callcompressable("changegroupsubset",
325 325 bases=bases, heads=heads)
326 return changegroupmod.unbundle10(self._decompress(f), 'UN')
326 return changegroupmod.unbundle10(f, 'UN')
327 327
328 328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 329 self.requirecap('getbundle', _('look up remote changes'))
330 330 opts = {}
331 331 if heads is not None:
332 332 opts['heads'] = encodelist(heads)
333 333 if common is not None:
334 334 opts['common'] = encodelist(common)
335 335 if bundlecaps is not None:
336 336 opts['bundlecaps'] = ','.join(bundlecaps)
337 f = self._callstream("getbundle", **opts)
338 return changegroupmod.unbundle10(self._decompress(f), 'UN')
337 f = self._callcompressable("getbundle", **opts)
338 return changegroupmod.unbundle10(f, 'UN')
339 339
340 340 def unbundle(self, cg, heads, source):
341 341 '''Send cg (a readable file-like object representing the
342 342 changegroup to push, typically a chunkbuffer object) to the
343 343 remote server as a bundle. Return an integer indicating the
344 344 result of the push (see localrepository.addchangegroup()).'''
345 345
346 346 if heads != ['force'] and self.capable('unbundlehash'):
347 347 heads = encodelist(['hashed',
348 348 util.sha1(''.join(sorted(heads))).digest()])
349 349 else:
350 350 heads = encodelist(heads)
351 351
352 352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 353 if ret == "":
354 354 raise error.ResponseError(
355 355 _('push failed:'), output)
356 356 try:
357 357 ret = int(ret)
358 358 except ValueError:
359 359 raise error.ResponseError(
360 360 _('push failed (unexpected response):'), ret)
361 361
362 362 for l in output.splitlines(True):
363 363 self.ui.status(_('remote: '), l)
364 364 return ret
365 365
366 366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 367 # don't pass optional arguments left at their default value
368 368 opts = {}
369 369 if three is not None:
370 370 opts['three'] = three
371 371 if four is not None:
372 372 opts['four'] = four
373 373 return self._call('debugwireargs', one=one, two=two, **opts)
374 374
375 375 def _call(self, cmd, **args):
376 376 """execute <cmd> on the server
377 377
378 378 The command is expected to return a simple string.
379 379
380 380 returns the server reply as a string."""
381 381 raise NotImplementedError()
382 382
383 383 def _callstream(self, cmd, **args):
384 384 """execute <cmd> on the server
385 385
386 386 The command is expected to return a stream.
387 387
388 388 returns the server reply as a file like object."""
389 389 raise NotImplementedError()
390 390
391 def _callcompressable(self, cmd, **args):
392 """execute <cmd> on the server
393
394 The command is expected to return a stream.
395
396 The stream may have been compressed in some implementaitons. This
397 function takes care of the decompression. This is the only difference
398 with _callstream.
399
400 returns the server reply as a file like object.
401 """
402 raise NotImplementedError()
403
391 404 def _callpush(self, cmd, fp, **args):
392 405 """execute a <cmd> on server
393 406
394 407 The command is expected to be related to a push. Push has a special
395 408 return method.
396 409
397 410 returns the server reply as a (ret, output) tuple. ret is either
398 411 empty (error) or a stringified int.
399 412 """
400 413 raise NotImplementedError()
401 414
402 415 def _abort(self, exception):
403 416 """clearly abort the wire protocol connection and raise the exception
404 417 """
405 418 raise NotImplementedError()
406 419
407
408 def _decompress(self, stream):
409 """decompress a received stream
410 """
411 raise NotImplementedError()
412
413 420 # server side
414 421
415 422 # wire protocol command can either return a string or one of these classes.
416 423 class streamres(object):
417 424 """wireproto reply: binary stream
418 425
419 426 The call was successful and the result is a stream.
420 427 Iterate on the `self.gen` attribute to retrieve chunks.
421 428 """
422 429 def __init__(self, gen):
423 430 self.gen = gen
424 431
425 432 class pushres(object):
426 433 """wireproto reply: success with simple integer return
427 434
428 435 The call was successful and returned an integer contained in `self.res`.
429 436 """
430 437 def __init__(self, res):
431 438 self.res = res
432 439
433 440 class pusherr(object):
434 441 """wireproto reply: failure
435 442
436 443 The call failed. The `self.res` attribute contains the error message.
437 444 """
438 445 def __init__(self, res):
439 446 self.res = res
440 447
441 448 class ooberror(object):
442 449 """wireproto reply: failure of a batch of operation
443 450
444 451 Something failed during a batch call. The error message is stored in
445 452 `self.message`.
446 453 """
447 454 def __init__(self, message):
448 455 self.message = message
449 456
450 457 def dispatch(repo, proto, command):
451 458 repo = repo.filtered("served")
452 459 func, spec = commands[command]
453 460 args = proto.getargs(spec)
454 461 return func(repo, proto, *args)
455 462
456 463 def options(cmd, keys, others):
457 464 opts = {}
458 465 for k in keys:
459 466 if k in others:
460 467 opts[k] = others[k]
461 468 del others[k]
462 469 if others:
463 470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
464 471 % (cmd, ",".join(others)))
465 472 return opts
466 473
467 474 def batch(repo, proto, cmds, others):
468 475 repo = repo.filtered("served")
469 476 res = []
470 477 for pair in cmds.split(';'):
471 478 op, args = pair.split(' ', 1)
472 479 vals = {}
473 480 for a in args.split(','):
474 481 if a:
475 482 n, v = a.split('=')
476 483 vals[n] = unescapearg(v)
477 484 func, spec = commands[op]
478 485 if spec:
479 486 keys = spec.split()
480 487 data = {}
481 488 for k in keys:
482 489 if k == '*':
483 490 star = {}
484 491 for key in vals.keys():
485 492 if key not in keys:
486 493 star[key] = vals[key]
487 494 data['*'] = star
488 495 else:
489 496 data[k] = vals[k]
490 497 result = func(repo, proto, *[data[k] for k in keys])
491 498 else:
492 499 result = func(repo, proto)
493 500 if isinstance(result, ooberror):
494 501 return result
495 502 res.append(escapearg(result))
496 503 return ';'.join(res)
497 504
498 505 def between(repo, proto, pairs):
499 506 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
500 507 r = []
501 508 for b in repo.between(pairs):
502 509 r.append(encodelist(b) + "\n")
503 510 return "".join(r)
504 511
505 512 def branchmap(repo, proto):
506 513 branchmap = repo.branchmap()
507 514 heads = []
508 515 for branch, nodes in branchmap.iteritems():
509 516 branchname = urllib.quote(encoding.fromlocal(branch))
510 517 branchnodes = encodelist(nodes)
511 518 heads.append('%s %s' % (branchname, branchnodes))
512 519 return '\n'.join(heads)
513 520
514 521 def branches(repo, proto, nodes):
515 522 nodes = decodelist(nodes)
516 523 r = []
517 524 for b in repo.branches(nodes):
518 525 r.append(encodelist(b) + "\n")
519 526 return "".join(r)
520 527
521 528
522 529 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
523 530 'known', 'getbundle', 'unbundlehash', 'batch']
524 531
525 532 def _capabilities(repo, proto):
526 533 """return a list of capabilities for a repo
527 534
528 535 This function exists to allow extensions to easily wrap capabilities
529 536 computation
530 537
531 538 - returns a lists: easy to alter
532 539 - change done here will be propagated to both `capabilities` and `hello`
533 540 command without any other effort. without any other action needed.
534 541 """
535 542 # copy to prevent modification of the global list
536 543 caps = list(wireprotocaps)
537 544 if _allowstream(repo.ui):
538 545 if repo.ui.configbool('server', 'preferuncompressed', False):
539 546 caps.append('stream-preferred')
540 547 requiredformats = repo.requirements & repo.supportedformats
541 548 # if our local revlogs are just revlogv1, add 'stream' cap
542 549 if not requiredformats - set(('revlogv1',)):
543 550 caps.append('stream')
544 551 # otherwise, add 'streamreqs' detailing our local revlog format
545 552 else:
546 553 caps.append('streamreqs=%s' % ','.join(requiredformats))
547 554 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
548 555 caps.append('httpheader=1024')
549 556 return caps
550 557
551 558 # If you are writting and extension and consider wrapping this function. Wrap
552 559 # `_capabilities` instead.
553 560 def capabilities(repo, proto):
554 561 return ' '.join(_capabilities(repo, proto))
555 562
556 563 def changegroup(repo, proto, roots):
557 564 nodes = decodelist(roots)
558 565 cg = repo.changegroup(nodes, 'serve')
559 566 return streamres(proto.groupchunks(cg))
560 567
561 568 def changegroupsubset(repo, proto, bases, heads):
562 569 bases = decodelist(bases)
563 570 heads = decodelist(heads)
564 571 cg = repo.changegroupsubset(bases, heads, 'serve')
565 572 return streamres(proto.groupchunks(cg))
566 573
567 574 def debugwireargs(repo, proto, one, two, others):
568 575 # only accept optional args from the known set
569 576 opts = options('debugwireargs', ['three', 'four'], others)
570 577 return repo.debugwireargs(one, two, **opts)
571 578
572 579 def getbundle(repo, proto, others):
573 580 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
574 581 for k, v in opts.iteritems():
575 582 if k in ('heads', 'common'):
576 583 opts[k] = decodelist(v)
577 584 elif k == 'bundlecaps':
578 585 opts[k] = set(v.split(','))
579 586 cg = repo.getbundle('serve', **opts)
580 587 return streamres(proto.groupchunks(cg))
581 588
582 589 def heads(repo, proto):
583 590 h = repo.heads()
584 591 return encodelist(h) + "\n"
585 592
586 593 def hello(repo, proto):
587 594 '''the hello command returns a set of lines describing various
588 595 interesting things about the server, in an RFC822-like format.
589 596 Currently the only one defined is "capabilities", which
590 597 consists of a line in the form:
591 598
592 599 capabilities: space separated list of tokens
593 600 '''
594 601 return "capabilities: %s\n" % (capabilities(repo, proto))
595 602
596 603 def listkeys(repo, proto, namespace):
597 604 d = repo.listkeys(encoding.tolocal(namespace)).items()
598 605 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
599 606 for k, v in d])
600 607 return t
601 608
602 609 def lookup(repo, proto, key):
603 610 try:
604 611 k = encoding.tolocal(key)
605 612 c = repo[k]
606 613 r = c.hex()
607 614 success = 1
608 615 except Exception, inst:
609 616 r = str(inst)
610 617 success = 0
611 618 return "%s %s\n" % (success, r)
612 619
613 620 def known(repo, proto, nodes, others):
614 621 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
615 622
616 623 def pushkey(repo, proto, namespace, key, old, new):
617 624 # compatibility with pre-1.8 clients which were accidentally
618 625 # sending raw binary nodes rather than utf-8-encoded hex
619 626 if len(new) == 20 and new.encode('string-escape') != new:
620 627 # looks like it could be a binary node
621 628 try:
622 629 new.decode('utf-8')
623 630 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
624 631 except UnicodeDecodeError:
625 632 pass # binary, leave unmodified
626 633 else:
627 634 new = encoding.tolocal(new) # normal path
628 635
629 636 if util.safehasattr(proto, 'restore'):
630 637
631 638 proto.redirect()
632 639
633 640 try:
634 641 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
635 642 encoding.tolocal(old), new) or False
636 643 except util.Abort:
637 644 r = False
638 645
639 646 output = proto.restore()
640 647
641 648 return '%s\n%s' % (int(r), output)
642 649
643 650 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
644 651 encoding.tolocal(old), new)
645 652 return '%s\n' % int(r)
646 653
647 654 def _allowstream(ui):
648 655 return ui.configbool('server', 'uncompressed', True, untrusted=True)
649 656
650 657 def _walkstreamfiles(repo):
651 658 # this is it's own function so extensions can override it
652 659 return repo.store.walk()
653 660
654 661 def stream(repo, proto):
655 662 '''If the server supports streaming clone, it advertises the "stream"
656 663 capability with a value representing the version and flags of the repo
657 664 it is serving. Client checks to see if it understands the format.
658 665
659 666 The format is simple: the server writes out a line with the amount
660 667 of files, then the total amount of bytes to be transferred (separated
661 668 by a space). Then, for each file, the server first writes the filename
662 669 and filesize (separated by the null character), then the file contents.
663 670 '''
664 671
665 672 if not _allowstream(repo.ui):
666 673 return '1\n'
667 674
668 675 entries = []
669 676 total_bytes = 0
670 677 try:
671 678 # get consistent snapshot of repo, lock during scan
672 679 lock = repo.lock()
673 680 try:
674 681 repo.ui.debug('scanning\n')
675 682 for name, ename, size in _walkstreamfiles(repo):
676 683 if size:
677 684 entries.append((name, size))
678 685 total_bytes += size
679 686 finally:
680 687 lock.release()
681 688 except error.LockError:
682 689 return '2\n' # error: 2
683 690
684 691 def streamer(repo, entries, total):
685 692 '''stream out all metadata files in repository.'''
686 693 yield '0\n' # success
687 694 repo.ui.debug('%d files, %d bytes to transfer\n' %
688 695 (len(entries), total_bytes))
689 696 yield '%d %d\n' % (len(entries), total_bytes)
690 697
691 698 sopener = repo.sopener
692 699 oldaudit = sopener.mustaudit
693 700 debugflag = repo.ui.debugflag
694 701 sopener.mustaudit = False
695 702
696 703 try:
697 704 for name, size in entries:
698 705 if debugflag:
699 706 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
700 707 # partially encode name over the wire for backwards compat
701 708 yield '%s\0%d\n' % (store.encodedir(name), size)
702 709 if size <= 65536:
703 710 fp = sopener(name)
704 711 try:
705 712 data = fp.read(size)
706 713 finally:
707 714 fp.close()
708 715 yield data
709 716 else:
710 717 for chunk in util.filechunkiter(sopener(name), limit=size):
711 718 yield chunk
712 719 # replace with "finally:" when support for python 2.4 has been dropped
713 720 except Exception:
714 721 sopener.mustaudit = oldaudit
715 722 raise
716 723 sopener.mustaudit = oldaudit
717 724
718 725 return streamres(streamer(repo, entries, total_bytes))
719 726
720 727 def unbundle(repo, proto, heads):
721 728 their_heads = decodelist(heads)
722 729
723 730 def check_heads():
724 731 heads = repo.heads()
725 732 heads_hash = util.sha1(''.join(sorted(heads))).digest()
726 733 return (their_heads == ['force'] or their_heads == heads or
727 734 their_heads == ['hashed', heads_hash])
728 735
729 736 proto.redirect()
730 737
731 738 # fail early if possible
732 739 if not check_heads():
733 740 return pusherr('repository changed while preparing changes - '
734 741 'please try again')
735 742
736 743 # write bundle data to temporary file because it can be big
737 744 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
738 745 fp = os.fdopen(fd, 'wb+')
739 746 r = 0
740 747 try:
741 748 proto.getfile(fp)
742 749 lock = repo.lock()
743 750 try:
744 751 if not check_heads():
745 752 # someone else committed/pushed/unbundled while we
746 753 # were transferring data
747 754 return pusherr('repository changed while uploading changes - '
748 755 'please try again')
749 756
750 757 # push can proceed
751 758 fp.seek(0)
752 759 gen = changegroupmod.readbundle(fp, None)
753 760
754 761 try:
755 762 r = repo.addchangegroup(gen, 'serve', proto._client())
756 763 except util.Abort, inst:
757 764 sys.stderr.write("abort: %s\n" % inst)
758 765 finally:
759 766 lock.release()
760 767 return pushres(r)
761 768
762 769 finally:
763 770 fp.close()
764 771 os.unlink(tempname)
765 772
766 773 commands = {
767 774 'batch': (batch, 'cmds *'),
768 775 'between': (between, 'pairs'),
769 776 'branchmap': (branchmap, ''),
770 777 'branches': (branches, 'nodes'),
771 778 'capabilities': (capabilities, ''),
772 779 'changegroup': (changegroup, 'roots'),
773 780 'changegroupsubset': (changegroupsubset, 'bases heads'),
774 781 'debugwireargs': (debugwireargs, 'one two *'),
775 782 'getbundle': (getbundle, '*'),
776 783 'heads': (heads, ''),
777 784 'hello': (hello, ''),
778 785 'known': (known, 'nodes *'),
779 786 'listkeys': (listkeys, 'namespace'),
780 787 'lookup': (lookup, 'key'),
781 788 'pushkey': (pushkey, 'namespace key old new'),
782 789 'stream_out': (stream, ''),
783 790 'unbundle': (unbundle, 'heads'),
784 791 }
General Comments 0
You need to be logged in to leave comments. Login now