##// END OF EJS Templates
protocol: unify client unbundle support...
Matt Mackall -
r11592:26e0782b default
parent child Browse files
Show More
@@ -1,215 +1,200
1 # httprepo.py - HTTP repository proxy classes for mercurial
1 # httprepo.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from node import bin, hex, nullid
9 from node import bin, hex, nullid
10 from i18n import _
10 from i18n import _
11 import repo, changegroup, statichttprepo, error, url, util, wireproto
11 import repo, changegroup, statichttprepo, error, url, util, wireproto
12 import os, urllib, urllib2, urlparse, zlib, httplib
12 import os, urllib, urllib2, urlparse, zlib, httplib
13 import errno, socket
13 import errno, socket
14 import encoding
14 import encoding
15
15
16 def zgenerator(f):
16 def zgenerator(f):
17 zd = zlib.decompressobj()
17 zd = zlib.decompressobj()
18 try:
18 try:
19 for chunk in util.filechunkiter(f):
19 for chunk in util.filechunkiter(f):
20 yield zd.decompress(chunk)
20 yield zd.decompress(chunk)
21 except httplib.HTTPException:
21 except httplib.HTTPException:
22 raise IOError(None, _('connection ended unexpectedly'))
22 raise IOError(None, _('connection ended unexpectedly'))
23 yield zd.flush()
23 yield zd.flush()
24
24
25 class httprepository(wireproto.wirerepository):
25 class httprepository(wireproto.wirerepository):
26 def __init__(self, ui, path):
26 def __init__(self, ui, path):
27 self.path = path
27 self.path = path
28 self.caps = None
28 self.caps = None
29 self.handler = None
29 self.handler = None
30 scheme, netloc, urlpath, query, frag = urlparse.urlsplit(path)
30 scheme, netloc, urlpath, query, frag = urlparse.urlsplit(path)
31 if query or frag:
31 if query or frag:
32 raise util.Abort(_('unsupported URL component: "%s"') %
32 raise util.Abort(_('unsupported URL component: "%s"') %
33 (query or frag))
33 (query or frag))
34
34
35 # urllib cannot handle URLs with embedded user or passwd
35 # urllib cannot handle URLs with embedded user or passwd
36 self._url, authinfo = url.getauthinfo(path)
36 self._url, authinfo = url.getauthinfo(path)
37
37
38 self.ui = ui
38 self.ui = ui
39 self.ui.debug('using %s\n' % self._url)
39 self.ui.debug('using %s\n' % self._url)
40
40
41 self.urlopener = url.opener(ui, authinfo)
41 self.urlopener = url.opener(ui, authinfo)
42
42
43 def __del__(self):
43 def __del__(self):
44 for h in self.urlopener.handlers:
44 for h in self.urlopener.handlers:
45 h.close()
45 h.close()
46 if hasattr(h, "close_all"):
46 if hasattr(h, "close_all"):
47 h.close_all()
47 h.close_all()
48
48
49 def url(self):
49 def url(self):
50 return self.path
50 return self.path
51
51
52 # look up capabilities only when needed
52 # look up capabilities only when needed
53
53
54 def get_caps(self):
54 def get_caps(self):
55 if self.caps is None:
55 if self.caps is None:
56 try:
56 try:
57 self.caps = set(self._call('capabilities').split())
57 self.caps = set(self._call('capabilities').split())
58 except error.RepoError:
58 except error.RepoError:
59 self.caps = set()
59 self.caps = set()
60 self.ui.debug('capabilities: %s\n' %
60 self.ui.debug('capabilities: %s\n' %
61 (' '.join(self.caps or ['none'])))
61 (' '.join(self.caps or ['none'])))
62 return self.caps
62 return self.caps
63
63
64 capabilities = property(get_caps)
64 capabilities = property(get_caps)
65
65
66 def lock(self):
66 def lock(self):
67 raise util.Abort(_('operation not supported over http'))
67 raise util.Abort(_('operation not supported over http'))
68
68
69 def _callstream(self, cmd, **args):
69 def _callstream(self, cmd, **args):
70 data = args.pop('data', None)
70 data = args.pop('data', None)
71 headers = args.pop('headers', {})
71 headers = args.pop('headers', {})
72 self.ui.debug("sending %s command\n" % cmd)
72 self.ui.debug("sending %s command\n" % cmd)
73 q = {"cmd": cmd}
73 q = {"cmd": cmd}
74 q.update(args)
74 q.update(args)
75 qs = '?%s' % urllib.urlencode(q)
75 qs = '?%s' % urllib.urlencode(q)
76 cu = "%s%s" % (self._url, qs)
76 cu = "%s%s" % (self._url, qs)
77 req = urllib2.Request(cu, data, headers)
77 req = urllib2.Request(cu, data, headers)
78 if data is not None:
78 if data is not None:
79 # len(data) is broken if data doesn't fit into Py_ssize_t
79 # len(data) is broken if data doesn't fit into Py_ssize_t
80 # add the header ourself to avoid OverflowError
80 # add the header ourself to avoid OverflowError
81 size = data.__len__()
81 size = data.__len__()
82 self.ui.debug("sending %s bytes\n" % size)
82 self.ui.debug("sending %s bytes\n" % size)
83 req.add_unredirected_header('Content-Length', '%d' % size)
83 req.add_unredirected_header('Content-Length', '%d' % size)
84 try:
84 try:
85 resp = self.urlopener.open(req)
85 resp = self.urlopener.open(req)
86 except urllib2.HTTPError, inst:
86 except urllib2.HTTPError, inst:
87 if inst.code == 401:
87 if inst.code == 401:
88 raise util.Abort(_('authorization failed'))
88 raise util.Abort(_('authorization failed'))
89 raise
89 raise
90 except httplib.HTTPException, inst:
90 except httplib.HTTPException, inst:
91 self.ui.debug('http error while sending %s command\n' % cmd)
91 self.ui.debug('http error while sending %s command\n' % cmd)
92 self.ui.traceback()
92 self.ui.traceback()
93 raise IOError(None, inst)
93 raise IOError(None, inst)
94 except IndexError:
94 except IndexError:
95 # this only happens with Python 2.3, later versions raise URLError
95 # this only happens with Python 2.3, later versions raise URLError
96 raise util.Abort(_('http error, possibly caused by proxy setting'))
96 raise util.Abort(_('http error, possibly caused by proxy setting'))
97 # record the url we got redirected to
97 # record the url we got redirected to
98 resp_url = resp.geturl()
98 resp_url = resp.geturl()
99 if resp_url.endswith(qs):
99 if resp_url.endswith(qs):
100 resp_url = resp_url[:-len(qs)]
100 resp_url = resp_url[:-len(qs)]
101 if self._url.rstrip('/') != resp_url.rstrip('/'):
101 if self._url.rstrip('/') != resp_url.rstrip('/'):
102 self.ui.status(_('real URL is %s\n') % resp_url)
102 self.ui.status(_('real URL is %s\n') % resp_url)
103 self._url = resp_url
103 self._url = resp_url
104 try:
104 try:
105 proto = resp.getheader('content-type')
105 proto = resp.getheader('content-type')
106 except AttributeError:
106 except AttributeError:
107 proto = resp.headers['content-type']
107 proto = resp.headers['content-type']
108
108
109 safeurl = url.hidepassword(self._url)
109 safeurl = url.hidepassword(self._url)
110 # accept old "text/plain" and "application/hg-changegroup" for now
110 # accept old "text/plain" and "application/hg-changegroup" for now
111 if not (proto.startswith('application/mercurial-') or
111 if not (proto.startswith('application/mercurial-') or
112 proto.startswith('text/plain') or
112 proto.startswith('text/plain') or
113 proto.startswith('application/hg-changegroup')):
113 proto.startswith('application/hg-changegroup')):
114 self.ui.debug("requested URL: '%s'\n" % url.hidepassword(cu))
114 self.ui.debug("requested URL: '%s'\n" % url.hidepassword(cu))
115 raise error.RepoError(
115 raise error.RepoError(
116 _("'%s' does not appear to be an hg repository:\n"
116 _("'%s' does not appear to be an hg repository:\n"
117 "---%%<--- (%s)\n%s\n---%%<---\n")
117 "---%%<--- (%s)\n%s\n---%%<---\n")
118 % (safeurl, proto, resp.read()))
118 % (safeurl, proto, resp.read()))
119
119
120 if proto.startswith('application/mercurial-'):
120 if proto.startswith('application/mercurial-'):
121 try:
121 try:
122 version = proto.split('-', 1)[1]
122 version = proto.split('-', 1)[1]
123 version_info = tuple([int(n) for n in version.split('.')])
123 version_info = tuple([int(n) for n in version.split('.')])
124 except ValueError:
124 except ValueError:
125 raise error.RepoError(_("'%s' sent a broken Content-Type "
125 raise error.RepoError(_("'%s' sent a broken Content-Type "
126 "header (%s)") % (safeurl, proto))
126 "header (%s)") % (safeurl, proto))
127 if version_info > (0, 1):
127 if version_info > (0, 1):
128 raise error.RepoError(_("'%s' uses newer protocol %s") %
128 raise error.RepoError(_("'%s' uses newer protocol %s") %
129 (safeurl, version))
129 (safeurl, version))
130
130
131 return resp
131 return resp
132
132
133 def _call(self, cmd, **args):
133 def _call(self, cmd, **args):
134 fp = self._callstream(cmd, **args)
134 fp = self._callstream(cmd, **args)
135 try:
135 try:
136 return fp.read()
136 return fp.read()
137 finally:
137 finally:
138 # if using keepalive, allow connection to be reused
138 # if using keepalive, allow connection to be reused
139 fp.close()
139 fp.close()
140
140
141 def _abort(self, exception):
141 def _callpush(self, cmd, cg, **args):
142 raise exception
143
144 def _decompress(self, stream):
145 return util.chunkbuffer(zgenerator(stream))
146
147 def unbundle(self, cg, heads, source):
148 '''Send cg (a readable file-like object representing the
149 changegroup to push, typically a chunkbuffer object) to the
150 remote server as a bundle. Return an integer response code:
151 non-zero indicates a successful push (see
152 localrepository.addchangegroup()), and zero indicates either
153 error or nothing to push.'''
154 # have to stream bundle to a temp file because we do not have
142 # have to stream bundle to a temp file because we do not have
155 # http 1.1 chunked transfer.
143 # http 1.1 chunked transfer.
156
144
157 type = ""
145 type = ""
158 types = self.capable('unbundle')
146 types = self.capable('unbundle')
159 # servers older than d1b16a746db6 will send 'unbundle' as a
147 # servers older than d1b16a746db6 will send 'unbundle' as a
160 # boolean capability
148 # boolean capability
161 try:
149 try:
162 types = types.split(',')
150 types = types.split(',')
163 except AttributeError:
151 except AttributeError:
164 types = [""]
152 types = [""]
165 if types:
153 if types:
166 for x in types:
154 for x in types:
167 if x in changegroup.bundletypes:
155 if x in changegroup.bundletypes:
168 type = x
156 type = x
169 break
157 break
170
158
171 tempname = changegroup.writebundle(cg, None, type)
159 tempname = changegroup.writebundle(cg, None, type)
172 fp = url.httpsendfile(tempname, "rb")
160 fp = url.httpsendfile(tempname, "rb")
161 headers = {'Content-Type': 'application/mercurial-0.1'}
162
173 try:
163 try:
174 try:
164 try:
175 resp = self._call(
165 r = self._call(cmd, data=fp, headers=headers, **args)
176 'unbundle', data=fp,
166 return r.split('\n', 1)
177 headers={'Content-Type': 'application/mercurial-0.1'},
178 heads=' '.join(map(hex, heads)))
179 resp_code, output = resp.split('\n', 1)
180 try:
181 ret = int(resp_code)
182 except ValueError, err:
183 raise error.ResponseError(
184 _('push failed (unexpected response):'), resp)
185 for l in output.splitlines(True):
186 self.ui.status(_('remote: '), l)
187 return ret
188 except socket.error, err:
167 except socket.error, err:
189 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
168 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
190 raise util.Abort(_('push failed: %s') % err.args[1])
169 raise util.Abort(_('push failed: %s') % err.args[1])
191 raise util.Abort(err.args[1])
170 raise util.Abort(err.args[1])
192 finally:
171 finally:
193 fp.close()
172 fp.close()
194 os.unlink(tempname)
173 os.unlink(tempname)
195
174
175 def _abort(self, exception):
176 raise exception
177
178 def _decompress(self, stream):
179 return util.chunkbuffer(zgenerator(stream))
180
196 class httpsrepository(httprepository):
181 class httpsrepository(httprepository):
197 def __init__(self, ui, path):
182 def __init__(self, ui, path):
198 if not url.has_https:
183 if not url.has_https:
199 raise util.Abort(_('Python support for SSL and HTTPS '
184 raise util.Abort(_('Python support for SSL and HTTPS '
200 'is not installed'))
185 'is not installed'))
201 httprepository.__init__(self, ui, path)
186 httprepository.__init__(self, ui, path)
202
187
203 def instance(ui, path, create):
188 def instance(ui, path, create):
204 if create:
189 if create:
205 raise util.Abort(_('cannot create new http repository'))
190 raise util.Abort(_('cannot create new http repository'))
206 try:
191 try:
207 if path.startswith('https:'):
192 if path.startswith('https:'):
208 inst = httpsrepository(ui, path)
193 inst = httpsrepository(ui, path)
209 else:
194 else:
210 inst = httprepository(ui, path)
195 inst = httprepository(ui, path)
211 inst.between([(nullid, nullid)])
196 inst.between([(nullid, nullid)])
212 return inst
197 return inst
213 except error.RepoError:
198 except error.RepoError:
214 ui.note('(falling back to static-http)\n')
199 ui.note('(falling back to static-http)\n')
215 return statichttprepo.instance(ui, "static-" + path, create)
200 return statichttprepo.instance(ui, "static-" + path, create)
@@ -1,212 +1,198
1 # sshrepo.py - ssh repository proxy class for mercurial
1 # sshrepo.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from node import bin, hex
8 from node import bin, hex
9 from i18n import _
9 from i18n import _
10 import repo, util, error, encoding, wireproto
10 import repo, util, error, encoding, wireproto
11 import re, urllib
11 import re, urllib
12
12
13 class remotelock(object):
13 class remotelock(object):
14 def __init__(self, repo):
14 def __init__(self, repo):
15 self.repo = repo
15 self.repo = repo
16 def release(self):
16 def release(self):
17 self.repo.unlock()
17 self.repo.unlock()
18 self.repo = None
18 self.repo = None
19 def __del__(self):
19 def __del__(self):
20 if self.repo:
20 if self.repo:
21 self.release()
21 self.release()
22
22
23 class sshrepository(wireproto.wirerepository):
23 class sshrepository(wireproto.wirerepository):
24 def __init__(self, ui, path, create=0):
24 def __init__(self, ui, path, create=0):
25 self._url = path
25 self._url = path
26 self.ui = ui
26 self.ui = ui
27
27
28 m = re.match(r'^ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?$', path)
28 m = re.match(r'^ssh://(([^@]+)@)?([^:/]+)(:(\d+))?(/(.*))?$', path)
29 if not m:
29 if not m:
30 self._abort(error.RepoError(_("couldn't parse location %s") % path))
30 self._abort(error.RepoError(_("couldn't parse location %s") % path))
31
31
32 self.user = m.group(2)
32 self.user = m.group(2)
33 self.host = m.group(3)
33 self.host = m.group(3)
34 self.port = m.group(5)
34 self.port = m.group(5)
35 self.path = m.group(7) or "."
35 self.path = m.group(7) or "."
36
36
37 sshcmd = self.ui.config("ui", "ssh", "ssh")
37 sshcmd = self.ui.config("ui", "ssh", "ssh")
38 remotecmd = self.ui.config("ui", "remotecmd", "hg")
38 remotecmd = self.ui.config("ui", "remotecmd", "hg")
39
39
40 args = util.sshargs(sshcmd, self.host, self.user, self.port)
40 args = util.sshargs(sshcmd, self.host, self.user, self.port)
41
41
42 if create:
42 if create:
43 cmd = '%s %s "%s init %s"'
43 cmd = '%s %s "%s init %s"'
44 cmd = cmd % (sshcmd, args, remotecmd, self.path)
44 cmd = cmd % (sshcmd, args, remotecmd, self.path)
45
45
46 ui.note(_('running %s\n') % cmd)
46 ui.note(_('running %s\n') % cmd)
47 res = util.system(cmd)
47 res = util.system(cmd)
48 if res != 0:
48 if res != 0:
49 self._abort(error.RepoError(_("could not create remote repo")))
49 self._abort(error.RepoError(_("could not create remote repo")))
50
50
51 self.validate_repo(ui, sshcmd, args, remotecmd)
51 self.validate_repo(ui, sshcmd, args, remotecmd)
52
52
53 def url(self):
53 def url(self):
54 return self._url
54 return self._url
55
55
56 def validate_repo(self, ui, sshcmd, args, remotecmd):
56 def validate_repo(self, ui, sshcmd, args, remotecmd):
57 # cleanup up previous run
57 # cleanup up previous run
58 self.cleanup()
58 self.cleanup()
59
59
60 cmd = '%s %s "%s -R %s serve --stdio"'
60 cmd = '%s %s "%s -R %s serve --stdio"'
61 cmd = cmd % (sshcmd, args, remotecmd, self.path)
61 cmd = cmd % (sshcmd, args, remotecmd, self.path)
62
62
63 cmd = util.quotecommand(cmd)
63 cmd = util.quotecommand(cmd)
64 ui.note(_('running %s\n') % cmd)
64 ui.note(_('running %s\n') % cmd)
65 self.pipeo, self.pipei, self.pipee = util.popen3(cmd)
65 self.pipeo, self.pipei, self.pipee = util.popen3(cmd)
66
66
67 # skip any noise generated by remote shell
67 # skip any noise generated by remote shell
68 self._callstream("hello")
68 self._callstream("hello")
69 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
69 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
70 lines = ["", "dummy"]
70 lines = ["", "dummy"]
71 max_noise = 500
71 max_noise = 500
72 while lines[-1] and max_noise:
72 while lines[-1] and max_noise:
73 l = r.readline()
73 l = r.readline()
74 self.readerr()
74 self.readerr()
75 if lines[-1] == "1\n" and l == "\n":
75 if lines[-1] == "1\n" and l == "\n":
76 break
76 break
77 if l:
77 if l:
78 ui.debug("remote: ", l)
78 ui.debug("remote: ", l)
79 lines.append(l)
79 lines.append(l)
80 max_noise -= 1
80 max_noise -= 1
81 else:
81 else:
82 self._abort(error.RepoError(_("no suitable response from remote hg")))
82 self._abort(error.RepoError(_("no suitable response from remote hg")))
83
83
84 self.capabilities = set()
84 self.capabilities = set()
85 for l in reversed(lines):
85 for l in reversed(lines):
86 if l.startswith("capabilities:"):
86 if l.startswith("capabilities:"):
87 self.capabilities.update(l[:-1].split(":")[1].split())
87 self.capabilities.update(l[:-1].split(":")[1].split())
88 break
88 break
89
89
90 def readerr(self):
90 def readerr(self):
91 while 1:
91 while 1:
92 size = util.fstat(self.pipee).st_size
92 size = util.fstat(self.pipee).st_size
93 if size == 0:
93 if size == 0:
94 break
94 break
95 l = self.pipee.readline()
95 l = self.pipee.readline()
96 if not l:
96 if not l:
97 break
97 break
98 self.ui.status(_("remote: "), l)
98 self.ui.status(_("remote: "), l)
99
99
100 def _abort(self, exception):
100 def _abort(self, exception):
101 self.cleanup()
101 self.cleanup()
102 raise exception
102 raise exception
103
103
104 def cleanup(self):
104 def cleanup(self):
105 try:
105 try:
106 self.pipeo.close()
106 self.pipeo.close()
107 self.pipei.close()
107 self.pipei.close()
108 # read the error descriptor until EOF
108 # read the error descriptor until EOF
109 for l in self.pipee:
109 for l in self.pipee:
110 self.ui.status(_("remote: "), l)
110 self.ui.status(_("remote: "), l)
111 self.pipee.close()
111 self.pipee.close()
112 except:
112 except:
113 pass
113 pass
114
114
115 __del__ = cleanup
115 __del__ = cleanup
116
116
117 def _callstream(self, cmd, **args):
117 def _callstream(self, cmd, **args):
118 self.ui.debug("sending %s command\n" % cmd)
118 self.ui.debug("sending %s command\n" % cmd)
119 self.pipeo.write("%s\n" % cmd)
119 self.pipeo.write("%s\n" % cmd)
120 for k, v in sorted(args.iteritems()):
120 for k, v in sorted(args.iteritems()):
121 self.pipeo.write("%s %d\n" % (k, len(v)))
121 self.pipeo.write("%s %d\n" % (k, len(v)))
122 self.pipeo.write(v)
122 self.pipeo.write(v)
123 self.pipeo.flush()
123 self.pipeo.flush()
124
124
125 return self.pipei
125 return self.pipei
126
126
127 def _call(self, cmd, **args):
127 def _call(self, cmd, **args):
128 self._callstream(cmd, **args)
128 self._callstream(cmd, **args)
129 return self._recv()
129 return self._recv()
130
130
131 def _callpush(self, cmd, fp, **args):
132 r = self._call(cmd, **args)
133 if r:
134 return '', r
135 while 1:
136 d = fp.read(4096)
137 if not d:
138 break
139 self._send(d)
140 self._send("", flush=True)
141 r = self._recv()
142 if r:
143 return '', r
144 return self._recv(), ''
145
131 def _decompress(self, stream):
146 def _decompress(self, stream):
132 return stream
147 return stream
133
148
134 def _recv(self):
149 def _recv(self):
135 l = self.pipei.readline()
150 l = self.pipei.readline()
136 self.readerr()
151 self.readerr()
137 try:
152 try:
138 l = int(l)
153 l = int(l)
139 except:
154 except:
140 self._abort(error.ResponseError(_("unexpected response:"), l))
155 self._abort(error.ResponseError(_("unexpected response:"), l))
141 return self.pipei.read(l)
156 return self.pipei.read(l)
142
157
143 def _send(self, data, flush=False):
158 def _send(self, data, flush=False):
144 self.pipeo.write("%d\n" % len(data))
159 self.pipeo.write("%d\n" % len(data))
145 if data:
160 if data:
146 self.pipeo.write(data)
161 self.pipeo.write(data)
147 if flush:
162 if flush:
148 self.pipeo.flush()
163 self.pipeo.flush()
149 self.readerr()
164 self.readerr()
150
165
151 def lock(self):
166 def lock(self):
152 self._call("lock")
167 self._call("lock")
153 return remotelock(self)
168 return remotelock(self)
154
169
155 def unlock(self):
170 def unlock(self):
156 self._call("unlock")
171 self._call("unlock")
157
172
158 def unbundle(self, cg, heads, source):
159 '''Send cg (a readable file-like object representing the
160 changegroup to push, typically a chunkbuffer object) to the
161 remote server as a bundle. Return an integer indicating the
162 result of the push (see localrepository.addchangegroup()).'''
163 d = self._call("unbundle", heads=' '.join(map(hex, heads)))
164 if d:
165 # remote may send "unsynced changes"
166 self._abort(error.RepoError(_("push refused: %s") % d))
167
168 while 1:
169 d = cg.read(4096)
170 if not d:
171 break
172 self._send(d)
173
174 self._send("", flush=True)
175
176 r = self._recv()
177 if r:
178 # remote may send "unsynced changes"
179 self._abort(error.RepoError(_("push failed: %s") % r))
180
181 r = self._recv()
182 try:
183 return int(r)
184 except:
185 self._abort(error.ResponseError(_("unexpected response:"), r))
186
187 def addchangegroup(self, cg, source, url):
173 def addchangegroup(self, cg, source, url):
188 '''Send a changegroup to the remote server. Return an integer
174 '''Send a changegroup to the remote server. Return an integer
189 similar to unbundle(). DEPRECATED, since it requires locking the
175 similar to unbundle(). DEPRECATED, since it requires locking the
190 remote.'''
176 remote.'''
191 d = self._call("addchangegroup")
177 d = self._call("addchangegroup")
192 if d:
178 if d:
193 self._abort(error.RepoError(_("push refused: %s") % d))
179 self._abort(error.RepoError(_("push refused: %s") % d))
194 while 1:
180 while 1:
195 d = cg.read(4096)
181 d = cg.read(4096)
196 if not d:
182 if not d:
197 break
183 break
198 self.pipeo.write(d)
184 self.pipeo.write(d)
199 self.readerr()
185 self.readerr()
200
186
201 self.pipeo.flush()
187 self.pipeo.flush()
202
188
203 self.readerr()
189 self.readerr()
204 r = self._recv()
190 r = self._recv()
205 if not r:
191 if not r:
206 return 1
192 return 1
207 try:
193 try:
208 return int(r)
194 return int(r)
209 except:
195 except:
210 self._abort(error.ResponseError(_("unexpected response:"), r))
196 self._abort(error.ResponseError(_("unexpected response:"), r))
211
197
212 instance = sshrepository
198 instance = sshrepository
@@ -1,192 +1,212
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from i18n import _
9 from node import bin, hex
9 from node import bin, hex
10 import urllib
10 import urllib
11 import streamclone, repo, error, encoding
11 import streamclone, repo, error, encoding
12 import pushkey as pushkey_
12 import pushkey as pushkey_
13
13
14 # client side
14 # client side
15
15
16 class wirerepository(repo.repository):
16 class wirerepository(repo.repository):
17 def lookup(self, key):
17 def lookup(self, key):
18 self.requirecap('lookup', _('look up remote revision'))
18 self.requirecap('lookup', _('look up remote revision'))
19 d = self._call("lookup", key=key)
19 d = self._call("lookup", key=key)
20 success, data = d[:-1].split(" ", 1)
20 success, data = d[:-1].split(" ", 1)
21 if int(success):
21 if int(success):
22 return bin(data)
22 return bin(data)
23 self._abort(error.RepoError(data))
23 self._abort(error.RepoError(data))
24
24
25 def heads(self):
25 def heads(self):
26 d = self._call("heads")
26 d = self._call("heads")
27 try:
27 try:
28 return map(bin, d[:-1].split(" "))
28 return map(bin, d[:-1].split(" "))
29 except:
29 except:
30 self.abort(error.ResponseError(_("unexpected response:"), d))
30 self.abort(error.ResponseError(_("unexpected response:"), d))
31
31
32 def branchmap(self):
32 def branchmap(self):
33 d = self._call("branchmap")
33 d = self._call("branchmap")
34 try:
34 try:
35 branchmap = {}
35 branchmap = {}
36 for branchpart in d.splitlines():
36 for branchpart in d.splitlines():
37 branchheads = branchpart.split(' ')
37 branchheads = branchpart.split(' ')
38 branchname = urllib.unquote(branchheads[0])
38 branchname = urllib.unquote(branchheads[0])
39 # Earlier servers (1.3.x) send branch names in (their) local
39 # Earlier servers (1.3.x) send branch names in (their) local
40 # charset. The best we can do is assume it's identical to our
40 # charset. The best we can do is assume it's identical to our
41 # own local charset, in case it's not utf-8.
41 # own local charset, in case it's not utf-8.
42 try:
42 try:
43 branchname.decode('utf-8')
43 branchname.decode('utf-8')
44 except UnicodeDecodeError:
44 except UnicodeDecodeError:
45 branchname = encoding.fromlocal(branchname)
45 branchname = encoding.fromlocal(branchname)
46 branchheads = [bin(x) for x in branchheads[1:]]
46 branchheads = [bin(x) for x in branchheads[1:]]
47 branchmap[branchname] = branchheads
47 branchmap[branchname] = branchheads
48 return branchmap
48 return branchmap
49 except TypeError:
49 except TypeError:
50 self._abort(error.ResponseError(_("unexpected response:"), d))
50 self._abort(error.ResponseError(_("unexpected response:"), d))
51
51
52 def branches(self, nodes):
52 def branches(self, nodes):
53 n = " ".join(map(hex, nodes))
53 n = " ".join(map(hex, nodes))
54 d = self._call("branches", nodes=n)
54 d = self._call("branches", nodes=n)
55 try:
55 try:
56 br = [tuple(map(bin, b.split(" "))) for b in d.splitlines()]
56 br = [tuple(map(bin, b.split(" "))) for b in d.splitlines()]
57 return br
57 return br
58 except:
58 except:
59 self._abort(error.ResponseError(_("unexpected response:"), d))
59 self._abort(error.ResponseError(_("unexpected response:"), d))
60
60
61 def between(self, pairs):
61 def between(self, pairs):
62 batch = 8 # avoid giant requests
62 batch = 8 # avoid giant requests
63 r = []
63 r = []
64 for i in xrange(0, len(pairs), batch):
64 for i in xrange(0, len(pairs), batch):
65 n = " ".join(["-".join(map(hex, p)) for p in pairs[i:i + batch]])
65 n = " ".join(["-".join(map(hex, p)) for p in pairs[i:i + batch]])
66 d = self._call("between", pairs=n)
66 d = self._call("between", pairs=n)
67 try:
67 try:
68 r += [l and map(bin, l.split(" ")) or []
68 r += [l and map(bin, l.split(" ")) or []
69 for l in d.splitlines()]
69 for l in d.splitlines()]
70 except:
70 except:
71 self._abort(error.ResponseError(_("unexpected response:"), d))
71 self._abort(error.ResponseError(_("unexpected response:"), d))
72 return r
72 return r
73
73
74 def pushkey(self, namespace, key, old, new):
74 def pushkey(self, namespace, key, old, new):
75 if not self.capable('pushkey'):
75 if not self.capable('pushkey'):
76 return False
76 return False
77 d = self._call("pushkey",
77 d = self._call("pushkey",
78 namespace=namespace, key=key, old=old, new=new)
78 namespace=namespace, key=key, old=old, new=new)
79 return bool(int(d))
79 return bool(int(d))
80
80
81 def listkeys(self, namespace):
81 def listkeys(self, namespace):
82 if not self.capable('pushkey'):
82 if not self.capable('pushkey'):
83 return {}
83 return {}
84 d = self._call("listkeys", namespace=namespace)
84 d = self._call("listkeys", namespace=namespace)
85 r = {}
85 r = {}
86 for l in d.splitlines():
86 for l in d.splitlines():
87 k, v = l.split('\t')
87 k, v = l.split('\t')
88 r[k.decode('string-escape')] = v.decode('string-escape')
88 r[k.decode('string-escape')] = v.decode('string-escape')
89 return r
89 return r
90
90
91 def stream_out(self):
91 def stream_out(self):
92 return self._callstream('stream_out')
92 return self._callstream('stream_out')
93
93
94 def changegroup(self, nodes, kind):
94 def changegroup(self, nodes, kind):
95 n = " ".join(map(hex, nodes))
95 n = " ".join(map(hex, nodes))
96 f = self._callstream("changegroup", roots=n)
96 f = self._callstream("changegroup", roots=n)
97 return self._decompress(f)
97 return self._decompress(f)
98
98
99 def changegroupsubset(self, bases, heads, kind):
99 def changegroupsubset(self, bases, heads, kind):
100 self.requirecap('changegroupsubset', _('look up remote changes'))
100 self.requirecap('changegroupsubset', _('look up remote changes'))
101 bases = " ".join(map(hex, bases))
101 bases = " ".join(map(hex, bases))
102 heads = " ".join(map(hex, heads))
102 heads = " ".join(map(hex, heads))
103 return self._decompress(self._callstream("changegroupsubset",
103 return self._decompress(self._callstream("changegroupsubset",
104 bases=bases, heads=heads))
104 bases=bases, heads=heads))
105
105
106 def unbundle(self, cg, heads, source):
107 '''Send cg (a readable file-like object representing the
108 changegroup to push, typically a chunkbuffer object) to the
109 remote server as a bundle. Return an integer indicating the
110 result of the push (see localrepository.addchangegroup()).'''
111
112 ret, output = self._callpush("unbundle", cg, heads=' '.join(map(hex, heads)))
113 if ret == "":
114 raise error.ResponseError(
115 _('push failed:'), output)
116 try:
117 ret = int(ret)
118 except ValueError, err:
119 raise error.ResponseError(
120 _('push failed (unexpected response):'), ret)
121
122 for l in output.splitlines(True):
123 self.ui.status(_('remote: '), l)
124 return ret
125
106 # server side
126 # server side
107
127
108 def dispatch(repo, proto, command):
128 def dispatch(repo, proto, command):
109 if command not in commands:
129 if command not in commands:
110 return False
130 return False
111 func, spec = commands[command]
131 func, spec = commands[command]
112 args = proto.getargs(spec)
132 args = proto.getargs(spec)
113 r = func(repo, proto, *args)
133 r = func(repo, proto, *args)
114 if r != None:
134 if r != None:
115 proto.respond(r)
135 proto.respond(r)
116 return True
136 return True
117
137
118 def between(repo, proto, pairs):
138 def between(repo, proto, pairs):
119 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
139 pairs = [map(bin, p.split("-")) for p in pairs.split(" ")]
120 r = []
140 r = []
121 for b in repo.between(pairs):
141 for b in repo.between(pairs):
122 r.append(" ".join(map(hex, b)) + "\n")
142 r.append(" ".join(map(hex, b)) + "\n")
123 return "".join(r)
143 return "".join(r)
124
144
125 def branchmap(repo, proto):
145 def branchmap(repo, proto):
126 branchmap = repo.branchmap()
146 branchmap = repo.branchmap()
127 heads = []
147 heads = []
128 for branch, nodes in branchmap.iteritems():
148 for branch, nodes in branchmap.iteritems():
129 branchname = urllib.quote(branch)
149 branchname = urllib.quote(branch)
130 branchnodes = [hex(node) for node in nodes]
150 branchnodes = [hex(node) for node in nodes]
131 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
151 heads.append('%s %s' % (branchname, ' '.join(branchnodes)))
132 return '\n'.join(heads)
152 return '\n'.join(heads)
133
153
134 def branches(repo, proto, nodes):
154 def branches(repo, proto, nodes):
135 nodes = map(bin, nodes.split(" "))
155 nodes = map(bin, nodes.split(" "))
136 r = []
156 r = []
137 for b in repo.branches(nodes):
157 for b in repo.branches(nodes):
138 r.append(" ".join(map(hex, b)) + "\n")
158 r.append(" ".join(map(hex, b)) + "\n")
139 return "".join(r)
159 return "".join(r)
140
160
141 def changegroup(repo, proto, roots):
161 def changegroup(repo, proto, roots):
142 nodes = map(bin, roots.split(" "))
162 nodes = map(bin, roots.split(" "))
143 cg = repo.changegroup(nodes, 'serve')
163 cg = repo.changegroup(nodes, 'serve')
144 proto.sendchangegroup(cg)
164 proto.sendchangegroup(cg)
145
165
146 def changegroupsubset(repo, proto, bases, heads):
166 def changegroupsubset(repo, proto, bases, heads):
147 bases = [bin(n) for n in bases.split(' ')]
167 bases = [bin(n) for n in bases.split(' ')]
148 heads = [bin(n) for n in heads.split(' ')]
168 heads = [bin(n) for n in heads.split(' ')]
149 cg = repo.changegroupsubset(bases, heads, 'serve')
169 cg = repo.changegroupsubset(bases, heads, 'serve')
150 proto.sendchangegroup(cg)
170 proto.sendchangegroup(cg)
151
171
152 def heads(repo, proto):
172 def heads(repo, proto):
153 h = repo.heads()
173 h = repo.heads()
154 return " ".join(map(hex, h)) + "\n"
174 return " ".join(map(hex, h)) + "\n"
155
175
156 def listkeys(repo, proto, namespace):
176 def listkeys(repo, proto, namespace):
157 d = pushkey_.list(repo, namespace).items()
177 d = pushkey_.list(repo, namespace).items()
158 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
178 t = '\n'.join(['%s\t%s' % (k.encode('string-escape'),
159 v.encode('string-escape')) for k, v in d])
179 v.encode('string-escape')) for k, v in d])
160 return t
180 return t
161
181
162 def lookup(repo, proto, key):
182 def lookup(repo, proto, key):
163 try:
183 try:
164 r = hex(repo.lookup(key))
184 r = hex(repo.lookup(key))
165 success = 1
185 success = 1
166 except Exception, inst:
186 except Exception, inst:
167 r = str(inst)
187 r = str(inst)
168 success = 0
188 success = 0
169 return "%s %s\n" % (success, r)
189 return "%s %s\n" % (success, r)
170
190
171 def pushkey(repo, proto, namespace, key, old, new):
191 def pushkey(repo, proto, namespace, key, old, new):
172 r = pushkey_.push(repo, namespace, key, old, new)
192 r = pushkey_.push(repo, namespace, key, old, new)
173 return '%s\n' % int(r)
193 return '%s\n' % int(r)
174
194
175 def stream(repo, proto):
195 def stream(repo, proto):
176 try:
196 try:
177 proto.sendstream(streamclone.stream_out(repo))
197 proto.sendstream(streamclone.stream_out(repo))
178 except streamclone.StreamException, inst:
198 except streamclone.StreamException, inst:
179 return str(inst)
199 return str(inst)
180
200
181 commands = {
201 commands = {
182 'between': (between, 'pairs'),
202 'between': (between, 'pairs'),
183 'branchmap': (branchmap, ''),
203 'branchmap': (branchmap, ''),
184 'branches': (branches, 'nodes'),
204 'branches': (branches, 'nodes'),
185 'changegroup': (changegroup, 'roots'),
205 'changegroup': (changegroup, 'roots'),
186 'changegroupsubset': (changegroupsubset, 'bases heads'),
206 'changegroupsubset': (changegroupsubset, 'bases heads'),
187 'heads': (heads, ''),
207 'heads': (heads, ''),
188 'listkeys': (listkeys, 'namespace'),
208 'listkeys': (listkeys, 'namespace'),
189 'lookup': (lookup, 'key'),
209 'lookup': (lookup, 'key'),
190 'pushkey': (pushkey, 'namespace key old new'),
210 'pushkey': (pushkey, 'namespace key old new'),
191 'stream_out': (stream, ''),
211 'stream_out': (stream, ''),
192 }
212 }
General Comments 0
You need to be logged in to leave comments. Login now