##// END OF EJS Templates
wireproto: drop the _decompress method in favor a new call type...
Pierre-Yves David -
r20905:167047ba default
parent child Browse files
Show More
@@ -1,249 +1,247 b''
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from node import nullid
9 from node import nullid
10 from i18n import _
10 from i18n import _
11 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
11 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
12 import os, urllib, urllib2, zlib, httplib
12 import os, urllib, urllib2, zlib, httplib
13 import errno, socket
13 import errno, socket
14
14
15 def zgenerator(f):
15 def zgenerator(f):
16 zd = zlib.decompressobj()
16 zd = zlib.decompressobj()
17 try:
17 try:
18 for chunk in util.filechunkiter(f):
18 for chunk in util.filechunkiter(f):
19 while chunk:
19 while chunk:
20 yield zd.decompress(chunk, 2**18)
20 yield zd.decompress(chunk, 2**18)
21 chunk = zd.unconsumed_tail
21 chunk = zd.unconsumed_tail
22 except httplib.HTTPException:
22 except httplib.HTTPException:
23 raise IOError(None, _('connection ended unexpectedly'))
23 raise IOError(None, _('connection ended unexpectedly'))
24 yield zd.flush()
24 yield zd.flush()
25
25
26 class httppeer(wireproto.wirepeer):
26 class httppeer(wireproto.wirepeer):
27 def __init__(self, ui, path):
27 def __init__(self, ui, path):
28 self.path = path
28 self.path = path
29 self.caps = None
29 self.caps = None
30 self.handler = None
30 self.handler = None
31 self.urlopener = None
31 self.urlopener = None
32 u = util.url(path)
32 u = util.url(path)
33 if u.query or u.fragment:
33 if u.query or u.fragment:
34 raise util.Abort(_('unsupported URL component: "%s"') %
34 raise util.Abort(_('unsupported URL component: "%s"') %
35 (u.query or u.fragment))
35 (u.query or u.fragment))
36
36
37 # urllib cannot handle URLs with embedded user or passwd
37 # urllib cannot handle URLs with embedded user or passwd
38 self._url, authinfo = u.authinfo()
38 self._url, authinfo = u.authinfo()
39
39
40 self.ui = ui
40 self.ui = ui
41 self.ui.debug('using %s\n' % self._url)
41 self.ui.debug('using %s\n' % self._url)
42
42
43 self.urlopener = url.opener(ui, authinfo)
43 self.urlopener = url.opener(ui, authinfo)
44
44
45 def __del__(self):
45 def __del__(self):
46 if self.urlopener:
46 if self.urlopener:
47 for h in self.urlopener.handlers:
47 for h in self.urlopener.handlers:
48 h.close()
48 h.close()
49 getattr(h, "close_all", lambda : None)()
49 getattr(h, "close_all", lambda : None)()
50
50
51 def url(self):
51 def url(self):
52 return self.path
52 return self.path
53
53
54 # look up capabilities only when needed
54 # look up capabilities only when needed
55
55
56 def _fetchcaps(self):
56 def _fetchcaps(self):
57 self.caps = set(self._call('capabilities').split())
57 self.caps = set(self._call('capabilities').split())
58
58
59 def _capabilities(self):
59 def _capabilities(self):
60 if self.caps is None:
60 if self.caps is None:
61 try:
61 try:
62 self._fetchcaps()
62 self._fetchcaps()
63 except error.RepoError:
63 except error.RepoError:
64 self.caps = set()
64 self.caps = set()
65 self.ui.debug('capabilities: %s\n' %
65 self.ui.debug('capabilities: %s\n' %
66 (' '.join(self.caps or ['none'])))
66 (' '.join(self.caps or ['none'])))
67 return self.caps
67 return self.caps
68
68
69 def lock(self):
69 def lock(self):
70 raise util.Abort(_('operation not supported over http'))
70 raise util.Abort(_('operation not supported over http'))
71
71
72 def _callstream(self, cmd, **args):
72 def _callstream(self, cmd, **args):
73 if cmd == 'pushkey':
73 if cmd == 'pushkey':
74 args['data'] = ''
74 args['data'] = ''
75 data = args.pop('data', None)
75 data = args.pop('data', None)
76 size = 0
76 size = 0
77 if util.safehasattr(data, 'length'):
77 if util.safehasattr(data, 'length'):
78 size = data.length
78 size = data.length
79 elif data is not None:
79 elif data is not None:
80 size = len(data)
80 size = len(data)
81 headers = args.pop('headers', {})
81 headers = args.pop('headers', {})
82 if data is not None and 'Content-Type' not in headers:
82 if data is not None and 'Content-Type' not in headers:
83 headers['Content-Type'] = 'application/mercurial-0.1'
83 headers['Content-Type'] = 'application/mercurial-0.1'
84
84
85
85
86 if size and self.ui.configbool('ui', 'usehttp2', False):
86 if size and self.ui.configbool('ui', 'usehttp2', False):
87 headers['Expect'] = '100-Continue'
87 headers['Expect'] = '100-Continue'
88 headers['X-HgHttp2'] = '1'
88 headers['X-HgHttp2'] = '1'
89
89
90 self.ui.debug("sending %s command\n" % cmd)
90 self.ui.debug("sending %s command\n" % cmd)
91 q = [('cmd', cmd)]
91 q = [('cmd', cmd)]
92 headersize = 0
92 headersize = 0
93 if len(args) > 0:
93 if len(args) > 0:
94 httpheader = self.capable('httpheader')
94 httpheader = self.capable('httpheader')
95 if httpheader:
95 if httpheader:
96 headersize = int(httpheader.split(',')[0])
96 headersize = int(httpheader.split(',')[0])
97 if headersize > 0:
97 if headersize > 0:
98 # The headers can typically carry more data than the URL.
98 # The headers can typically carry more data than the URL.
99 encargs = urllib.urlencode(sorted(args.items()))
99 encargs = urllib.urlencode(sorted(args.items()))
100 headerfmt = 'X-HgArg-%s'
100 headerfmt = 'X-HgArg-%s'
101 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
101 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
102 headernum = 0
102 headernum = 0
103 for i in xrange(0, len(encargs), contentlen):
103 for i in xrange(0, len(encargs), contentlen):
104 headernum += 1
104 headernum += 1
105 header = headerfmt % str(headernum)
105 header = headerfmt % str(headernum)
106 headers[header] = encargs[i:i + contentlen]
106 headers[header] = encargs[i:i + contentlen]
107 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
107 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
108 headers['Vary'] = ','.join(varyheaders)
108 headers['Vary'] = ','.join(varyheaders)
109 else:
109 else:
110 q += sorted(args.items())
110 q += sorted(args.items())
111 qs = '?%s' % urllib.urlencode(q)
111 qs = '?%s' % urllib.urlencode(q)
112 cu = "%s%s" % (self._url, qs)
112 cu = "%s%s" % (self._url, qs)
113 req = urllib2.Request(cu, data, headers)
113 req = urllib2.Request(cu, data, headers)
114 if data is not None:
114 if data is not None:
115 self.ui.debug("sending %s bytes\n" % size)
115 self.ui.debug("sending %s bytes\n" % size)
116 req.add_unredirected_header('Content-Length', '%d' % size)
116 req.add_unredirected_header('Content-Length', '%d' % size)
117 try:
117 try:
118 resp = self.urlopener.open(req)
118 resp = self.urlopener.open(req)
119 except urllib2.HTTPError, inst:
119 except urllib2.HTTPError, inst:
120 if inst.code == 401:
120 if inst.code == 401:
121 raise util.Abort(_('authorization failed'))
121 raise util.Abort(_('authorization failed'))
122 raise
122 raise
123 except httplib.HTTPException, inst:
123 except httplib.HTTPException, inst:
124 self.ui.debug('http error while sending %s command\n' % cmd)
124 self.ui.debug('http error while sending %s command\n' % cmd)
125 self.ui.traceback()
125 self.ui.traceback()
126 raise IOError(None, inst)
126 raise IOError(None, inst)
127 except IndexError:
127 except IndexError:
128 # this only happens with Python 2.3, later versions raise URLError
128 # this only happens with Python 2.3, later versions raise URLError
129 raise util.Abort(_('http error, possibly caused by proxy setting'))
129 raise util.Abort(_('http error, possibly caused by proxy setting'))
130 # record the url we got redirected to
130 # record the url we got redirected to
131 resp_url = resp.geturl()
131 resp_url = resp.geturl()
132 if resp_url.endswith(qs):
132 if resp_url.endswith(qs):
133 resp_url = resp_url[:-len(qs)]
133 resp_url = resp_url[:-len(qs)]
134 if self._url.rstrip('/') != resp_url.rstrip('/'):
134 if self._url.rstrip('/') != resp_url.rstrip('/'):
135 if not self.ui.quiet:
135 if not self.ui.quiet:
136 self.ui.warn(_('real URL is %s\n') % resp_url)
136 self.ui.warn(_('real URL is %s\n') % resp_url)
137 self._url = resp_url
137 self._url = resp_url
138 try:
138 try:
139 proto = resp.getheader('content-type')
139 proto = resp.getheader('content-type')
140 except AttributeError:
140 except AttributeError:
141 proto = resp.headers.get('content-type', '')
141 proto = resp.headers.get('content-type', '')
142
142
143 safeurl = util.hidepassword(self._url)
143 safeurl = util.hidepassword(self._url)
144 if proto.startswith('application/hg-error'):
144 if proto.startswith('application/hg-error'):
145 raise error.OutOfBandError(resp.read())
145 raise error.OutOfBandError(resp.read())
146 # accept old "text/plain" and "application/hg-changegroup" for now
146 # accept old "text/plain" and "application/hg-changegroup" for now
147 if not (proto.startswith('application/mercurial-') or
147 if not (proto.startswith('application/mercurial-') or
148 (proto.startswith('text/plain')
148 (proto.startswith('text/plain')
149 and not resp.headers.get('content-length')) or
149 and not resp.headers.get('content-length')) or
150 proto.startswith('application/hg-changegroup')):
150 proto.startswith('application/hg-changegroup')):
151 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
151 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
152 raise error.RepoError(
152 raise error.RepoError(
153 _("'%s' does not appear to be an hg repository:\n"
153 _("'%s' does not appear to be an hg repository:\n"
154 "---%%<--- (%s)\n%s\n---%%<---\n")
154 "---%%<--- (%s)\n%s\n---%%<---\n")
155 % (safeurl, proto or 'no content-type', resp.read(1024)))
155 % (safeurl, proto or 'no content-type', resp.read(1024)))
156
156
157 if proto.startswith('application/mercurial-'):
157 if proto.startswith('application/mercurial-'):
158 try:
158 try:
159 version = proto.split('-', 1)[1]
159 version = proto.split('-', 1)[1]
160 version_info = tuple([int(n) for n in version.split('.')])
160 version_info = tuple([int(n) for n in version.split('.')])
161 except ValueError:
161 except ValueError:
162 raise error.RepoError(_("'%s' sent a broken Content-Type "
162 raise error.RepoError(_("'%s' sent a broken Content-Type "
163 "header (%s)") % (safeurl, proto))
163 "header (%s)") % (safeurl, proto))
164 if version_info > (0, 1):
164 if version_info > (0, 1):
165 raise error.RepoError(_("'%s' uses newer protocol %s") %
165 raise error.RepoError(_("'%s' uses newer protocol %s") %
166 (safeurl, version))
166 (safeurl, version))
167
167
168 return resp
168 return resp
169
169
170 def _call(self, cmd, **args):
170 def _call(self, cmd, **args):
171 fp = self._callstream(cmd, **args)
171 fp = self._callstream(cmd, **args)
172 try:
172 try:
173 return fp.read()
173 return fp.read()
174 finally:
174 finally:
175 # if using keepalive, allow connection to be reused
175 # if using keepalive, allow connection to be reused
176 fp.close()
176 fp.close()
177
177
178 def _callpush(self, cmd, cg, **args):
178 def _callpush(self, cmd, cg, **args):
179 # have to stream bundle to a temp file because we do not have
179 # have to stream bundle to a temp file because we do not have
180 # http 1.1 chunked transfer.
180 # http 1.1 chunked transfer.
181
181
182 types = self.capable('unbundle')
182 types = self.capable('unbundle')
183 try:
183 try:
184 types = types.split(',')
184 types = types.split(',')
185 except AttributeError:
185 except AttributeError:
186 # servers older than d1b16a746db6 will send 'unbundle' as a
186 # servers older than d1b16a746db6 will send 'unbundle' as a
187 # boolean capability. They only support headerless/uncompressed
187 # boolean capability. They only support headerless/uncompressed
188 # bundles.
188 # bundles.
189 types = [""]
189 types = [""]
190 for x in types:
190 for x in types:
191 if x in changegroup.bundletypes:
191 if x in changegroup.bundletypes:
192 type = x
192 type = x
193 break
193 break
194
194
195 tempname = changegroup.writebundle(cg, None, type)
195 tempname = changegroup.writebundle(cg, None, type)
196 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
196 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
197 headers = {'Content-Type': 'application/mercurial-0.1'}
197 headers = {'Content-Type': 'application/mercurial-0.1'}
198
198
199 try:
199 try:
200 try:
200 try:
201 r = self._call(cmd, data=fp, headers=headers, **args)
201 r = self._call(cmd, data=fp, headers=headers, **args)
202 vals = r.split('\n', 1)
202 vals = r.split('\n', 1)
203 if len(vals) < 2:
203 if len(vals) < 2:
204 raise error.ResponseError(_("unexpected response:"), r)
204 raise error.ResponseError(_("unexpected response:"), r)
205 return vals
205 return vals
206 except socket.error, err:
206 except socket.error, err:
207 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
207 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
208 raise util.Abort(_('push failed: %s') % err.args[1])
208 raise util.Abort(_('push failed: %s') % err.args[1])
209 raise util.Abort(err.args[1])
209 raise util.Abort(err.args[1])
210 finally:
210 finally:
211 fp.close()
211 fp.close()
212 os.unlink(tempname)
212 os.unlink(tempname)
213
213
214 def _abort(self, exception):
214 def _callcompressable(self, cmd, **args):
215 raise exception
215 stream = self._callstream(cmd, **args)
216
217 def _decompress(self, stream):
218 return util.chunkbuffer(zgenerator(stream))
216 return util.chunkbuffer(zgenerator(stream))
219
217
220 class httpspeer(httppeer):
218 class httpspeer(httppeer):
221 def __init__(self, ui, path):
219 def __init__(self, ui, path):
222 if not url.has_https:
220 if not url.has_https:
223 raise util.Abort(_('Python support for SSL and HTTPS '
221 raise util.Abort(_('Python support for SSL and HTTPS '
224 'is not installed'))
222 'is not installed'))
225 httppeer.__init__(self, ui, path)
223 httppeer.__init__(self, ui, path)
226
224
227 def instance(ui, path, create):
225 def instance(ui, path, create):
228 if create:
226 if create:
229 raise util.Abort(_('cannot create new http repository'))
227 raise util.Abort(_('cannot create new http repository'))
230 try:
228 try:
231 if path.startswith('https:'):
229 if path.startswith('https:'):
232 inst = httpspeer(ui, path)
230 inst = httpspeer(ui, path)
233 else:
231 else:
234 inst = httppeer(ui, path)
232 inst = httppeer(ui, path)
235 try:
233 try:
236 # Try to do useful work when checking compatibility.
234 # Try to do useful work when checking compatibility.
237 # Usually saves a roundtrip since we want the caps anyway.
235 # Usually saves a roundtrip since we want the caps anyway.
238 inst._fetchcaps()
236 inst._fetchcaps()
239 except error.RepoError:
237 except error.RepoError:
240 # No luck, try older compatibility check.
238 # No luck, try older compatibility check.
241 inst.between([(nullid, nullid)])
239 inst.between([(nullid, nullid)])
242 return inst
240 return inst
243 except error.RepoError, httpexception:
241 except error.RepoError, httpexception:
244 try:
242 try:
245 r = statichttprepo.instance(ui, "static-" + path, create)
243 r = statichttprepo.instance(ui, "static-" + path, create)
246 ui.note('(falling back to static-http)\n')
244 ui.note('(falling back to static-http)\n')
247 return r
245 return r
248 except error.RepoError:
246 except error.RepoError:
249 raise httpexception # use the original http RepoError instead
247 raise httpexception # use the original http RepoError instead
@@ -1,242 +1,243 b''
1 # sshpeer.py - ssh repository proxy class for mercurial
1 # sshpeer.py - ssh repository proxy class for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import re
8 import re
9 from i18n import _
9 from i18n import _
10 import util, error, wireproto
10 import util, error, wireproto
11
11
12 class remotelock(object):
12 class remotelock(object):
13 def __init__(self, repo):
13 def __init__(self, repo):
14 self.repo = repo
14 self.repo = repo
15 def release(self):
15 def release(self):
16 self.repo.unlock()
16 self.repo.unlock()
17 self.repo = None
17 self.repo = None
18 def __del__(self):
18 def __del__(self):
19 if self.repo:
19 if self.repo:
20 self.release()
20 self.release()
21
21
22 def _serverquote(s):
22 def _serverquote(s):
23 '''quote a string for the remote shell ... which we assume is sh'''
23 '''quote a string for the remote shell ... which we assume is sh'''
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
24 if re.match('[a-zA-Z0-9@%_+=:,./-]*$', s):
25 return s
25 return s
26 return "'%s'" % s.replace("'", "'\\''")
26 return "'%s'" % s.replace("'", "'\\''")
27
27
28 class sshpeer(wireproto.wirepeer):
28 class sshpeer(wireproto.wirepeer):
29 def __init__(self, ui, path, create=False):
29 def __init__(self, ui, path, create=False):
30 self._url = path
30 self._url = path
31 self.ui = ui
31 self.ui = ui
32 self.pipeo = self.pipei = self.pipee = None
32 self.pipeo = self.pipei = self.pipee = None
33
33
34 u = util.url(path, parsequery=False, parsefragment=False)
34 u = util.url(path, parsequery=False, parsefragment=False)
35 if u.scheme != 'ssh' or not u.host or u.path is None:
35 if u.scheme != 'ssh' or not u.host or u.path is None:
36 self._abort(error.RepoError(_("couldn't parse location %s") % path))
36 self._abort(error.RepoError(_("couldn't parse location %s") % path))
37
37
38 self.user = u.user
38 self.user = u.user
39 if u.passwd is not None:
39 if u.passwd is not None:
40 self._abort(error.RepoError(_("password in URL not supported")))
40 self._abort(error.RepoError(_("password in URL not supported")))
41 self.host = u.host
41 self.host = u.host
42 self.port = u.port
42 self.port = u.port
43 self.path = u.path or "."
43 self.path = u.path or "."
44
44
45 sshcmd = self.ui.config("ui", "ssh", "ssh")
45 sshcmd = self.ui.config("ui", "ssh", "ssh")
46 remotecmd = self.ui.config("ui", "remotecmd", "hg")
46 remotecmd = self.ui.config("ui", "remotecmd", "hg")
47
47
48 args = util.sshargs(sshcmd, self.host, self.user, self.port)
48 args = util.sshargs(sshcmd, self.host, self.user, self.port)
49
49
50 if create:
50 if create:
51 cmd = '%s %s %s' % (sshcmd, args,
51 cmd = '%s %s %s' % (sshcmd, args,
52 util.shellquote("%s init %s" %
52 util.shellquote("%s init %s" %
53 (_serverquote(remotecmd), _serverquote(self.path))))
53 (_serverquote(remotecmd), _serverquote(self.path))))
54 ui.debug('running %s\n' % cmd)
54 ui.debug('running %s\n' % cmd)
55 res = util.system(cmd)
55 res = util.system(cmd)
56 if res != 0:
56 if res != 0:
57 self._abort(error.RepoError(_("could not create remote repo")))
57 self._abort(error.RepoError(_("could not create remote repo")))
58
58
59 self._validaterepo(sshcmd, args, remotecmd)
59 self._validaterepo(sshcmd, args, remotecmd)
60
60
61 def url(self):
61 def url(self):
62 return self._url
62 return self._url
63
63
64 def _validaterepo(self, sshcmd, args, remotecmd):
64 def _validaterepo(self, sshcmd, args, remotecmd):
65 # cleanup up previous run
65 # cleanup up previous run
66 self.cleanup()
66 self.cleanup()
67
67
68 cmd = '%s %s %s' % (sshcmd, args,
68 cmd = '%s %s %s' % (sshcmd, args,
69 util.shellquote("%s -R %s serve --stdio" %
69 util.shellquote("%s -R %s serve --stdio" %
70 (_serverquote(remotecmd), _serverquote(self.path))))
70 (_serverquote(remotecmd), _serverquote(self.path))))
71 self.ui.debug('running %s\n' % cmd)
71 self.ui.debug('running %s\n' % cmd)
72 cmd = util.quotecommand(cmd)
72 cmd = util.quotecommand(cmd)
73
73
74 # while self.subprocess isn't used, having it allows the subprocess to
74 # while self.subprocess isn't used, having it allows the subprocess to
75 # to clean up correctly later
75 # to clean up correctly later
76 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
76 self.pipeo, self.pipei, self.pipee, self.subprocess = util.popen4(cmd)
77
77
78 # skip any noise generated by remote shell
78 # skip any noise generated by remote shell
79 self._callstream("hello")
79 self._callstream("hello")
80 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
80 r = self._callstream("between", pairs=("%s-%s" % ("0"*40, "0"*40)))
81 lines = ["", "dummy"]
81 lines = ["", "dummy"]
82 max_noise = 500
82 max_noise = 500
83 while lines[-1] and max_noise:
83 while lines[-1] and max_noise:
84 l = r.readline()
84 l = r.readline()
85 self.readerr()
85 self.readerr()
86 if lines[-1] == "1\n" and l == "\n":
86 if lines[-1] == "1\n" and l == "\n":
87 break
87 break
88 if l:
88 if l:
89 self.ui.debug("remote: ", l)
89 self.ui.debug("remote: ", l)
90 lines.append(l)
90 lines.append(l)
91 max_noise -= 1
91 max_noise -= 1
92 else:
92 else:
93 self._abort(error.RepoError(_('no suitable response from '
93 self._abort(error.RepoError(_('no suitable response from '
94 'remote hg')))
94 'remote hg')))
95
95
96 self._caps = set()
96 self._caps = set()
97 for l in reversed(lines):
97 for l in reversed(lines):
98 if l.startswith("capabilities:"):
98 if l.startswith("capabilities:"):
99 self._caps.update(l[:-1].split(":")[1].split())
99 self._caps.update(l[:-1].split(":")[1].split())
100 break
100 break
101
101
102 def _capabilities(self):
102 def _capabilities(self):
103 return self._caps
103 return self._caps
104
104
105 def readerr(self):
105 def readerr(self):
106 while True:
106 while True:
107 size = util.fstat(self.pipee).st_size
107 size = util.fstat(self.pipee).st_size
108 if size == 0:
108 if size == 0:
109 break
109 break
110 s = self.pipee.read(size)
110 s = self.pipee.read(size)
111 if not s:
111 if not s:
112 break
112 break
113 for l in s.splitlines():
113 for l in s.splitlines():
114 self.ui.status(_("remote: "), l, '\n')
114 self.ui.status(_("remote: "), l, '\n')
115
115
116 def _abort(self, exception):
116 def _abort(self, exception):
117 self.cleanup()
117 self.cleanup()
118 raise exception
118 raise exception
119
119
120 def cleanup(self):
120 def cleanup(self):
121 if self.pipeo is None:
121 if self.pipeo is None:
122 return
122 return
123 self.pipeo.close()
123 self.pipeo.close()
124 self.pipei.close()
124 self.pipei.close()
125 try:
125 try:
126 # read the error descriptor until EOF
126 # read the error descriptor until EOF
127 for l in self.pipee:
127 for l in self.pipee:
128 self.ui.status(_("remote: "), l)
128 self.ui.status(_("remote: "), l)
129 except (IOError, ValueError):
129 except (IOError, ValueError):
130 pass
130 pass
131 self.pipee.close()
131 self.pipee.close()
132
132
133 __del__ = cleanup
133 __del__ = cleanup
134
134
135 def _callstream(self, cmd, **args):
135 def _callstream(self, cmd, **args):
136 self.ui.debug("sending %s command\n" % cmd)
136 self.ui.debug("sending %s command\n" % cmd)
137 self.pipeo.write("%s\n" % cmd)
137 self.pipeo.write("%s\n" % cmd)
138 _func, names = wireproto.commands[cmd]
138 _func, names = wireproto.commands[cmd]
139 keys = names.split()
139 keys = names.split()
140 wireargs = {}
140 wireargs = {}
141 for k in keys:
141 for k in keys:
142 if k == '*':
142 if k == '*':
143 wireargs['*'] = args
143 wireargs['*'] = args
144 break
144 break
145 else:
145 else:
146 wireargs[k] = args[k]
146 wireargs[k] = args[k]
147 del args[k]
147 del args[k]
148 for k, v in sorted(wireargs.iteritems()):
148 for k, v in sorted(wireargs.iteritems()):
149 self.pipeo.write("%s %d\n" % (k, len(v)))
149 self.pipeo.write("%s %d\n" % (k, len(v)))
150 if isinstance(v, dict):
150 if isinstance(v, dict):
151 for dk, dv in v.iteritems():
151 for dk, dv in v.iteritems():
152 self.pipeo.write("%s %d\n" % (dk, len(dv)))
152 self.pipeo.write("%s %d\n" % (dk, len(dv)))
153 self.pipeo.write(dv)
153 self.pipeo.write(dv)
154 else:
154 else:
155 self.pipeo.write(v)
155 self.pipeo.write(v)
156 self.pipeo.flush()
156 self.pipeo.flush()
157
157
158 return self.pipei
158 return self.pipei
159
159
160 def _callcompressable(self, cmd, **args):
161 return self._callstream(cmd, **args)
162
160 def _call(self, cmd, **args):
163 def _call(self, cmd, **args):
161 self._callstream(cmd, **args)
164 self._callstream(cmd, **args)
162 return self._recv()
165 return self._recv()
163
166
164 def _callpush(self, cmd, fp, **args):
167 def _callpush(self, cmd, fp, **args):
165 r = self._call(cmd, **args)
168 r = self._call(cmd, **args)
166 if r:
169 if r:
167 return '', r
170 return '', r
168 while True:
171 while True:
169 d = fp.read(4096)
172 d = fp.read(4096)
170 if not d:
173 if not d:
171 break
174 break
172 self._send(d)
175 self._send(d)
173 self._send("", flush=True)
176 self._send("", flush=True)
174 r = self._recv()
177 r = self._recv()
175 if r:
178 if r:
176 return '', r
179 return '', r
177 return self._recv(), ''
180 return self._recv(), ''
178
181
179 def _decompress(self, stream):
180 return stream
181
182
182 def _recv(self):
183 def _recv(self):
183 l = self.pipei.readline()
184 l = self.pipei.readline()
184 if l == '\n':
185 if l == '\n':
185 err = []
186 err = []
186 while True:
187 while True:
187 line = self.pipee.readline()
188 line = self.pipee.readline()
188 if line == '-\n':
189 if line == '-\n':
189 break
190 break
190 err.extend([line])
191 err.extend([line])
191 if len(err) > 0:
192 if len(err) > 0:
192 # strip the trailing newline added to the last line server-side
193 # strip the trailing newline added to the last line server-side
193 err[-1] = err[-1][:-1]
194 err[-1] = err[-1][:-1]
194 self._abort(error.OutOfBandError(*err))
195 self._abort(error.OutOfBandError(*err))
195 self.readerr()
196 self.readerr()
196 try:
197 try:
197 l = int(l)
198 l = int(l)
198 except ValueError:
199 except ValueError:
199 self._abort(error.ResponseError(_("unexpected response:"), l))
200 self._abort(error.ResponseError(_("unexpected response:"), l))
200 return self.pipei.read(l)
201 return self.pipei.read(l)
201
202
202 def _send(self, data, flush=False):
203 def _send(self, data, flush=False):
203 self.pipeo.write("%d\n" % len(data))
204 self.pipeo.write("%d\n" % len(data))
204 if data:
205 if data:
205 self.pipeo.write(data)
206 self.pipeo.write(data)
206 if flush:
207 if flush:
207 self.pipeo.flush()
208 self.pipeo.flush()
208 self.readerr()
209 self.readerr()
209
210
210 def lock(self):
211 def lock(self):
211 self._call("lock")
212 self._call("lock")
212 return remotelock(self)
213 return remotelock(self)
213
214
214 def unlock(self):
215 def unlock(self):
215 self._call("unlock")
216 self._call("unlock")
216
217
217 def addchangegroup(self, cg, source, url, lock=None):
218 def addchangegroup(self, cg, source, url, lock=None):
218 '''Send a changegroup to the remote server. Return an integer
219 '''Send a changegroup to the remote server. Return an integer
219 similar to unbundle(). DEPRECATED, since it requires locking the
220 similar to unbundle(). DEPRECATED, since it requires locking the
220 remote.'''
221 remote.'''
221 d = self._call("addchangegroup")
222 d = self._call("addchangegroup")
222 if d:
223 if d:
223 self._abort(error.RepoError(_("push refused: %s") % d))
224 self._abort(error.RepoError(_("push refused: %s") % d))
224 while True:
225 while True:
225 d = cg.read(4096)
226 d = cg.read(4096)
226 if not d:
227 if not d:
227 break
228 break
228 self.pipeo.write(d)
229 self.pipeo.write(d)
229 self.readerr()
230 self.readerr()
230
231
231 self.pipeo.flush()
232 self.pipeo.flush()
232
233
233 self.readerr()
234 self.readerr()
234 r = self._recv()
235 r = self._recv()
235 if not r:
236 if not r:
236 return 1
237 return 1
237 try:
238 try:
238 return int(r)
239 return int(r)
239 except ValueError:
240 except ValueError:
240 self._abort(error.ResponseError(_("unexpected response:"), r))
241 self._abort(error.ResponseError(_("unexpected response:"), r))
241
242
242 instance = sshpeer
243 instance = sshpeer
@@ -1,784 +1,791 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # client side
193 # client side
194
194
195 class wirepeer(peer.peerrepository):
195 class wirepeer(peer.peerrepository):
196
196
197 def batch(self):
197 def batch(self):
198 return remotebatch(self)
198 return remotebatch(self)
199 def _submitbatch(self, req):
199 def _submitbatch(self, req):
200 cmds = []
200 cmds = []
201 for op, argsdict in req:
201 for op, argsdict in req:
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 cmds.append('%s %s' % (op, args))
203 cmds.append('%s %s' % (op, args))
204 rsp = self._call("batch", cmds=';'.join(cmds))
204 rsp = self._call("batch", cmds=';'.join(cmds))
205 return rsp.split(';')
205 return rsp.split(';')
206 def _submitone(self, op, args):
206 def _submitone(self, op, args):
207 return self._call(op, **args)
207 return self._call(op, **args)
208
208
209 @batchable
209 @batchable
210 def lookup(self, key):
210 def lookup(self, key):
211 self.requirecap('lookup', _('look up remote revision'))
211 self.requirecap('lookup', _('look up remote revision'))
212 f = future()
212 f = future()
213 yield {'key': encoding.fromlocal(key)}, f
213 yield {'key': encoding.fromlocal(key)}, f
214 d = f.value
214 d = f.value
215 success, data = d[:-1].split(" ", 1)
215 success, data = d[:-1].split(" ", 1)
216 if int(success):
216 if int(success):
217 yield bin(data)
217 yield bin(data)
218 self._abort(error.RepoError(data))
218 self._abort(error.RepoError(data))
219
219
220 @batchable
220 @batchable
221 def heads(self):
221 def heads(self):
222 f = future()
222 f = future()
223 yield {}, f
223 yield {}, f
224 d = f.value
224 d = f.value
225 try:
225 try:
226 yield decodelist(d[:-1])
226 yield decodelist(d[:-1])
227 except ValueError:
227 except ValueError:
228 self._abort(error.ResponseError(_("unexpected response:"), d))
228 self._abort(error.ResponseError(_("unexpected response:"), d))
229
229
230 @batchable
230 @batchable
231 def known(self, nodes):
231 def known(self, nodes):
232 f = future()
232 f = future()
233 yield {'nodes': encodelist(nodes)}, f
233 yield {'nodes': encodelist(nodes)}, f
234 d = f.value
234 d = f.value
235 try:
235 try:
236 yield [bool(int(f)) for f in d]
236 yield [bool(int(f)) for f in d]
237 except ValueError:
237 except ValueError:
238 self._abort(error.ResponseError(_("unexpected response:"), d))
238 self._abort(error.ResponseError(_("unexpected response:"), d))
239
239
240 @batchable
240 @batchable
241 def branchmap(self):
241 def branchmap(self):
242 f = future()
242 f = future()
243 yield {}, f
243 yield {}, f
244 d = f.value
244 d = f.value
245 try:
245 try:
246 branchmap = {}
246 branchmap = {}
247 for branchpart in d.splitlines():
247 for branchpart in d.splitlines():
248 branchname, branchheads = branchpart.split(' ', 1)
248 branchname, branchheads = branchpart.split(' ', 1)
249 branchname = encoding.tolocal(urllib.unquote(branchname))
249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 branchheads = decodelist(branchheads)
250 branchheads = decodelist(branchheads)
251 branchmap[branchname] = branchheads
251 branchmap[branchname] = branchheads
252 yield branchmap
252 yield branchmap
253 except TypeError:
253 except TypeError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
256 def branches(self, nodes):
256 def branches(self, nodes):
257 n = encodelist(nodes)
257 n = encodelist(nodes)
258 d = self._call("branches", nodes=n)
258 d = self._call("branches", nodes=n)
259 try:
259 try:
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 return br
261 return br
262 except ValueError:
262 except ValueError:
263 self._abort(error.ResponseError(_("unexpected response:"), d))
263 self._abort(error.ResponseError(_("unexpected response:"), d))
264
264
265 def between(self, pairs):
265 def between(self, pairs):
266 batch = 8 # avoid giant requests
266 batch = 8 # avoid giant requests
267 r = []
267 r = []
268 for i in xrange(0, len(pairs), batch):
268 for i in xrange(0, len(pairs), batch):
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 d = self._call("between", pairs=n)
270 d = self._call("between", pairs=n)
271 try:
271 try:
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 return r
275 return r
276
276
277 @batchable
277 @batchable
278 def pushkey(self, namespace, key, old, new):
278 def pushkey(self, namespace, key, old, new):
279 if not self.capable('pushkey'):
279 if not self.capable('pushkey'):
280 yield False, None
280 yield False, None
281 f = future()
281 f = future()
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 yield {'namespace': encoding.fromlocal(namespace),
283 yield {'namespace': encoding.fromlocal(namespace),
284 'key': encoding.fromlocal(key),
284 'key': encoding.fromlocal(key),
285 'old': encoding.fromlocal(old),
285 'old': encoding.fromlocal(old),
286 'new': encoding.fromlocal(new)}, f
286 'new': encoding.fromlocal(new)}, f
287 d = f.value
287 d = f.value
288 d, output = d.split('\n', 1)
288 d, output = d.split('\n', 1)
289 try:
289 try:
290 d = bool(int(d))
290 d = bool(int(d))
291 except ValueError:
291 except ValueError:
292 raise error.ResponseError(
292 raise error.ResponseError(
293 _('push failed (unexpected response):'), d)
293 _('push failed (unexpected response):'), d)
294 for l in output.splitlines(True):
294 for l in output.splitlines(True):
295 self.ui.status(_('remote: '), l)
295 self.ui.status(_('remote: '), l)
296 yield d
296 yield d
297
297
298 @batchable
298 @batchable
299 def listkeys(self, namespace):
299 def listkeys(self, namespace):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield {}, None
301 yield {}, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 yield {'namespace': encoding.fromlocal(namespace)}, f
304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 d = f.value
305 d = f.value
306 r = {}
306 r = {}
307 for l in d.splitlines():
307 for l in d.splitlines():
308 k, v = l.split('\t')
308 k, v = l.split('\t')
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 yield r
310 yield r
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def changegroup(self, nodes, kind):
315 def changegroup(self, nodes, kind):
316 n = encodelist(nodes)
316 n = encodelist(nodes)
317 f = self._callstream("changegroup", roots=n)
317 f = self._callcompressable("changegroup", roots=n)
318 return changegroupmod.unbundle10(self._decompress(f), 'UN')
318 return changegroupmod.unbundle10(f, 'UN')
319
319
320 def changegroupsubset(self, bases, heads, kind):
320 def changegroupsubset(self, bases, heads, kind):
321 self.requirecap('changegroupsubset', _('look up remote changes'))
321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 bases = encodelist(bases)
322 bases = encodelist(bases)
323 heads = encodelist(heads)
323 heads = encodelist(heads)
324 f = self._callstream("changegroupsubset",
324 f = self._callcompressable("changegroupsubset",
325 bases=bases, heads=heads)
325 bases=bases, heads=heads)
326 return changegroupmod.unbundle10(self._decompress(f), 'UN')
326 return changegroupmod.unbundle10(f, 'UN')
327
327
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 self.requirecap('getbundle', _('look up remote changes'))
329 self.requirecap('getbundle', _('look up remote changes'))
330 opts = {}
330 opts = {}
331 if heads is not None:
331 if heads is not None:
332 opts['heads'] = encodelist(heads)
332 opts['heads'] = encodelist(heads)
333 if common is not None:
333 if common is not None:
334 opts['common'] = encodelist(common)
334 opts['common'] = encodelist(common)
335 if bundlecaps is not None:
335 if bundlecaps is not None:
336 opts['bundlecaps'] = ','.join(bundlecaps)
336 opts['bundlecaps'] = ','.join(bundlecaps)
337 f = self._callstream("getbundle", **opts)
337 f = self._callcompressable("getbundle", **opts)
338 return changegroupmod.unbundle10(self._decompress(f), 'UN')
338 return changegroupmod.unbundle10(f, 'UN')
339
339
340 def unbundle(self, cg, heads, source):
340 def unbundle(self, cg, heads, source):
341 '''Send cg (a readable file-like object representing the
341 '''Send cg (a readable file-like object representing the
342 changegroup to push, typically a chunkbuffer object) to the
342 changegroup to push, typically a chunkbuffer object) to the
343 remote server as a bundle. Return an integer indicating the
343 remote server as a bundle. Return an integer indicating the
344 result of the push (see localrepository.addchangegroup()).'''
344 result of the push (see localrepository.addchangegroup()).'''
345
345
346 if heads != ['force'] and self.capable('unbundlehash'):
346 if heads != ['force'] and self.capable('unbundlehash'):
347 heads = encodelist(['hashed',
347 heads = encodelist(['hashed',
348 util.sha1(''.join(sorted(heads))).digest()])
348 util.sha1(''.join(sorted(heads))).digest()])
349 else:
349 else:
350 heads = encodelist(heads)
350 heads = encodelist(heads)
351
351
352 ret, output = self._callpush("unbundle", cg, heads=heads)
352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 if ret == "":
353 if ret == "":
354 raise error.ResponseError(
354 raise error.ResponseError(
355 _('push failed:'), output)
355 _('push failed:'), output)
356 try:
356 try:
357 ret = int(ret)
357 ret = int(ret)
358 except ValueError:
358 except ValueError:
359 raise error.ResponseError(
359 raise error.ResponseError(
360 _('push failed (unexpected response):'), ret)
360 _('push failed (unexpected response):'), ret)
361
361
362 for l in output.splitlines(True):
362 for l in output.splitlines(True):
363 self.ui.status(_('remote: '), l)
363 self.ui.status(_('remote: '), l)
364 return ret
364 return ret
365
365
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 # don't pass optional arguments left at their default value
367 # don't pass optional arguments left at their default value
368 opts = {}
368 opts = {}
369 if three is not None:
369 if three is not None:
370 opts['three'] = three
370 opts['three'] = three
371 if four is not None:
371 if four is not None:
372 opts['four'] = four
372 opts['four'] = four
373 return self._call('debugwireargs', one=one, two=two, **opts)
373 return self._call('debugwireargs', one=one, two=two, **opts)
374
374
375 def _call(self, cmd, **args):
375 def _call(self, cmd, **args):
376 """execute <cmd> on the server
376 """execute <cmd> on the server
377
377
378 The command is expected to return a simple string.
378 The command is expected to return a simple string.
379
379
380 returns the server reply as a string."""
380 returns the server reply as a string."""
381 raise NotImplementedError()
381 raise NotImplementedError()
382
382
383 def _callstream(self, cmd, **args):
383 def _callstream(self, cmd, **args):
384 """execute <cmd> on the server
384 """execute <cmd> on the server
385
385
386 The command is expected to return a stream.
386 The command is expected to return a stream.
387
387
388 returns the server reply as a file like object."""
388 returns the server reply as a file like object."""
389 raise NotImplementedError()
389 raise NotImplementedError()
390
390
391 def _callcompressable(self, cmd, **args):
392 """execute <cmd> on the server
393
394 The command is expected to return a stream.
395
396 The stream may have been compressed in some implementaitons. This
397 function takes care of the decompression. This is the only difference
398 with _callstream.
399
400 returns the server reply as a file like object.
401 """
402 raise NotImplementedError()
403
391 def _callpush(self, cmd, fp, **args):
404 def _callpush(self, cmd, fp, **args):
392 """execute a <cmd> on server
405 """execute a <cmd> on server
393
406
394 The command is expected to be related to a push. Push has a special
407 The command is expected to be related to a push. Push has a special
395 return method.
408 return method.
396
409
397 returns the server reply as a (ret, output) tuple. ret is either
410 returns the server reply as a (ret, output) tuple. ret is either
398 empty (error) or a stringified int.
411 empty (error) or a stringified int.
399 """
412 """
400 raise NotImplementedError()
413 raise NotImplementedError()
401
414
402 def _abort(self, exception):
415 def _abort(self, exception):
403 """clearly abort the wire protocol connection and raise the exception
416 """clearly abort the wire protocol connection and raise the exception
404 """
417 """
405 raise NotImplementedError()
418 raise NotImplementedError()
406
419
407
408 def _decompress(self, stream):
409 """decompress a received stream
410 """
411 raise NotImplementedError()
412
413 # server side
420 # server side
414
421
415 # wire protocol command can either return a string or one of these classes.
422 # wire protocol command can either return a string or one of these classes.
416 class streamres(object):
423 class streamres(object):
417 """wireproto reply: binary stream
424 """wireproto reply: binary stream
418
425
419 The call was successful and the result is a stream.
426 The call was successful and the result is a stream.
420 Iterate on the `self.gen` attribute to retrieve chunks.
427 Iterate on the `self.gen` attribute to retrieve chunks.
421 """
428 """
422 def __init__(self, gen):
429 def __init__(self, gen):
423 self.gen = gen
430 self.gen = gen
424
431
425 class pushres(object):
432 class pushres(object):
426 """wireproto reply: success with simple integer return
433 """wireproto reply: success with simple integer return
427
434
428 The call was successful and returned an integer contained in `self.res`.
435 The call was successful and returned an integer contained in `self.res`.
429 """
436 """
430 def __init__(self, res):
437 def __init__(self, res):
431 self.res = res
438 self.res = res
432
439
433 class pusherr(object):
440 class pusherr(object):
434 """wireproto reply: failure
441 """wireproto reply: failure
435
442
436 The call failed. The `self.res` attribute contains the error message.
443 The call failed. The `self.res` attribute contains the error message.
437 """
444 """
438 def __init__(self, res):
445 def __init__(self, res):
439 self.res = res
446 self.res = res
440
447
441 class ooberror(object):
448 class ooberror(object):
442 """wireproto reply: failure of a batch of operation
449 """wireproto reply: failure of a batch of operation
443
450
444 Something failed during a batch call. The error message is stored in
451 Something failed during a batch call. The error message is stored in
445 `self.message`.
452 `self.message`.
446 """
453 """
447 def __init__(self, message):
454 def __init__(self, message):
448 self.message = message
455 self.message = message
449
456
450 def dispatch(repo, proto, command):
457 def dispatch(repo, proto, command):
451 repo = repo.filtered("served")
458 repo = repo.filtered("served")
452 func, spec = commands[command]
459 func, spec = commands[command]
453 args = proto.getargs(spec)
460 args = proto.getargs(spec)
454 return func(repo, proto, *args)
461 return func(repo, proto, *args)
455
462
456 def options(cmd, keys, others):
463 def options(cmd, keys, others):
457 opts = {}
464 opts = {}
458 for k in keys:
465 for k in keys:
459 if k in others:
466 if k in others:
460 opts[k] = others[k]
467 opts[k] = others[k]
461 del others[k]
468 del others[k]
462 if others:
469 if others:
463 sys.stderr.write("abort: %s got unexpected arguments %s\n"
470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
464 % (cmd, ",".join(others)))
471 % (cmd, ",".join(others)))
465 return opts
472 return opts
466
473
467 def batch(repo, proto, cmds, others):
474 def batch(repo, proto, cmds, others):
468 repo = repo.filtered("served")
475 repo = repo.filtered("served")
469 res = []
476 res = []
470 for pair in cmds.split(';'):
477 for pair in cmds.split(';'):
471 op, args = pair.split(' ', 1)
478 op, args = pair.split(' ', 1)
472 vals = {}
479 vals = {}
473 for a in args.split(','):
480 for a in args.split(','):
474 if a:
481 if a:
475 n, v = a.split('=')
482 n, v = a.split('=')
476 vals[n] = unescapearg(v)
483 vals[n] = unescapearg(v)
477 func, spec = commands[op]
484 func, spec = commands[op]
478 if spec:
485 if spec:
479 keys = spec.split()
486 keys = spec.split()
480 data = {}
487 data = {}
481 for k in keys:
488 for k in keys:
482 if k == '*':
489 if k == '*':
483 star = {}
490 star = {}
484 for key in vals.keys():
491 for key in vals.keys():
485 if key not in keys:
492 if key not in keys:
486 star[key] = vals[key]
493 star[key] = vals[key]
487 data['*'] = star
494 data['*'] = star
488 else:
495 else:
489 data[k] = vals[k]
496 data[k] = vals[k]
490 result = func(repo, proto, *[data[k] for k in keys])
497 result = func(repo, proto, *[data[k] for k in keys])
491 else:
498 else:
492 result = func(repo, proto)
499 result = func(repo, proto)
493 if isinstance(result, ooberror):
500 if isinstance(result, ooberror):
494 return result
501 return result
495 res.append(escapearg(result))
502 res.append(escapearg(result))
496 return ';'.join(res)
503 return ';'.join(res)
497
504
498 def between(repo, proto, pairs):
505 def between(repo, proto, pairs):
499 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
506 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
500 r = []
507 r = []
501 for b in repo.between(pairs):
508 for b in repo.between(pairs):
502 r.append(encodelist(b) + "\n")
509 r.append(encodelist(b) + "\n")
503 return "".join(r)
510 return "".join(r)
504
511
505 def branchmap(repo, proto):
512 def branchmap(repo, proto):
506 branchmap = repo.branchmap()
513 branchmap = repo.branchmap()
507 heads = []
514 heads = []
508 for branch, nodes in branchmap.iteritems():
515 for branch, nodes in branchmap.iteritems():
509 branchname = urllib.quote(encoding.fromlocal(branch))
516 branchname = urllib.quote(encoding.fromlocal(branch))
510 branchnodes = encodelist(nodes)
517 branchnodes = encodelist(nodes)
511 heads.append('%s %s' % (branchname, branchnodes))
518 heads.append('%s %s' % (branchname, branchnodes))
512 return '\n'.join(heads)
519 return '\n'.join(heads)
513
520
514 def branches(repo, proto, nodes):
521 def branches(repo, proto, nodes):
515 nodes = decodelist(nodes)
522 nodes = decodelist(nodes)
516 r = []
523 r = []
517 for b in repo.branches(nodes):
524 for b in repo.branches(nodes):
518 r.append(encodelist(b) + "\n")
525 r.append(encodelist(b) + "\n")
519 return "".join(r)
526 return "".join(r)
520
527
521
528
522 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
529 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
523 'known', 'getbundle', 'unbundlehash', 'batch']
530 'known', 'getbundle', 'unbundlehash', 'batch']
524
531
525 def _capabilities(repo, proto):
532 def _capabilities(repo, proto):
526 """return a list of capabilities for a repo
533 """return a list of capabilities for a repo
527
534
528 This function exists to allow extensions to easily wrap capabilities
535 This function exists to allow extensions to easily wrap capabilities
529 computation
536 computation
530
537
531 - returns a lists: easy to alter
538 - returns a lists: easy to alter
532 - change done here will be propagated to both `capabilities` and `hello`
539 - change done here will be propagated to both `capabilities` and `hello`
533 command without any other effort. without any other action needed.
540 command without any other effort. without any other action needed.
534 """
541 """
535 # copy to prevent modification of the global list
542 # copy to prevent modification of the global list
536 caps = list(wireprotocaps)
543 caps = list(wireprotocaps)
537 if _allowstream(repo.ui):
544 if _allowstream(repo.ui):
538 if repo.ui.configbool('server', 'preferuncompressed', False):
545 if repo.ui.configbool('server', 'preferuncompressed', False):
539 caps.append('stream-preferred')
546 caps.append('stream-preferred')
540 requiredformats = repo.requirements & repo.supportedformats
547 requiredformats = repo.requirements & repo.supportedformats
541 # if our local revlogs are just revlogv1, add 'stream' cap
548 # if our local revlogs are just revlogv1, add 'stream' cap
542 if not requiredformats - set(('revlogv1',)):
549 if not requiredformats - set(('revlogv1',)):
543 caps.append('stream')
550 caps.append('stream')
544 # otherwise, add 'streamreqs' detailing our local revlog format
551 # otherwise, add 'streamreqs' detailing our local revlog format
545 else:
552 else:
546 caps.append('streamreqs=%s' % ','.join(requiredformats))
553 caps.append('streamreqs=%s' % ','.join(requiredformats))
547 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
554 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
548 caps.append('httpheader=1024')
555 caps.append('httpheader=1024')
549 return caps
556 return caps
550
557
551 # If you are writting and extension and consider wrapping this function. Wrap
558 # If you are writting and extension and consider wrapping this function. Wrap
552 # `_capabilities` instead.
559 # `_capabilities` instead.
553 def capabilities(repo, proto):
560 def capabilities(repo, proto):
554 return ' '.join(_capabilities(repo, proto))
561 return ' '.join(_capabilities(repo, proto))
555
562
556 def changegroup(repo, proto, roots):
563 def changegroup(repo, proto, roots):
557 nodes = decodelist(roots)
564 nodes = decodelist(roots)
558 cg = repo.changegroup(nodes, 'serve')
565 cg = repo.changegroup(nodes, 'serve')
559 return streamres(proto.groupchunks(cg))
566 return streamres(proto.groupchunks(cg))
560
567
561 def changegroupsubset(repo, proto, bases, heads):
568 def changegroupsubset(repo, proto, bases, heads):
562 bases = decodelist(bases)
569 bases = decodelist(bases)
563 heads = decodelist(heads)
570 heads = decodelist(heads)
564 cg = repo.changegroupsubset(bases, heads, 'serve')
571 cg = repo.changegroupsubset(bases, heads, 'serve')
565 return streamres(proto.groupchunks(cg))
572 return streamres(proto.groupchunks(cg))
566
573
567 def debugwireargs(repo, proto, one, two, others):
574 def debugwireargs(repo, proto, one, two, others):
568 # only accept optional args from the known set
575 # only accept optional args from the known set
569 opts = options('debugwireargs', ['three', 'four'], others)
576 opts = options('debugwireargs', ['three', 'four'], others)
570 return repo.debugwireargs(one, two, **opts)
577 return repo.debugwireargs(one, two, **opts)
571
578
572 def getbundle(repo, proto, others):
579 def getbundle(repo, proto, others):
573 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
580 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
574 for k, v in opts.iteritems():
581 for k, v in opts.iteritems():
575 if k in ('heads', 'common'):
582 if k in ('heads', 'common'):
576 opts[k] = decodelist(v)
583 opts[k] = decodelist(v)
577 elif k == 'bundlecaps':
584 elif k == 'bundlecaps':
578 opts[k] = set(v.split(','))
585 opts[k] = set(v.split(','))
579 cg = repo.getbundle('serve', **opts)
586 cg = repo.getbundle('serve', **opts)
580 return streamres(proto.groupchunks(cg))
587 return streamres(proto.groupchunks(cg))
581
588
582 def heads(repo, proto):
589 def heads(repo, proto):
583 h = repo.heads()
590 h = repo.heads()
584 return encodelist(h) + "\n"
591 return encodelist(h) + "\n"
585
592
586 def hello(repo, proto):
593 def hello(repo, proto):
587 '''the hello command returns a set of lines describing various
594 '''the hello command returns a set of lines describing various
588 interesting things about the server, in an RFC822-like format.
595 interesting things about the server, in an RFC822-like format.
589 Currently the only one defined is "capabilities", which
596 Currently the only one defined is "capabilities", which
590 consists of a line in the form:
597 consists of a line in the form:
591
598
592 capabilities: space separated list of tokens
599 capabilities: space separated list of tokens
593 '''
600 '''
594 return "capabilities: %s\n" % (capabilities(repo, proto))
601 return "capabilities: %s\n" % (capabilities(repo, proto))
595
602
596 def listkeys(repo, proto, namespace):
603 def listkeys(repo, proto, namespace):
597 d = repo.listkeys(encoding.tolocal(namespace)).items()
604 d = repo.listkeys(encoding.tolocal(namespace)).items()
598 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
605 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
599 for k, v in d])
606 for k, v in d])
600 return t
607 return t
601
608
602 def lookup(repo, proto, key):
609 def lookup(repo, proto, key):
603 try:
610 try:
604 k = encoding.tolocal(key)
611 k = encoding.tolocal(key)
605 c = repo[k]
612 c = repo[k]
606 r = c.hex()
613 r = c.hex()
607 success = 1
614 success = 1
608 except Exception, inst:
615 except Exception, inst:
609 r = str(inst)
616 r = str(inst)
610 success = 0
617 success = 0
611 return "%s %s\n" % (success, r)
618 return "%s %s\n" % (success, r)
612
619
613 def known(repo, proto, nodes, others):
620 def known(repo, proto, nodes, others):
614 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
621 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
615
622
616 def pushkey(repo, proto, namespace, key, old, new):
623 def pushkey(repo, proto, namespace, key, old, new):
617 # compatibility with pre-1.8 clients which were accidentally
624 # compatibility with pre-1.8 clients which were accidentally
618 # sending raw binary nodes rather than utf-8-encoded hex
625 # sending raw binary nodes rather than utf-8-encoded hex
619 if len(new) == 20 and new.encode('string-escape') != new:
626 if len(new) == 20 and new.encode('string-escape') != new:
620 # looks like it could be a binary node
627 # looks like it could be a binary node
621 try:
628 try:
622 new.decode('utf-8')
629 new.decode('utf-8')
623 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
630 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
624 except UnicodeDecodeError:
631 except UnicodeDecodeError:
625 pass # binary, leave unmodified
632 pass # binary, leave unmodified
626 else:
633 else:
627 new = encoding.tolocal(new) # normal path
634 new = encoding.tolocal(new) # normal path
628
635
629 if util.safehasattr(proto, 'restore'):
636 if util.safehasattr(proto, 'restore'):
630
637
631 proto.redirect()
638 proto.redirect()
632
639
633 try:
640 try:
634 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
641 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
635 encoding.tolocal(old), new) or False
642 encoding.tolocal(old), new) or False
636 except util.Abort:
643 except util.Abort:
637 r = False
644 r = False
638
645
639 output = proto.restore()
646 output = proto.restore()
640
647
641 return '%s\n%s' % (int(r), output)
648 return '%s\n%s' % (int(r), output)
642
649
643 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
650 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
644 encoding.tolocal(old), new)
651 encoding.tolocal(old), new)
645 return '%s\n' % int(r)
652 return '%s\n' % int(r)
646
653
647 def _allowstream(ui):
654 def _allowstream(ui):
648 return ui.configbool('server', 'uncompressed', True, untrusted=True)
655 return ui.configbool('server', 'uncompressed', True, untrusted=True)
649
656
650 def _walkstreamfiles(repo):
657 def _walkstreamfiles(repo):
651 # this is it's own function so extensions can override it
658 # this is it's own function so extensions can override it
652 return repo.store.walk()
659 return repo.store.walk()
653
660
654 def stream(repo, proto):
661 def stream(repo, proto):
655 '''If the server supports streaming clone, it advertises the "stream"
662 '''If the server supports streaming clone, it advertises the "stream"
656 capability with a value representing the version and flags of the repo
663 capability with a value representing the version and flags of the repo
657 it is serving. Client checks to see if it understands the format.
664 it is serving. Client checks to see if it understands the format.
658
665
659 The format is simple: the server writes out a line with the amount
666 The format is simple: the server writes out a line with the amount
660 of files, then the total amount of bytes to be transferred (separated
667 of files, then the total amount of bytes to be transferred (separated
661 by a space). Then, for each file, the server first writes the filename
668 by a space). Then, for each file, the server first writes the filename
662 and filesize (separated by the null character), then the file contents.
669 and filesize (separated by the null character), then the file contents.
663 '''
670 '''
664
671
665 if not _allowstream(repo.ui):
672 if not _allowstream(repo.ui):
666 return '1\n'
673 return '1\n'
667
674
668 entries = []
675 entries = []
669 total_bytes = 0
676 total_bytes = 0
670 try:
677 try:
671 # get consistent snapshot of repo, lock during scan
678 # get consistent snapshot of repo, lock during scan
672 lock = repo.lock()
679 lock = repo.lock()
673 try:
680 try:
674 repo.ui.debug('scanning\n')
681 repo.ui.debug('scanning\n')
675 for name, ename, size in _walkstreamfiles(repo):
682 for name, ename, size in _walkstreamfiles(repo):
676 if size:
683 if size:
677 entries.append((name, size))
684 entries.append((name, size))
678 total_bytes += size
685 total_bytes += size
679 finally:
686 finally:
680 lock.release()
687 lock.release()
681 except error.LockError:
688 except error.LockError:
682 return '2\n' # error: 2
689 return '2\n' # error: 2
683
690
684 def streamer(repo, entries, total):
691 def streamer(repo, entries, total):
685 '''stream out all metadata files in repository.'''
692 '''stream out all metadata files in repository.'''
686 yield '0\n' # success
693 yield '0\n' # success
687 repo.ui.debug('%d files, %d bytes to transfer\n' %
694 repo.ui.debug('%d files, %d bytes to transfer\n' %
688 (len(entries), total_bytes))
695 (len(entries), total_bytes))
689 yield '%d %d\n' % (len(entries), total_bytes)
696 yield '%d %d\n' % (len(entries), total_bytes)
690
697
691 sopener = repo.sopener
698 sopener = repo.sopener
692 oldaudit = sopener.mustaudit
699 oldaudit = sopener.mustaudit
693 debugflag = repo.ui.debugflag
700 debugflag = repo.ui.debugflag
694 sopener.mustaudit = False
701 sopener.mustaudit = False
695
702
696 try:
703 try:
697 for name, size in entries:
704 for name, size in entries:
698 if debugflag:
705 if debugflag:
699 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
706 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
700 # partially encode name over the wire for backwards compat
707 # partially encode name over the wire for backwards compat
701 yield '%s\0%d\n' % (store.encodedir(name), size)
708 yield '%s\0%d\n' % (store.encodedir(name), size)
702 if size <= 65536:
709 if size <= 65536:
703 fp = sopener(name)
710 fp = sopener(name)
704 try:
711 try:
705 data = fp.read(size)
712 data = fp.read(size)
706 finally:
713 finally:
707 fp.close()
714 fp.close()
708 yield data
715 yield data
709 else:
716 else:
710 for chunk in util.filechunkiter(sopener(name), limit=size):
717 for chunk in util.filechunkiter(sopener(name), limit=size):
711 yield chunk
718 yield chunk
712 # replace with "finally:" when support for python 2.4 has been dropped
719 # replace with "finally:" when support for python 2.4 has been dropped
713 except Exception:
720 except Exception:
714 sopener.mustaudit = oldaudit
721 sopener.mustaudit = oldaudit
715 raise
722 raise
716 sopener.mustaudit = oldaudit
723 sopener.mustaudit = oldaudit
717
724
718 return streamres(streamer(repo, entries, total_bytes))
725 return streamres(streamer(repo, entries, total_bytes))
719
726
720 def unbundle(repo, proto, heads):
727 def unbundle(repo, proto, heads):
721 their_heads = decodelist(heads)
728 their_heads = decodelist(heads)
722
729
723 def check_heads():
730 def check_heads():
724 heads = repo.heads()
731 heads = repo.heads()
725 heads_hash = util.sha1(''.join(sorted(heads))).digest()
732 heads_hash = util.sha1(''.join(sorted(heads))).digest()
726 return (their_heads == ['force'] or their_heads == heads or
733 return (their_heads == ['force'] or their_heads == heads or
727 their_heads == ['hashed', heads_hash])
734 their_heads == ['hashed', heads_hash])
728
735
729 proto.redirect()
736 proto.redirect()
730
737
731 # fail early if possible
738 # fail early if possible
732 if not check_heads():
739 if not check_heads():
733 return pusherr('repository changed while preparing changes - '
740 return pusherr('repository changed while preparing changes - '
734 'please try again')
741 'please try again')
735
742
736 # write bundle data to temporary file because it can be big
743 # write bundle data to temporary file because it can be big
737 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
744 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
738 fp = os.fdopen(fd, 'wb+')
745 fp = os.fdopen(fd, 'wb+')
739 r = 0
746 r = 0
740 try:
747 try:
741 proto.getfile(fp)
748 proto.getfile(fp)
742 lock = repo.lock()
749 lock = repo.lock()
743 try:
750 try:
744 if not check_heads():
751 if not check_heads():
745 # someone else committed/pushed/unbundled while we
752 # someone else committed/pushed/unbundled while we
746 # were transferring data
753 # were transferring data
747 return pusherr('repository changed while uploading changes - '
754 return pusherr('repository changed while uploading changes - '
748 'please try again')
755 'please try again')
749
756
750 # push can proceed
757 # push can proceed
751 fp.seek(0)
758 fp.seek(0)
752 gen = changegroupmod.readbundle(fp, None)
759 gen = changegroupmod.readbundle(fp, None)
753
760
754 try:
761 try:
755 r = repo.addchangegroup(gen, 'serve', proto._client())
762 r = repo.addchangegroup(gen, 'serve', proto._client())
756 except util.Abort, inst:
763 except util.Abort, inst:
757 sys.stderr.write("abort: %s\n" % inst)
764 sys.stderr.write("abort: %s\n" % inst)
758 finally:
765 finally:
759 lock.release()
766 lock.release()
760 return pushres(r)
767 return pushres(r)
761
768
762 finally:
769 finally:
763 fp.close()
770 fp.close()
764 os.unlink(tempname)
771 os.unlink(tempname)
765
772
766 commands = {
773 commands = {
767 'batch': (batch, 'cmds *'),
774 'batch': (batch, 'cmds *'),
768 'between': (between, 'pairs'),
775 'between': (between, 'pairs'),
769 'branchmap': (branchmap, ''),
776 'branchmap': (branchmap, ''),
770 'branches': (branches, 'nodes'),
777 'branches': (branches, 'nodes'),
771 'capabilities': (capabilities, ''),
778 'capabilities': (capabilities, ''),
772 'changegroup': (changegroup, 'roots'),
779 'changegroup': (changegroup, 'roots'),
773 'changegroupsubset': (changegroupsubset, 'bases heads'),
780 'changegroupsubset': (changegroupsubset, 'bases heads'),
774 'debugwireargs': (debugwireargs, 'one two *'),
781 'debugwireargs': (debugwireargs, 'one two *'),
775 'getbundle': (getbundle, '*'),
782 'getbundle': (getbundle, '*'),
776 'heads': (heads, ''),
783 'heads': (heads, ''),
777 'hello': (hello, ''),
784 'hello': (hello, ''),
778 'known': (known, 'nodes *'),
785 'known': (known, 'nodes *'),
779 'listkeys': (listkeys, 'namespace'),
786 'listkeys': (listkeys, 'namespace'),
780 'lookup': (lookup, 'key'),
787 'lookup': (lookup, 'key'),
781 'pushkey': (pushkey, 'namespace key old new'),
788 'pushkey': (pushkey, 'namespace key old new'),
782 'stream_out': (stream, ''),
789 'stream_out': (stream, ''),
783 'unbundle': (unbundle, 'heads'),
790 'unbundle': (unbundle, 'heads'),
784 }
791 }
General Comments 0
You need to be logged in to leave comments. Login now