##// END OF EJS Templates
httppeer: support for _calltwowaystream...
Pierre-Yves David -
r21074:f8a0d82b default
parent child Browse files
Show More
@@ -1,247 +1,269
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from node import nullid
9 from node import nullid
10 from i18n import _
10 from i18n import _
11 import tempfile
11 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
12 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
12 import os, urllib, urllib2, zlib, httplib
13 import os, urllib, urllib2, zlib, httplib
13 import errno, socket
14 import errno, socket
14
15
15 def zgenerator(f):
16 def zgenerator(f):
16 zd = zlib.decompressobj()
17 zd = zlib.decompressobj()
17 try:
18 try:
18 for chunk in util.filechunkiter(f):
19 for chunk in util.filechunkiter(f):
19 while chunk:
20 while chunk:
20 yield zd.decompress(chunk, 2**18)
21 yield zd.decompress(chunk, 2**18)
21 chunk = zd.unconsumed_tail
22 chunk = zd.unconsumed_tail
22 except httplib.HTTPException:
23 except httplib.HTTPException:
23 raise IOError(None, _('connection ended unexpectedly'))
24 raise IOError(None, _('connection ended unexpectedly'))
24 yield zd.flush()
25 yield zd.flush()
25
26
26 class httppeer(wireproto.wirepeer):
27 class httppeer(wireproto.wirepeer):
27 def __init__(self, ui, path):
28 def __init__(self, ui, path):
28 self.path = path
29 self.path = path
29 self.caps = None
30 self.caps = None
30 self.handler = None
31 self.handler = None
31 self.urlopener = None
32 self.urlopener = None
32 u = util.url(path)
33 u = util.url(path)
33 if u.query or u.fragment:
34 if u.query or u.fragment:
34 raise util.Abort(_('unsupported URL component: "%s"') %
35 raise util.Abort(_('unsupported URL component: "%s"') %
35 (u.query or u.fragment))
36 (u.query or u.fragment))
36
37
37 # urllib cannot handle URLs with embedded user or passwd
38 # urllib cannot handle URLs with embedded user or passwd
38 self._url, authinfo = u.authinfo()
39 self._url, authinfo = u.authinfo()
39
40
40 self.ui = ui
41 self.ui = ui
41 self.ui.debug('using %s\n' % self._url)
42 self.ui.debug('using %s\n' % self._url)
42
43
43 self.urlopener = url.opener(ui, authinfo)
44 self.urlopener = url.opener(ui, authinfo)
44
45
45 def __del__(self):
46 def __del__(self):
46 if self.urlopener:
47 if self.urlopener:
47 for h in self.urlopener.handlers:
48 for h in self.urlopener.handlers:
48 h.close()
49 h.close()
49 getattr(h, "close_all", lambda : None)()
50 getattr(h, "close_all", lambda : None)()
50
51
51 def url(self):
52 def url(self):
52 return self.path
53 return self.path
53
54
54 # look up capabilities only when needed
55 # look up capabilities only when needed
55
56
56 def _fetchcaps(self):
57 def _fetchcaps(self):
57 self.caps = set(self._call('capabilities').split())
58 self.caps = set(self._call('capabilities').split())
58
59
59 def _capabilities(self):
60 def _capabilities(self):
60 if self.caps is None:
61 if self.caps is None:
61 try:
62 try:
62 self._fetchcaps()
63 self._fetchcaps()
63 except error.RepoError:
64 except error.RepoError:
64 self.caps = set()
65 self.caps = set()
65 self.ui.debug('capabilities: %s\n' %
66 self.ui.debug('capabilities: %s\n' %
66 (' '.join(self.caps or ['none'])))
67 (' '.join(self.caps or ['none'])))
67 return self.caps
68 return self.caps
68
69
69 def lock(self):
70 def lock(self):
70 raise util.Abort(_('operation not supported over http'))
71 raise util.Abort(_('operation not supported over http'))
71
72
72 def _callstream(self, cmd, **args):
73 def _callstream(self, cmd, **args):
73 if cmd == 'pushkey':
74 if cmd == 'pushkey':
74 args['data'] = ''
75 args['data'] = ''
75 data = args.pop('data', None)
76 data = args.pop('data', None)
76 size = 0
77 size = 0
77 if util.safehasattr(data, 'length'):
78 if util.safehasattr(data, 'length'):
78 size = data.length
79 size = data.length
79 elif data is not None:
80 elif data is not None:
80 size = len(data)
81 size = len(data)
81 headers = args.pop('headers', {})
82 headers = args.pop('headers', {})
82 if data is not None and 'Content-Type' not in headers:
83 if data is not None and 'Content-Type' not in headers:
83 headers['Content-Type'] = 'application/mercurial-0.1'
84 headers['Content-Type'] = 'application/mercurial-0.1'
84
85
85
86
86 if size and self.ui.configbool('ui', 'usehttp2', False):
87 if size and self.ui.configbool('ui', 'usehttp2', False):
87 headers['Expect'] = '100-Continue'
88 headers['Expect'] = '100-Continue'
88 headers['X-HgHttp2'] = '1'
89 headers['X-HgHttp2'] = '1'
89
90
90 self.ui.debug("sending %s command\n" % cmd)
91 self.ui.debug("sending %s command\n" % cmd)
91 q = [('cmd', cmd)]
92 q = [('cmd', cmd)]
92 headersize = 0
93 headersize = 0
93 if len(args) > 0:
94 if len(args) > 0:
94 httpheader = self.capable('httpheader')
95 httpheader = self.capable('httpheader')
95 if httpheader:
96 if httpheader:
96 headersize = int(httpheader.split(',')[0])
97 headersize = int(httpheader.split(',')[0])
97 if headersize > 0:
98 if headersize > 0:
98 # The headers can typically carry more data than the URL.
99 # The headers can typically carry more data than the URL.
99 encargs = urllib.urlencode(sorted(args.items()))
100 encargs = urllib.urlencode(sorted(args.items()))
100 headerfmt = 'X-HgArg-%s'
101 headerfmt = 'X-HgArg-%s'
101 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
102 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
102 headernum = 0
103 headernum = 0
103 for i in xrange(0, len(encargs), contentlen):
104 for i in xrange(0, len(encargs), contentlen):
104 headernum += 1
105 headernum += 1
105 header = headerfmt % str(headernum)
106 header = headerfmt % str(headernum)
106 headers[header] = encargs[i:i + contentlen]
107 headers[header] = encargs[i:i + contentlen]
107 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
108 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
108 headers['Vary'] = ','.join(varyheaders)
109 headers['Vary'] = ','.join(varyheaders)
109 else:
110 else:
110 q += sorted(args.items())
111 q += sorted(args.items())
111 qs = '?%s' % urllib.urlencode(q)
112 qs = '?%s' % urllib.urlencode(q)
112 cu = "%s%s" % (self._url, qs)
113 cu = "%s%s" % (self._url, qs)
113 req = urllib2.Request(cu, data, headers)
114 req = urllib2.Request(cu, data, headers)
114 if data is not None:
115 if data is not None:
115 self.ui.debug("sending %s bytes\n" % size)
116 self.ui.debug("sending %s bytes\n" % size)
116 req.add_unredirected_header('Content-Length', '%d' % size)
117 req.add_unredirected_header('Content-Length', '%d' % size)
117 try:
118 try:
118 resp = self.urlopener.open(req)
119 resp = self.urlopener.open(req)
119 except urllib2.HTTPError, inst:
120 except urllib2.HTTPError, inst:
120 if inst.code == 401:
121 if inst.code == 401:
121 raise util.Abort(_('authorization failed'))
122 raise util.Abort(_('authorization failed'))
122 raise
123 raise
123 except httplib.HTTPException, inst:
124 except httplib.HTTPException, inst:
124 self.ui.debug('http error while sending %s command\n' % cmd)
125 self.ui.debug('http error while sending %s command\n' % cmd)
125 self.ui.traceback()
126 self.ui.traceback()
126 raise IOError(None, inst)
127 raise IOError(None, inst)
127 except IndexError:
128 except IndexError:
128 # this only happens with Python 2.3, later versions raise URLError
129 # this only happens with Python 2.3, later versions raise URLError
129 raise util.Abort(_('http error, possibly caused by proxy setting'))
130 raise util.Abort(_('http error, possibly caused by proxy setting'))
130 # record the url we got redirected to
131 # record the url we got redirected to
131 resp_url = resp.geturl()
132 resp_url = resp.geturl()
132 if resp_url.endswith(qs):
133 if resp_url.endswith(qs):
133 resp_url = resp_url[:-len(qs)]
134 resp_url = resp_url[:-len(qs)]
134 if self._url.rstrip('/') != resp_url.rstrip('/'):
135 if self._url.rstrip('/') != resp_url.rstrip('/'):
135 if not self.ui.quiet:
136 if not self.ui.quiet:
136 self.ui.warn(_('real URL is %s\n') % resp_url)
137 self.ui.warn(_('real URL is %s\n') % resp_url)
137 self._url = resp_url
138 self._url = resp_url
138 try:
139 try:
139 proto = resp.getheader('content-type')
140 proto = resp.getheader('content-type')
140 except AttributeError:
141 except AttributeError:
141 proto = resp.headers.get('content-type', '')
142 proto = resp.headers.get('content-type', '')
142
143
143 safeurl = util.hidepassword(self._url)
144 safeurl = util.hidepassword(self._url)
144 if proto.startswith('application/hg-error'):
145 if proto.startswith('application/hg-error'):
145 raise error.OutOfBandError(resp.read())
146 raise error.OutOfBandError(resp.read())
146 # accept old "text/plain" and "application/hg-changegroup" for now
147 # accept old "text/plain" and "application/hg-changegroup" for now
147 if not (proto.startswith('application/mercurial-') or
148 if not (proto.startswith('application/mercurial-') or
148 (proto.startswith('text/plain')
149 (proto.startswith('text/plain')
149 and not resp.headers.get('content-length')) or
150 and not resp.headers.get('content-length')) or
150 proto.startswith('application/hg-changegroup')):
151 proto.startswith('application/hg-changegroup')):
151 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
152 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
152 raise error.RepoError(
153 raise error.RepoError(
153 _("'%s' does not appear to be an hg repository:\n"
154 _("'%s' does not appear to be an hg repository:\n"
154 "---%%<--- (%s)\n%s\n---%%<---\n")
155 "---%%<--- (%s)\n%s\n---%%<---\n")
155 % (safeurl, proto or 'no content-type', resp.read(1024)))
156 % (safeurl, proto or 'no content-type', resp.read(1024)))
156
157
157 if proto.startswith('application/mercurial-'):
158 if proto.startswith('application/mercurial-'):
158 try:
159 try:
159 version = proto.split('-', 1)[1]
160 version = proto.split('-', 1)[1]
160 version_info = tuple([int(n) for n in version.split('.')])
161 version_info = tuple([int(n) for n in version.split('.')])
161 except ValueError:
162 except ValueError:
162 raise error.RepoError(_("'%s' sent a broken Content-Type "
163 raise error.RepoError(_("'%s' sent a broken Content-Type "
163 "header (%s)") % (safeurl, proto))
164 "header (%s)") % (safeurl, proto))
164 if version_info > (0, 1):
165 if version_info > (0, 1):
165 raise error.RepoError(_("'%s' uses newer protocol %s") %
166 raise error.RepoError(_("'%s' uses newer protocol %s") %
166 (safeurl, version))
167 (safeurl, version))
167
168
168 return resp
169 return resp
169
170
170 def _call(self, cmd, **args):
171 def _call(self, cmd, **args):
171 fp = self._callstream(cmd, **args)
172 fp = self._callstream(cmd, **args)
172 try:
173 try:
173 return fp.read()
174 return fp.read()
174 finally:
175 finally:
175 # if using keepalive, allow connection to be reused
176 # if using keepalive, allow connection to be reused
176 fp.close()
177 fp.close()
177
178
178 def _callpush(self, cmd, cg, **args):
179 def _callpush(self, cmd, cg, **args):
179 # have to stream bundle to a temp file because we do not have
180 # have to stream bundle to a temp file because we do not have
180 # http 1.1 chunked transfer.
181 # http 1.1 chunked transfer.
181
182
182 types = self.capable('unbundle')
183 types = self.capable('unbundle')
183 try:
184 try:
184 types = types.split(',')
185 types = types.split(',')
185 except AttributeError:
186 except AttributeError:
186 # servers older than d1b16a746db6 will send 'unbundle' as a
187 # servers older than d1b16a746db6 will send 'unbundle' as a
187 # boolean capability. They only support headerless/uncompressed
188 # boolean capability. They only support headerless/uncompressed
188 # bundles.
189 # bundles.
189 types = [""]
190 types = [""]
190 for x in types:
191 for x in types:
191 if x in changegroup.bundletypes:
192 if x in changegroup.bundletypes:
192 type = x
193 type = x
193 break
194 break
194
195
195 tempname = changegroup.writebundle(cg, None, type)
196 tempname = changegroup.writebundle(cg, None, type)
196 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
197 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
197 headers = {'Content-Type': 'application/mercurial-0.1'}
198 headers = {'Content-Type': 'application/mercurial-0.1'}
198
199
199 try:
200 try:
200 try:
201 try:
201 r = self._call(cmd, data=fp, headers=headers, **args)
202 r = self._call(cmd, data=fp, headers=headers, **args)
202 vals = r.split('\n', 1)
203 vals = r.split('\n', 1)
203 if len(vals) < 2:
204 if len(vals) < 2:
204 raise error.ResponseError(_("unexpected response:"), r)
205 raise error.ResponseError(_("unexpected response:"), r)
205 return vals
206 return vals
206 except socket.error, err:
207 except socket.error, err:
207 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
208 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
208 raise util.Abort(_('push failed: %s') % err.args[1])
209 raise util.Abort(_('push failed: %s') % err.args[1])
209 raise util.Abort(err.args[1])
210 raise util.Abort(err.args[1])
210 finally:
211 finally:
211 fp.close()
212 fp.close()
212 os.unlink(tempname)
213 os.unlink(tempname)
213
214
215 def _calltwowaystream(self, cmd, fp, **args):
216 fh = None
217 filename = None
218 try:
219 # dump bundle to disk
220 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
221 fh = os.fdopen(fd, "wb")
222 d = fp.read(4096)
223 while d:
224 fh.write(d)
225 d = fp.read(4096)
226 fh.close()
227 # start http push
228 fp = httpconnection.httpsendfile(self.ui, filename, "rb")
229 headers = {'Content-Type': 'application/mercurial-0.1'}
230 return self._callstream(cmd, data=fp, headers=headers, **args)
231 finally:
232 if fh is not None:
233 fh.close()
234 os.unlink(filename)
235
214 def _callcompressable(self, cmd, **args):
236 def _callcompressable(self, cmd, **args):
215 stream = self._callstream(cmd, **args)
237 stream = self._callstream(cmd, **args)
216 return util.chunkbuffer(zgenerator(stream))
238 return util.chunkbuffer(zgenerator(stream))
217
239
218 class httpspeer(httppeer):
240 class httpspeer(httppeer):
219 def __init__(self, ui, path):
241 def __init__(self, ui, path):
220 if not url.has_https:
242 if not url.has_https:
221 raise util.Abort(_('Python support for SSL and HTTPS '
243 raise util.Abort(_('Python support for SSL and HTTPS '
222 'is not installed'))
244 'is not installed'))
223 httppeer.__init__(self, ui, path)
245 httppeer.__init__(self, ui, path)
224
246
225 def instance(ui, path, create):
247 def instance(ui, path, create):
226 if create:
248 if create:
227 raise util.Abort(_('cannot create new http repository'))
249 raise util.Abort(_('cannot create new http repository'))
228 try:
250 try:
229 if path.startswith('https:'):
251 if path.startswith('https:'):
230 inst = httpspeer(ui, path)
252 inst = httpspeer(ui, path)
231 else:
253 else:
232 inst = httppeer(ui, path)
254 inst = httppeer(ui, path)
233 try:
255 try:
234 # Try to do useful work when checking compatibility.
256 # Try to do useful work when checking compatibility.
235 # Usually saves a roundtrip since we want the caps anyway.
257 # Usually saves a roundtrip since we want the caps anyway.
236 inst._fetchcaps()
258 inst._fetchcaps()
237 except error.RepoError:
259 except error.RepoError:
238 # No luck, try older compatibility check.
260 # No luck, try older compatibility check.
239 inst.between([(nullid, nullid)])
261 inst.between([(nullid, nullid)])
240 return inst
262 return inst
241 except error.RepoError, httpexception:
263 except error.RepoError, httpexception:
242 try:
264 try:
243 r = statichttprepo.instance(ui, "static-" + path, create)
265 r = statichttprepo.instance(ui, "static-" + path, create)
244 ui.note('(falling back to static-http)\n')
266 ui.note('(falling back to static-http)\n')
245 return r
267 return r
246 except error.RepoError:
268 except error.RepoError:
247 raise httpexception # use the original http RepoError instead
269 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now