##// END OF EJS Templates
protocol: unify client unbundle support...
Matt Mackall -
r11592:26e0782b default
parent child Browse files
Show More
@@ -1,215 +1,200
1 1 # httprepo.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from node import bin, hex, nullid
10 10 from i18n import _
11 11 import repo, changegroup, statichttprepo, error, url, util, wireproto
12 12 import os, urllib, urllib2, urlparse, zlib, httplib
13 13 import errno, socket
14 14 import encoding
15 15
16 16 def zgenerator(f):
17 17 zd = zlib.decompressobj()
18 18 try:
19 19 for chunk in util.filechunkiter(f):
20 20 yield zd.decompress(chunk)
21 21 except httplib.HTTPException:
22 22 raise IOError(None, _('connection ended unexpectedly'))
23 23 yield zd.flush()
24 24
25 25 class httprepository(wireproto.wirerepository):
26 26 def __init__(self, ui, path):
27 27 self.path = path
28 28 self.caps = None
29 29 self.handler = None
30 30 scheme, netloc, urlpath, query, frag = urlparse.urlsplit(path)
31 31 if query or frag:
32 32 raise util.Abort(_('unsupported URL component: "%s"') %
33 33 (query or frag))
34 34
35 35 # urllib cannot handle URLs with embedded user or passwd
36 36 self._url, authinfo = url.getauthinfo(path)
37 37
38 38 self.ui = ui
39 39 self.ui.debug('using %s\n' % self._url)
40 40
41 41 self.urlopener = url.opener(ui, authinfo)
42 42
43 43 def __del__(self):
44 44 for h in self.urlopener.handlers:
45 45 h.close()
46 46 if hasattr(h, "close_all"):
47 47 h.close_all()
48 48
49 49 def url(self):
50 50 return self.path
51 51
52 52 # look up capabilities only when needed
53 53
54 54 def get_caps(self):
55 55 if self.caps is None:
56 56 try:
57 57 self.caps = set(self._call('capabilities').split())
58 58 except error.RepoError:
59 59 self.caps = set()
60 60 self.ui.debug('capabilities: %s\n' %
61 61 (' '.join(self.caps or ['none'])))
62 62 return self.caps
63 63
64 64 capabilities = property(get_caps)
65 65
66 66 def lock(self):
67 67 raise util.Abort(_('operation not supported over http'))
68 68
69 69 def _callstream(self, cmd, **args):
70 70 data = args.pop('data', None)
71 71 headers = args.pop('headers', {})
72 72 self.ui.debug("sending %s command\n" % cmd)
73 73 q = {"cmd": cmd}
74 74 q.update(args)
75 75 qs = '?%s' % urllib.urlencode(q)
76 76 cu = "%s%s" % (self._url, qs)
77 77 req = urllib2.Request(cu, data, headers)
78 78 if data is not None:
79 79 # len(data) is broken if data doesn't fit into Py_ssize_t
80 80 # add the header ourself to avoid OverflowError
81 81 size = data.__len__()
82 82 self.ui.debug("sending %s bytes\n" % size)
83 83 req.add_unredirected_header('Content-Length', '%d' % size)
84 84 try:
85 85 resp = self.urlopener.open(req)
86 86 except urllib2.HTTPError, inst:
87 87 if inst.code == 401:
88 88 raise util.Abort(_('authorization failed'))
89 89 raise
90 90 except httplib.HTTPException, inst:
91 91 self.ui.debug('http error while sending %s command\n' % cmd)
92 92 self.ui.traceback()
93 93 raise IOError(None, inst)
94 94 except IndexError:
95 95 # this only happens with Python 2.3, later versions raise URLError
96 96 raise util.Abort(_('http error, possibly caused by proxy setting'))
97 97 # record the url we got redirected to
98 98 resp_url = resp.geturl()
99 99 if resp_url.endswith(qs):
100 100 resp_url = resp_url[:-len(qs)]
101 101 if self._url.rstrip('/') != resp_url.rstrip('/'):
102 102 self.ui.status(_('real URL is %s\n') % resp_url)
103 103 self._url = resp_url
104 104 try:
105 105 proto = resp.getheader('content-type')
106 106 except AttributeError:
107 107 proto = resp.headers['content-type']
108 108
109 109 safeurl = url.hidepassword(self._url)
110 110 # accept old "text/plain" and "application/hg-changegroup" for now
111 111 if not (proto.startswith('application/mercurial-') or
112 112 proto.startswith('text/plain') or
113 113 proto.startswith('application/hg-changegroup')):
114 114 self.ui.debug("requested URL: '%s'\n" % url.hidepassword(cu))
115 115 raise error.RepoError(
116 116 _("'%s' does not appear to be an hg repository:\n"
117 117 "---%%<--- (%s)\n%s\n---%%<---\n")
118 118 % (safeurl, proto, resp.read()))
119 119
120 120 if proto.startswith('application/mercurial-'):
121 121 try:
122 122 version = proto.split('-', 1)[1]
123 123 version_info = tuple([int(n) for n in version.split('.')])
124 124 except ValueError:
125 125 raise error.RepoError(_("'%s' sent a broken Content-Type "
126 126 "header (%s)") % (safeurl, proto))
127 127 if version_info > (0, 1):
128 128 raise error.RepoError(_("'%s' uses newer protocol %s") %
129 129 (safeurl, version))
130 130
131 131 return resp
132 132
133 133 def _call(self, cmd, **args):
134 134 fp = self._callstream(cmd, **args)
135 135 try:
136 136 return fp.read()
137 137 finally:
138 138 # if using keepalive, allow connection to be reused
139 139 fp.close()
140 140
141 def _abort(self, exception):
142 raise exception
143
144 def _decompress(self, stream):
145 return util.chunkbuffer(zgenerator(stream))
146
147 def unbundle(self, cg, heads, source):
148 '''Send cg (a readable file-like object representing the
149 changegroup to push, typically a chunkbuffer object) to the
150 remote server as a bundle. Return an integer response code:
151 non-zero indicates a successful push (see
152 localrepository.addchangegroup()), and zero indicates either
153 error or nothing to push.'''
141 def _callpush(self, cmd, cg, **args):
154 142 # have to stream bundle to a temp file because we do not have
155 143 # http 1.1 chunked transfer.
156 144
157 145 type = ""
158 146 types = self.capable('unbundle')
159 147 # servers older than d1b16a746db6 will send 'unbundle' as a
160 148 # boolean capability
161 149 try:
162 150 types = types.split(',')
163 151 except AttributeError:
164 152 types = [""]
165 153 if types:
166 154 for x in types:
167 155 if x in changegroup.bundletypes:
168 156 type = x
169 157 break
170 158
171 159 tempname = changegroup.writebundle(cg, None, type)
172 160 fp = url.httpsendfile(tempname, "rb")
161 headers = {'Content-Type': 'application/mercurial-0.1'}
162
173 163 try:
174 164 try:
175 resp = self._call(
176 'unbundle', data=fp,
177 headers={'Content-Type': 'application/mercurial-0.1'},
178 heads=' '.join(map(hex, heads)))
179 resp_code, output = resp.split('\n', 1)
180 try:
181 ret = int(resp_code)
182 except ValueError, err:
183 raise error.ResponseError(
184 _('push failed (unexpected response):'), resp)
185 for l in output.splitlines(True):
186 self.ui.status(_('remote: '), l)
187 return ret
165 r = self._call(cmd, data=fp, headers=headers, **args)
166 return r.split('\n', 1)
188 167 except socket.error, err:
189 168 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
190 169 raise util.Abort(_('push failed: %s') % err.args[1])
191 170 raise util.Abort(err.args[1])
192 171 finally:
193 172 fp.close()
194 173 os.unlink(tempname)
195 174
175 def _abort(self, exception):
176 raise exception
177
178 def _decompress(self, stream):
179 return util.chunkbuffer(zgenerator(stream))
180
196 181 class httpsrepository(httprepository):
197 182 def __init__(self, ui, path):
198 183 if not url.has_https:
199 184 raise util.Abort(_('Python support for SSL and HTTPS '
200 185 'is not installed'))
201 186 httprepository.__init__(self, ui, path)
202 187
203 188 def instance(ui, path, create):
204 189 if create:
205 190 raise util.Abort(_('cannot create new http repository'))
206 191 try:
207 192 if path.startswith('https:'):
208 193 inst = httpsrepository(ui, path)
209 194 else:
210 195 inst = httprepository(ui, path)
211 196 inst.between([(nullid, nullid)])
212 197 return inst
213 198 except error.RepoError:
214 199 ui.note('(falling back to static-http)\n')
215 200 return statichttprepo.instance(ui, "static-" + path, create)
@@ -1,212 +1,198
1 1 # sshrepo.py - ssh repository proxy class for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from node import bin, hex
9 9 from i18n import _
10 10 import repo, util, error, encoding, wireproto
11 11 import re, urllib
12 12
13 13 class remotelock(object):
14 14 def __init__(self, repo):
15 15 self.repo = repo
16 16 def release(self):
17 17 self.repo.unlock()
18 18 self.repo = None
19 19 def __del__(self):
20 20 if self.repo:
21 21 self.release()
22 22
23 23 class sshrepository(wireproto.wirerepository):
24 24 def __init__(self, ui, path, create=0):
25 25 self._url = path
26 26 self.ui = ui
27 27
28 28 m = re.match(r'^ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?$', path)
29 29 if not m:
30 30 self._abort(error.RepoError(_("couldn't parse location %s") % path))
31 31
32 32 self.user = m.group(2)
33 33 self.host = m.group(3)
34 34 self.port = m.group(5)
35 35 self.path = m.group(7) or "."
36 36
37 37 sshcmd = self.ui.config("ui", "ssh", "ssh")
38 38 remotecmd = self.ui.config("ui", "remotecmd", "hg")
39 39
40 40 args = util.sshargs(sshcmd, self.host, self.user, self.port)
41 41
42 42 if create:
43 43 cmd = '%s %s "%s init %s"'
44 44 cmd = cmd % (sshcmd, args, remotecmd, self.path)
45 45
46 46 ui.note(_('running %s\n') % cmd)
47 47 res = util.system(cmd)
48 48 if res != 0:
49 49 self._abort(error.RepoError(_("could not create remote repo")))
50 50
51 51 self.validate_repo(ui, sshcmd, args, remotecmd)
52 52
53 53 def url(self):
54 54 return self._url
55 55
56 56 def validate_repo(self, ui, sshcmd, args, remotecmd):
57 57 # cleanup up previous run
58 58 self.cleanup()
59 59
60 60 cmd = '%s %s "%s -R %s serve --stdio"'
61 61 cmd = cmd % (sshcmd, args, remotecmd, self.path)
62 62
63 63 cmd = util.quotecommand(cmd)
64 64 ui.note(_('running %s\n') % cmd)
65 65 self.pipeo, self.pipei, self.pipee = util.popen3(cmd)
66 66
67 67 # skip any noise generated by remote shell
68 68 self._callstream("hello")
69 69 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
70 70 lines = ["", "dummy"]
71 71 max_noise = 500
72 72 while lines[-1] and max_noise:
73 73 l = r.readline()
74 74 self.readerr()
75 75 if lines[-1] == "1\n" and l == "\n":
76 76 break
77 77 if l:
78 78 ui.debug("remote: ", l)
79 79 lines.append(l)
80 80 max_noise -= 1
81 81 else:
82 82 self._abort(error.RepoError(_("no suitable response from remote hg")))
83 83
84 84 self.capabilities = set()
85 85 for l in reversed(lines):
86 86 if l.startswith("capabilities:"):
87 87 self.capabilities.update(l[:-1].split(":")[1].split())
88 88 break
89 89
90 90 def readerr(self):
91 91 while 1:
92 92 size = util.fstat(self.pipee).st_size
93 93 if size == 0:
94 94 break
95 95 l = self.pipee.readline()
96 96 if not l:
97 97 break
98 98 self.ui.status(_("remote: "), l)
99 99
100 100 def _abort(self, exception):
101 101 self.cleanup()
102 102 raise exception
103 103
104 104 def cleanup(self):
105 105 try:
106 106 self.pipeo.close()
107 107 self.pipei.close()
108 108 # read the error descriptor until EOF
109 109 for l in self.pipee:
110 110 self.ui.status(_("remote: "), l)
111 111 self.pipee.close()
112 112 except:
113 113 pass
114 114
115 115 __del__ = cleanup
116 116
117 117 def _callstream(self, cmd, **args):
118 118 self.ui.debug("sending %s command\n" % cmd)
119 119 self.pipeo.write("%s\n" % cmd)
120 120 for k, v in sorted(args.iteritems()):
121 121 self.pipeo.write("%s %d\n" % (k, len(v)))
122 122 self.pipeo.write(v)
123 123 self.pipeo.flush()
124 124
125 125 return self.pipei
126 126
127 127 def _call(self, cmd, **args):
128 128 self._callstream(cmd, **args)
129 129 return self._recv()
130 130
131 def _callpush(self, cmd, fp, **args):
132 r = self._call(cmd, **args)
133 if r:
134 return '', r
135 while 1:
136 d = fp.read(4096)
137 if not d:
138 break
139 self._send(d)
140 self._send("", flush=True)
141 r = self._recv()
142 if r:
143 return '', r
144 return self._recv(), ''
145
131 146 def _decompress(self, stream):
132 147 return stream
133 148
134 149 def _recv(self):
135 150 l = self.pipei.readline()
136 151 self.readerr()
137 152 try:
138 153 l = int(l)
139 154 except:
140 155 self._abort(error.ResponseError(_("unexpected response:"), l))
141 156 return self.pipei.read(l)
142 157
143 158 def _send(self, data, flush=False):
144 159 self.pipeo.write("%d\n" % len(data))
145 160 if data:
146 161 self.pipeo.write(data)
147 162 if flush:
148 163 self.pipeo.flush()
149 164 self.readerr()
150 165
151 166 def lock(self):
152 167 self._call("lock")
153 168 return remotelock(self)
154 169
155 170 def unlock(self):
156 171 self._call("unlock")
157 172
158 def unbundle(self, cg, heads, source):
159 '''Send cg (a readable file-like object representing the
160 changegroup to push, typically a chunkbuffer object) to the
161 remote server as a bundle. Return an integer indicating the
162 result of the push (see localrepository.addchangegroup()).'''
163 d = self._call("unbundle", heads=' '.join(map(hex, heads)))
164 if d:
165 # remote may send "unsynced changes"
166 self._abort(error.RepoError(_("push refused: %s") % d))
167
168 while 1:
169 d = cg.read(4096)
170 if not d:
171 break
172 self._send(d)
173
174 self._send("", flush=True)
175
176 r = self._recv()
177 if r:
178 # remote may send "unsynced changes"
179 self._abort(error.RepoError(_("push failed: %s") % r))
180
181 r = self._recv()
182 try:
183 return int(r)
184 except:
185 self._abort(error.ResponseError(_("unexpected response:"), r))
186
187 173 def addchangegroup(self, cg, source, url):
188 174 '''Send a changegroup to the remote server. Return an integer
189 175 similar to unbundle(). DEPRECATED, since it requires locking the
190 176 remote.'''
191 177 d = self._call("addchangegroup")
192 178 if d:
193 179 self._abort(error.RepoError(_("push refused: %s") % d))
194 180 while 1:
195 181 d = cg.read(4096)
196 182 if not d:
197 183 break
198 184 self.pipeo.write(d)
199 185 self.readerr()
200 186
201 187 self.pipeo.flush()
202 188
203 189 self.readerr()
204 190 r = self._recv()
205 191 if not r:
206 192 return 1
207 193 try:
208 194 return int(r)
209 195 except:
210 196 self._abort(error.ResponseError(_("unexpected response:"), r))
211 197
212 198 instance = sshrepository
@@ -1,192 +1,212
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import bin, hex
10 10 import urllib
11 11 import streamclone, repo, error, encoding
12 12 import pushkey as pushkey_
13 13
14 14 # client side
15 15
16 16 class wirerepository(repo.repository):
17 17 def lookup(self, key):
18 18 self.requirecap('lookup', _('look up remote revision'))
19 19 d = self._call("lookup", key=key)
20 20 success, data = d[:-1].split(" ", 1)
21 21 if int(success):
22 22 return bin(data)
23 23 self._abort(error.RepoError(data))
24 24
25 25 def heads(self):
26 26 d = self._call("heads")
27 27 try:
28 28 return map(bin, d[:-1].split(" "))
29 29 except:
30 30 self.abort(error.ResponseError(_("unexpected response:"), d))
31 31
32 32 def branchmap(self):
33 33 d = self._call("branchmap")
34 34 try:
35 35 branchmap = {}
36 36 for branchpart in d.splitlines():
37 37 branchheads = branchpart.split(' ')
38 38 branchname = urllib.unquote(branchheads[0])
39 39 # Earlier servers (1.3.x) send branch names in (their) local
40 40 # charset. The best we can do is assume it's identical to our
41 41 # own local charset, in case it's not utf-8.
42 42 try:
43 43 branchname.decode('utf-8')
44 44 except UnicodeDecodeError:
45 45 branchname = encoding.fromlocal(branchname)
46 46 branchheads = [bin(x) for x in branchheads[1:]]
47 47 branchmap[branchname] = branchheads
48 48 return branchmap
49 49 except TypeError:
50 50 self._abort(error.ResponseError(_("unexpected response:"), d))
51 51
52 52 def branches(self, nodes):
53 53 n = " ".join(map(hex, nodes))
54 54 d = self._call("branches", nodes=n)
55 55 try:
56 56 br = [tuple(map(bin, b.split(" "))) for b in d.splitlines()]
57 57 return br
58 58 except:
59 59 self._abort(error.ResponseError(_("unexpected response:"), d))
60 60
61 61 def between(self, pairs):
62 62 batch = 8 # avoid giant requests
63 63 r = []
64 64 for i in xrange(0, len(pairs), batch):
65 65 n = " ".join(["-".join(map(hex, p)) for p in pairs[i:i + batch]])
66 66 d = self._call("between", pairs=n)
67 67 try:
68 68 r += [l and map(bin, l.split(" ")) or []
69 69 for l in d.splitlines()]
70 70 except:
71 71 self._abort(error.ResponseError(_("unexpected response:"), d))
72 72 return r
73 73
74 74 def pushkey(self, namespace, key, old, new):
75 75 if not self.capable('pushkey'):
76 76 return False
77 77 d = self._call("pushkey",
78 78 namespace=namespace, key=key, old=old, new=new)
79 79 return bool(int(d))
80 80
81 81 def listkeys(self, namespace):
82 82 if not self.capable('pushkey'):
83 83 return {}
84 84 d = self._call("listkeys", namespace=namespace)
85 85 r = {}
86 86 for l in d.splitlines():
87 87 k, v = l.split('\t')
88 88 r[k.decode('string-escape')] = v.decode('string-escape')
89 89 return r
90 90
91 91 def stream_out(self):
92 92 return self._callstream('stream_out')
93 93
94 94 def changegroup(self, nodes, kind):
95 95 n = " ".join(map(hex, nodes))
96 96 f = self._callstream("changegroup", roots=n)
97 97 return self._decompress(f)
98 98
99 99 def changegroupsubset(self, bases, heads, kind):
100 100 self.requirecap('changegroupsubset', _('look up remote changes'))
101 101 bases = " ".join(map(hex, bases))
102 102 heads = " ".join(map(hex, heads))
103 103 return self._decompress(self._callstream("changegroupsubset",
104 104 bases=bases, heads=heads))
105 105
106 def unbundle(self, cg, heads, source):
107 '''Send cg (a readable file-like object representing the
108 changegroup to push, typically a chunkbuffer object) to the
109 remote server as a bundle. Return an integer indicating the
110 result of the push (see localrepository.addchangegroup()).'''
111
112 ret, output = self._callpush("unbundle", cg, heads=' '.join(map(hex, heads)))
113 if ret == "":
114 raise error.ResponseError(
115 _('push failed:'), output)
116 try:
117 ret = int(ret)
118 except ValueError, err:
119 raise error.ResponseError(
120 _('push failed (unexpected response):'), ret)
121
122 for l in output.splitlines(True):
123 self.ui.status(_('remote: '), l)
124 return ret
125
106 126 # server side
107 127
108 128 def dispatch(repo, proto, command):
109 129 if command not in commands:
110 130 return False
111 131 func, spec = commands[command]
112 132 args = proto.getargs(spec)
113 133 r = func(repo, proto, *args)
114 134 if r != None:
115 135 proto.respond(r)
116 136 return True
117 137
118 138 def between(repo, proto, pairs):
119 139 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
120 140 r = []
121 141 for b in repo.between(pairs):
122 142 r.append(" ".join(map(hex, b)) + "\n")
123 143 return "".join(r)
124 144
125 145 def branchmap(repo, proto):
126 146 branchmap = repo.branchmap()
127 147 heads = []
128 148 for branch, nodes in branchmap.iteritems():
129 149 branchname = urllib.quote(branch)
130 150 branchnodes = [hex(node) for node in nodes]
131 151 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
132 152 return '\n'.join(heads)
133 153
134 154 def branches(repo, proto, nodes):
135 155 nodes = map(bin, nodes.split(" "))
136 156 r = []
137 157 for b in repo.branches(nodes):
138 158 r.append(" ".join(map(hex, b)) + "\n")
139 159 return "".join(r)
140 160
141 161 def changegroup(repo, proto, roots):
142 162 nodes = map(bin, roots.split(" "))
143 163 cg = repo.changegroup(nodes, 'serve')
144 164 proto.sendchangegroup(cg)
145 165
146 166 def changegroupsubset(repo, proto, bases, heads):
147 167 bases = [bin(n) for n in bases.split(' ')]
148 168 heads = [bin(n) for n in heads.split(' ')]
149 169 cg = repo.changegroupsubset(bases, heads, 'serve')
150 170 proto.sendchangegroup(cg)
151 171
152 172 def heads(repo, proto):
153 173 h = repo.heads()
154 174 return " ".join(map(hex, h)) + "\n"
155 175
156 176 def listkeys(repo, proto, namespace):
157 177 d = pushkey_.list(repo, namespace).items()
158 178 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
159 179 v.encode('string-escape')) for k, v in d])
160 180 return t
161 181
162 182 def lookup(repo, proto, key):
163 183 try:
164 184 r = hex(repo.lookup(key))
165 185 success = 1
166 186 except Exception, inst:
167 187 r = str(inst)
168 188 success = 0
169 189 return "%s %s\n" % (success, r)
170 190
171 191 def pushkey(repo, proto, namespace, key, old, new):
172 192 r = pushkey_.push(repo, namespace, key, old, new)
173 193 return '%s\n' % int(r)
174 194
175 195 def stream(repo, proto):
176 196 try:
177 197 proto.sendstream(streamclone.stream_out(repo))
178 198 except streamclone.StreamException, inst:
179 199 return str(inst)
180 200
181 201 commands = {
182 202 'between': (between, 'pairs'),
183 203 'branchmap': (branchmap, ''),
184 204 'branches': (branches, 'nodes'),
185 205 'changegroup': (changegroup, 'roots'),
186 206 'changegroupsubset': (changegroupsubset, 'bases heads'),
187 207 'heads': (heads, ''),
188 208 'listkeys': (listkeys, 'namespace'),
189 209 'lookup': (lookup, 'key'),
190 210 'pushkey': (pushkey, 'namespace key old new'),
191 211 'stream_out': (stream, ''),
192 212 }
General Comments 0
You need to be logged in to leave comments. Login now