##// END OF EJS Templates
httppeer: close the temporary bundle file after two-way streaming it...
Matt Harbison -
r23086:cde6904f stable
parent child Browse files
Show More
@@ -1,272 +1,275
1 # httppeer.py - HTTP repository proxy classes for mercurial
1 # httppeer.py - HTTP repository proxy classes for mercurial
2 #
2 #
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from node import nullid
9 from node import nullid
10 from i18n import _
10 from i18n import _
11 import tempfile
11 import tempfile
12 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
12 import changegroup, statichttprepo, error, httpconnection, url, util, wireproto
13 import os, urllib, urllib2, zlib, httplib
13 import os, urllib, urllib2, zlib, httplib
14 import errno, socket
14 import errno, socket
15
15
16 def zgenerator(f):
16 def zgenerator(f):
17 zd = zlib.decompressobj()
17 zd = zlib.decompressobj()
18 try:
18 try:
19 for chunk in util.filechunkiter(f):
19 for chunk in util.filechunkiter(f):
20 while chunk:
20 while chunk:
21 yield zd.decompress(chunk, 2**18)
21 yield zd.decompress(chunk, 2**18)
22 chunk = zd.unconsumed_tail
22 chunk = zd.unconsumed_tail
23 except httplib.HTTPException:
23 except httplib.HTTPException:
24 raise IOError(None, _('connection ended unexpectedly'))
24 raise IOError(None, _('connection ended unexpectedly'))
25 yield zd.flush()
25 yield zd.flush()
26
26
27 class httppeer(wireproto.wirepeer):
27 class httppeer(wireproto.wirepeer):
28 def __init__(self, ui, path):
28 def __init__(self, ui, path):
29 self.path = path
29 self.path = path
30 self.caps = None
30 self.caps = None
31 self.handler = None
31 self.handler = None
32 self.urlopener = None
32 self.urlopener = None
33 u = util.url(path)
33 u = util.url(path)
34 if u.query or u.fragment:
34 if u.query or u.fragment:
35 raise util.Abort(_('unsupported URL component: "%s"') %
35 raise util.Abort(_('unsupported URL component: "%s"') %
36 (u.query or u.fragment))
36 (u.query or u.fragment))
37
37
38 # urllib cannot handle URLs with embedded user or passwd
38 # urllib cannot handle URLs with embedded user or passwd
39 self._url, authinfo = u.authinfo()
39 self._url, authinfo = u.authinfo()
40
40
41 self.ui = ui
41 self.ui = ui
42 self.ui.debug('using %s\n' % self._url)
42 self.ui.debug('using %s\n' % self._url)
43
43
44 self.urlopener = url.opener(ui, authinfo)
44 self.urlopener = url.opener(ui, authinfo)
45
45
46 def __del__(self):
46 def __del__(self):
47 if self.urlopener:
47 if self.urlopener:
48 for h in self.urlopener.handlers:
48 for h in self.urlopener.handlers:
49 h.close()
49 h.close()
50 getattr(h, "close_all", lambda : None)()
50 getattr(h, "close_all", lambda : None)()
51
51
52 def url(self):
52 def url(self):
53 return self.path
53 return self.path
54
54
55 # look up capabilities only when needed
55 # look up capabilities only when needed
56
56
57 def _fetchcaps(self):
57 def _fetchcaps(self):
58 self.caps = set(self._call('capabilities').split())
58 self.caps = set(self._call('capabilities').split())
59
59
60 def _capabilities(self):
60 def _capabilities(self):
61 if self.caps is None:
61 if self.caps is None:
62 try:
62 try:
63 self._fetchcaps()
63 self._fetchcaps()
64 except error.RepoError:
64 except error.RepoError:
65 self.caps = set()
65 self.caps = set()
66 self.ui.debug('capabilities: %s\n' %
66 self.ui.debug('capabilities: %s\n' %
67 (' '.join(self.caps or ['none'])))
67 (' '.join(self.caps or ['none'])))
68 return self.caps
68 return self.caps
69
69
70 def lock(self):
70 def lock(self):
71 raise util.Abort(_('operation not supported over http'))
71 raise util.Abort(_('operation not supported over http'))
72
72
73 def _callstream(self, cmd, **args):
73 def _callstream(self, cmd, **args):
74 if cmd == 'pushkey':
74 if cmd == 'pushkey':
75 args['data'] = ''
75 args['data'] = ''
76 data = args.pop('data', None)
76 data = args.pop('data', None)
77 size = 0
77 size = 0
78 if util.safehasattr(data, 'length'):
78 if util.safehasattr(data, 'length'):
79 size = data.length
79 size = data.length
80 elif data is not None:
80 elif data is not None:
81 size = len(data)
81 size = len(data)
82 headers = args.pop('headers', {})
82 headers = args.pop('headers', {})
83 if data is not None and 'Content-Type' not in headers:
83 if data is not None and 'Content-Type' not in headers:
84 headers['Content-Type'] = 'application/mercurial-0.1'
84 headers['Content-Type'] = 'application/mercurial-0.1'
85
85
86
86
87 if size and self.ui.configbool('ui', 'usehttp2', False):
87 if size and self.ui.configbool('ui', 'usehttp2', False):
88 headers['Expect'] = '100-Continue'
88 headers['Expect'] = '100-Continue'
89 headers['X-HgHttp2'] = '1'
89 headers['X-HgHttp2'] = '1'
90
90
91 self.ui.debug("sending %s command\n" % cmd)
91 self.ui.debug("sending %s command\n" % cmd)
92 q = [('cmd', cmd)]
92 q = [('cmd', cmd)]
93 headersize = 0
93 headersize = 0
94 if len(args) > 0:
94 if len(args) > 0:
95 httpheader = self.capable('httpheader')
95 httpheader = self.capable('httpheader')
96 if httpheader:
96 if httpheader:
97 headersize = int(httpheader.split(',')[0])
97 headersize = int(httpheader.split(',')[0])
98 if headersize > 0:
98 if headersize > 0:
99 # The headers can typically carry more data than the URL.
99 # The headers can typically carry more data than the URL.
100 encargs = urllib.urlencode(sorted(args.items()))
100 encargs = urllib.urlencode(sorted(args.items()))
101 headerfmt = 'X-HgArg-%s'
101 headerfmt = 'X-HgArg-%s'
102 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
102 contentlen = headersize - len(headerfmt % '000' + ': \r\n')
103 headernum = 0
103 headernum = 0
104 for i in xrange(0, len(encargs), contentlen):
104 for i in xrange(0, len(encargs), contentlen):
105 headernum += 1
105 headernum += 1
106 header = headerfmt % str(headernum)
106 header = headerfmt % str(headernum)
107 headers[header] = encargs[i:i + contentlen]
107 headers[header] = encargs[i:i + contentlen]
108 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
108 varyheaders = [headerfmt % str(h) for h in range(1, headernum + 1)]
109 headers['Vary'] = ','.join(varyheaders)
109 headers['Vary'] = ','.join(varyheaders)
110 else:
110 else:
111 q += sorted(args.items())
111 q += sorted(args.items())
112 qs = '?%s' % urllib.urlencode(q)
112 qs = '?%s' % urllib.urlencode(q)
113 cu = "%s%s" % (self._url, qs)
113 cu = "%s%s" % (self._url, qs)
114 req = urllib2.Request(cu, data, headers)
114 req = urllib2.Request(cu, data, headers)
115 if data is not None:
115 if data is not None:
116 self.ui.debug("sending %s bytes\n" % size)
116 self.ui.debug("sending %s bytes\n" % size)
117 req.add_unredirected_header('Content-Length', '%d' % size)
117 req.add_unredirected_header('Content-Length', '%d' % size)
118 try:
118 try:
119 resp = self.urlopener.open(req)
119 resp = self.urlopener.open(req)
120 except urllib2.HTTPError, inst:
120 except urllib2.HTTPError, inst:
121 if inst.code == 401:
121 if inst.code == 401:
122 raise util.Abort(_('authorization failed'))
122 raise util.Abort(_('authorization failed'))
123 raise
123 raise
124 except httplib.HTTPException, inst:
124 except httplib.HTTPException, inst:
125 self.ui.debug('http error while sending %s command\n' % cmd)
125 self.ui.debug('http error while sending %s command\n' % cmd)
126 self.ui.traceback()
126 self.ui.traceback()
127 raise IOError(None, inst)
127 raise IOError(None, inst)
128 except IndexError:
128 except IndexError:
129 # this only happens with Python 2.3, later versions raise URLError
129 # this only happens with Python 2.3, later versions raise URLError
130 raise util.Abort(_('http error, possibly caused by proxy setting'))
130 raise util.Abort(_('http error, possibly caused by proxy setting'))
131 # record the url we got redirected to
131 # record the url we got redirected to
132 resp_url = resp.geturl()
132 resp_url = resp.geturl()
133 if resp_url.endswith(qs):
133 if resp_url.endswith(qs):
134 resp_url = resp_url[:-len(qs)]
134 resp_url = resp_url[:-len(qs)]
135 if self._url.rstrip('/') != resp_url.rstrip('/'):
135 if self._url.rstrip('/') != resp_url.rstrip('/'):
136 if not self.ui.quiet:
136 if not self.ui.quiet:
137 self.ui.warn(_('real URL is %s\n') % resp_url)
137 self.ui.warn(_('real URL is %s\n') % resp_url)
138 self._url = resp_url
138 self._url = resp_url
139 try:
139 try:
140 proto = resp.getheader('content-type')
140 proto = resp.getheader('content-type')
141 except AttributeError:
141 except AttributeError:
142 proto = resp.headers.get('content-type', '')
142 proto = resp.headers.get('content-type', '')
143
143
144 safeurl = util.hidepassword(self._url)
144 safeurl = util.hidepassword(self._url)
145 if proto.startswith('application/hg-error'):
145 if proto.startswith('application/hg-error'):
146 raise error.OutOfBandError(resp.read())
146 raise error.OutOfBandError(resp.read())
147 # accept old "text/plain" and "application/hg-changegroup" for now
147 # accept old "text/plain" and "application/hg-changegroup" for now
148 if not (proto.startswith('application/mercurial-') or
148 if not (proto.startswith('application/mercurial-') or
149 (proto.startswith('text/plain')
149 (proto.startswith('text/plain')
150 and not resp.headers.get('content-length')) or
150 and not resp.headers.get('content-length')) or
151 proto.startswith('application/hg-changegroup')):
151 proto.startswith('application/hg-changegroup')):
152 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
152 self.ui.debug("requested URL: '%s'\n" % util.hidepassword(cu))
153 raise error.RepoError(
153 raise error.RepoError(
154 _("'%s' does not appear to be an hg repository:\n"
154 _("'%s' does not appear to be an hg repository:\n"
155 "---%%<--- (%s)\n%s\n---%%<---\n")
155 "---%%<--- (%s)\n%s\n---%%<---\n")
156 % (safeurl, proto or 'no content-type', resp.read(1024)))
156 % (safeurl, proto or 'no content-type', resp.read(1024)))
157
157
158 if proto.startswith('application/mercurial-'):
158 if proto.startswith('application/mercurial-'):
159 try:
159 try:
160 version = proto.split('-', 1)[1]
160 version = proto.split('-', 1)[1]
161 version_info = tuple([int(n) for n in version.split('.')])
161 version_info = tuple([int(n) for n in version.split('.')])
162 except ValueError:
162 except ValueError:
163 raise error.RepoError(_("'%s' sent a broken Content-Type "
163 raise error.RepoError(_("'%s' sent a broken Content-Type "
164 "header (%s)") % (safeurl, proto))
164 "header (%s)") % (safeurl, proto))
165 if version_info > (0, 1):
165 if version_info > (0, 1):
166 raise error.RepoError(_("'%s' uses newer protocol %s") %
166 raise error.RepoError(_("'%s' uses newer protocol %s") %
167 (safeurl, version))
167 (safeurl, version))
168
168
169 return resp
169 return resp
170
170
171 def _call(self, cmd, **args):
171 def _call(self, cmd, **args):
172 fp = self._callstream(cmd, **args)
172 fp = self._callstream(cmd, **args)
173 try:
173 try:
174 return fp.read()
174 return fp.read()
175 finally:
175 finally:
176 # if using keepalive, allow connection to be reused
176 # if using keepalive, allow connection to be reused
177 fp.close()
177 fp.close()
178
178
179 def _callpush(self, cmd, cg, **args):
179 def _callpush(self, cmd, cg, **args):
180 # have to stream bundle to a temp file because we do not have
180 # have to stream bundle to a temp file because we do not have
181 # http 1.1 chunked transfer.
181 # http 1.1 chunked transfer.
182
182
183 types = self.capable('unbundle')
183 types = self.capable('unbundle')
184 try:
184 try:
185 types = types.split(',')
185 types = types.split(',')
186 except AttributeError:
186 except AttributeError:
187 # servers older than d1b16a746db6 will send 'unbundle' as a
187 # servers older than d1b16a746db6 will send 'unbundle' as a
188 # boolean capability. They only support headerless/uncompressed
188 # boolean capability. They only support headerless/uncompressed
189 # bundles.
189 # bundles.
190 types = [""]
190 types = [""]
191 for x in types:
191 for x in types:
192 if x in changegroup.bundletypes:
192 if x in changegroup.bundletypes:
193 type = x
193 type = x
194 break
194 break
195
195
196 tempname = changegroup.writebundle(cg, None, type)
196 tempname = changegroup.writebundle(cg, None, type)
197 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
197 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
198 headers = {'Content-Type': 'application/mercurial-0.1'}
198 headers = {'Content-Type': 'application/mercurial-0.1'}
199
199
200 try:
200 try:
201 try:
201 try:
202 r = self._call(cmd, data=fp, headers=headers, **args)
202 r = self._call(cmd, data=fp, headers=headers, **args)
203 vals = r.split('\n', 1)
203 vals = r.split('\n', 1)
204 if len(vals) < 2:
204 if len(vals) < 2:
205 raise error.ResponseError(_("unexpected response:"), r)
205 raise error.ResponseError(_("unexpected response:"), r)
206 return vals
206 return vals
207 except socket.error, err:
207 except socket.error, err:
208 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
208 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
209 raise util.Abort(_('push failed: %s') % err.args[1])
209 raise util.Abort(_('push failed: %s') % err.args[1])
210 raise util.Abort(err.args[1])
210 raise util.Abort(err.args[1])
211 finally:
211 finally:
212 fp.close()
212 fp.close()
213 os.unlink(tempname)
213 os.unlink(tempname)
214
214
215 def _calltwowaystream(self, cmd, fp, **args):
215 def _calltwowaystream(self, cmd, fp, **args):
216 fh = None
216 fh = None
217 fp_ = None
217 filename = None
218 filename = None
218 try:
219 try:
219 # dump bundle to disk
220 # dump bundle to disk
220 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
221 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
221 fh = os.fdopen(fd, "wb")
222 fh = os.fdopen(fd, "wb")
222 d = fp.read(4096)
223 d = fp.read(4096)
223 while d:
224 while d:
224 fh.write(d)
225 fh.write(d)
225 d = fp.read(4096)
226 d = fp.read(4096)
226 fh.close()
227 fh.close()
227 # start http push
228 # start http push
228 fp = httpconnection.httpsendfile(self.ui, filename, "rb")
229 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
229 headers = {'Content-Type': 'application/mercurial-0.1'}
230 headers = {'Content-Type': 'application/mercurial-0.1'}
230 return self._callstream(cmd, data=fp, headers=headers, **args)
231 return self._callstream(cmd, data=fp_, headers=headers, **args)
231 finally:
232 finally:
233 if fp_ is not None:
234 fp_.close()
232 if fh is not None:
235 if fh is not None:
233 fh.close()
236 fh.close()
234 os.unlink(filename)
237 os.unlink(filename)
235
238
236 def _callcompressable(self, cmd, **args):
239 def _callcompressable(self, cmd, **args):
237 stream = self._callstream(cmd, **args)
240 stream = self._callstream(cmd, **args)
238 return util.chunkbuffer(zgenerator(stream))
241 return util.chunkbuffer(zgenerator(stream))
239
242
240 def _abort(self, exception):
243 def _abort(self, exception):
241 raise exception
244 raise exception
242
245
243 class httpspeer(httppeer):
246 class httpspeer(httppeer):
244 def __init__(self, ui, path):
247 def __init__(self, ui, path):
245 if not url.has_https:
248 if not url.has_https:
246 raise util.Abort(_('Python support for SSL and HTTPS '
249 raise util.Abort(_('Python support for SSL and HTTPS '
247 'is not installed'))
250 'is not installed'))
248 httppeer.__init__(self, ui, path)
251 httppeer.__init__(self, ui, path)
249
252
250 def instance(ui, path, create):
253 def instance(ui, path, create):
251 if create:
254 if create:
252 raise util.Abort(_('cannot create new http repository'))
255 raise util.Abort(_('cannot create new http repository'))
253 try:
256 try:
254 if path.startswith('https:'):
257 if path.startswith('https:'):
255 inst = httpspeer(ui, path)
258 inst = httpspeer(ui, path)
256 else:
259 else:
257 inst = httppeer(ui, path)
260 inst = httppeer(ui, path)
258 try:
261 try:
259 # Try to do useful work when checking compatibility.
262 # Try to do useful work when checking compatibility.
260 # Usually saves a roundtrip since we want the caps anyway.
263 # Usually saves a roundtrip since we want the caps anyway.
261 inst._fetchcaps()
264 inst._fetchcaps()
262 except error.RepoError:
265 except error.RepoError:
263 # No luck, try older compatibility check.
266 # No luck, try older compatibility check.
264 inst.between([(nullid, nullid)])
267 inst.between([(nullid, nullid)])
265 return inst
268 return inst
266 except error.RepoError, httpexception:
269 except error.RepoError, httpexception:
267 try:
270 try:
268 r = statichttprepo.instance(ui, "static-" + path, create)
271 r = statichttprepo.instance(ui, "static-" + path, create)
269 ui.note('(falling back to static-http)\n')
272 ui.note('(falling back to static-http)\n')
270 return r
273 return r
271 except error.RepoError:
274 except error.RepoError:
272 raise httpexception # use the original http RepoError instead
275 raise httpexception # use the original http RepoError instead
General Comments 0
You need to be logged in to leave comments. Login now