##// END OF EJS Templates
largefiles: getlfile must hit end of HTTP chunked streams to reuse connections...
Mads Kiilerich -
r19006:0b3b8422 default
parent child Browse files
Show More
@@ -1,167 +1,174
1 # Copyright 2011 Fog Creek Software
1 # Copyright 2011 Fog Creek Software
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6 import os
6 import os
7 import urllib2
7 import urllib2
8
8
9 from mercurial import error, httppeer, util, wireproto
9 from mercurial import error, httppeer, util, wireproto
10 from mercurial.wireproto import batchable, future
10 from mercurial.wireproto import batchable, future
11 from mercurial.i18n import _
11 from mercurial.i18n import _
12
12
13 import lfutil
13 import lfutil
14
14
15 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
15 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
16 '\n\nPlease enable it in your Mercurial config '
16 '\n\nPlease enable it in your Mercurial config '
17 'file.\n')
17 'file.\n')
18
18
19 # these will all be replaced by largefiles.uisetup
19 # these will all be replaced by largefiles.uisetup
20 capabilitiesorig = None
20 capabilitiesorig = None
21 ssholdcallstream = None
21 ssholdcallstream = None
22 httpoldcallstream = None
22 httpoldcallstream = None
23
23
24 def putlfile(repo, proto, sha):
24 def putlfile(repo, proto, sha):
25 '''Put a largefile into a repository's local store and into the
25 '''Put a largefile into a repository's local store and into the
26 user cache.'''
26 user cache.'''
27 proto.redirect()
27 proto.redirect()
28
28
29 path = lfutil.storepath(repo, sha)
29 path = lfutil.storepath(repo, sha)
30 util.makedirs(os.path.dirname(path))
30 util.makedirs(os.path.dirname(path))
31 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
31 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
32
32
33 try:
33 try:
34 try:
34 try:
35 proto.getfile(tmpfp)
35 proto.getfile(tmpfp)
36 tmpfp._fp.seek(0)
36 tmpfp._fp.seek(0)
37 if sha != lfutil.hexsha1(tmpfp._fp):
37 if sha != lfutil.hexsha1(tmpfp._fp):
38 raise IOError(0, _('largefile contents do not match hash'))
38 raise IOError(0, _('largefile contents do not match hash'))
39 tmpfp.close()
39 tmpfp.close()
40 lfutil.linktousercache(repo, sha)
40 lfutil.linktousercache(repo, sha)
41 except IOError, e:
41 except IOError, e:
42 repo.ui.warn(_('largefiles: failed to put %s into store: %s') %
42 repo.ui.warn(_('largefiles: failed to put %s into store: %s') %
43 (sha, e.strerror))
43 (sha, e.strerror))
44 return wireproto.pushres(1)
44 return wireproto.pushres(1)
45 finally:
45 finally:
46 tmpfp.discard()
46 tmpfp.discard()
47
47
48 return wireproto.pushres(0)
48 return wireproto.pushres(0)
49
49
50 def getlfile(repo, proto, sha):
50 def getlfile(repo, proto, sha):
51 '''Retrieve a largefile from the repository-local cache or system
51 '''Retrieve a largefile from the repository-local cache or system
52 cache.'''
52 cache.'''
53 filename = lfutil.findfile(repo, sha)
53 filename = lfutil.findfile(repo, sha)
54 if not filename:
54 if not filename:
55 raise util.Abort(_('requested largefile %s not present in cache') % sha)
55 raise util.Abort(_('requested largefile %s not present in cache') % sha)
56 f = open(filename, 'rb')
56 f = open(filename, 'rb')
57 length = os.fstat(f.fileno())[6]
57 length = os.fstat(f.fileno())[6]
58
58
59 # Since we can't set an HTTP content-length header here, and
59 # Since we can't set an HTTP content-length header here, and
60 # Mercurial core provides no way to give the length of a streamres
60 # Mercurial core provides no way to give the length of a streamres
61 # (and reading the entire file into RAM would be ill-advised), we
61 # (and reading the entire file into RAM would be ill-advised), we
62 # just send the length on the first line of the response, like the
62 # just send the length on the first line of the response, like the
63 # ssh proto does for string responses.
63 # ssh proto does for string responses.
64 def generator():
64 def generator():
65 yield '%d\n' % length
65 yield '%d\n' % length
66 for chunk in f:
66 for chunk in f:
67 yield chunk
67 yield chunk
68 return wireproto.streamres(generator())
68 return wireproto.streamres(generator())
69
69
70 def statlfile(repo, proto, sha):
70 def statlfile(repo, proto, sha):
71 '''Return '2\n' if the largefile is missing, '0\n' if it seems to be in
71 '''Return '2\n' if the largefile is missing, '0\n' if it seems to be in
72 good condition.
72 good condition.
73
73
74 The value 1 is reserved for mismatched checksum, but that is too expensive
74 The value 1 is reserved for mismatched checksum, but that is too expensive
75 to be verified on every stat and must be caught be running 'hg verify'
75 to be verified on every stat and must be caught be running 'hg verify'
76 server side.'''
76 server side.'''
77 filename = lfutil.findfile(repo, sha)
77 filename = lfutil.findfile(repo, sha)
78 if not filename:
78 if not filename:
79 return '2\n'
79 return '2\n'
80 return '0\n'
80 return '0\n'
81
81
82 def wirereposetup(ui, repo):
82 def wirereposetup(ui, repo):
83 class lfileswirerepository(repo.__class__):
83 class lfileswirerepository(repo.__class__):
84 def putlfile(self, sha, fd):
84 def putlfile(self, sha, fd):
85 # unfortunately, httprepository._callpush tries to convert its
85 # unfortunately, httprepository._callpush tries to convert its
86 # input file-like into a bundle before sending it, so we can't use
86 # input file-like into a bundle before sending it, so we can't use
87 # it ...
87 # it ...
88 if issubclass(self.__class__, httppeer.httppeer):
88 if issubclass(self.__class__, httppeer.httppeer):
89 res = None
89 res = None
90 try:
90 try:
91 res = self._call('putlfile', data=fd, sha=sha,
91 res = self._call('putlfile', data=fd, sha=sha,
92 headers={'content-type':'application/mercurial-0.1'})
92 headers={'content-type':'application/mercurial-0.1'})
93 d, output = res.split('\n', 1)
93 d, output = res.split('\n', 1)
94 for l in output.splitlines(True):
94 for l in output.splitlines(True):
95 self.ui.warn(_('remote: '), l, '\n')
95 self.ui.warn(_('remote: '), l, '\n')
96 return int(d)
96 return int(d)
97 except (ValueError, urllib2.HTTPError):
97 except (ValueError, urllib2.HTTPError):
98 self.ui.warn(_('unexpected putlfile response: %s') % res)
98 self.ui.warn(_('unexpected putlfile response: %s') % res)
99 return 1
99 return 1
100 # ... but we can't use sshrepository._call because the data=
100 # ... but we can't use sshrepository._call because the data=
101 # argument won't get sent, and _callpush does exactly what we want
101 # argument won't get sent, and _callpush does exactly what we want
102 # in this case: send the data straight through
102 # in this case: send the data straight through
103 else:
103 else:
104 try:
104 try:
105 ret, output = self._callpush("putlfile", fd, sha=sha)
105 ret, output = self._callpush("putlfile", fd, sha=sha)
106 if ret == "":
106 if ret == "":
107 raise error.ResponseError(_('putlfile failed:'),
107 raise error.ResponseError(_('putlfile failed:'),
108 output)
108 output)
109 return int(ret)
109 return int(ret)
110 except IOError:
110 except IOError:
111 return 1
111 return 1
112 except ValueError:
112 except ValueError:
113 raise error.ResponseError(
113 raise error.ResponseError(
114 _('putlfile failed (unexpected response):'), ret)
114 _('putlfile failed (unexpected response):'), ret)
115
115
116 def getlfile(self, sha):
116 def getlfile(self, sha):
117 """returns an iterable with the chunks of the file with sha sha"""
117 """returns an iterable with the chunks of the file with sha sha"""
118 stream = self._callstream("getlfile", sha=sha)
118 stream = self._callstream("getlfile", sha=sha)
119 length = stream.readline()
119 length = stream.readline()
120 try:
120 try:
121 length = int(length)
121 length = int(length)
122 except ValueError:
122 except ValueError:
123 self._abort(error.ResponseError(_("unexpected response:"),
123 self._abort(error.ResponseError(_("unexpected response:"),
124 length))
124 length))
125
125
126 # SSH streams will block if reading more than length
126 # SSH streams will block if reading more than length
127 for chunk in util.filechunkiter(stream, 128 * 1024, length):
127 for chunk in util.filechunkiter(stream, 128 * 1024, length):
128 yield chunk
128 yield chunk
129 # HTTP streams must hit the end to process the last empty
130 # chunk of Chunked-Encoding so the connection can be reused.
131 if issubclass(self.__class__, httppeer.httppeer):
132 chunk = stream.read(1)
133 if chunk:
134 self._abort(error.ResponseError(_("unexpected response:"),
135 chunk))
129
136
130 @batchable
137 @batchable
131 def statlfile(self, sha):
138 def statlfile(self, sha):
132 f = future()
139 f = future()
133 result = {'sha': sha}
140 result = {'sha': sha}
134 yield result, f
141 yield result, f
135 try:
142 try:
136 yield int(f.value)
143 yield int(f.value)
137 except (ValueError, urllib2.HTTPError):
144 except (ValueError, urllib2.HTTPError):
138 # If the server returns anything but an integer followed by a
145 # If the server returns anything but an integer followed by a
139 # newline, newline, it's not speaking our language; if we get
146 # newline, newline, it's not speaking our language; if we get
140 # an HTTP error, we can't be sure the largefile is present;
147 # an HTTP error, we can't be sure the largefile is present;
141 # either way, consider it missing.
148 # either way, consider it missing.
142 yield 2
149 yield 2
143
150
144 repo.__class__ = lfileswirerepository
151 repo.__class__ = lfileswirerepository
145
152
146 # advertise the largefiles=serve capability
153 # advertise the largefiles=serve capability
147 def capabilities(repo, proto):
154 def capabilities(repo, proto):
148 return capabilitiesorig(repo, proto) + ' largefiles=serve'
155 return capabilitiesorig(repo, proto) + ' largefiles=serve'
149
156
150 def heads(repo, proto):
157 def heads(repo, proto):
151 if lfutil.islfilesrepo(repo):
158 if lfutil.islfilesrepo(repo):
152 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
159 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
153 return wireproto.heads(repo, proto)
160 return wireproto.heads(repo, proto)
154
161
155 def sshrepocallstream(self, cmd, **args):
162 def sshrepocallstream(self, cmd, **args):
156 if cmd == 'heads' and self.capable('largefiles'):
163 if cmd == 'heads' and self.capable('largefiles'):
157 cmd = 'lheads'
164 cmd = 'lheads'
158 if cmd == 'batch' and self.capable('largefiles'):
165 if cmd == 'batch' and self.capable('largefiles'):
159 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
166 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
160 return ssholdcallstream(self, cmd, **args)
167 return ssholdcallstream(self, cmd, **args)
161
168
162 def httprepocallstream(self, cmd, **args):
169 def httprepocallstream(self, cmd, **args):
163 if cmd == 'heads' and self.capable('largefiles'):
170 if cmd == 'heads' and self.capable('largefiles'):
164 cmd = 'lheads'
171 cmd = 'lheads'
165 if cmd == 'batch' and self.capable('largefiles'):
172 if cmd == 'batch' and self.capable('largefiles'):
166 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
173 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
167 return httpoldcallstream(self, cmd, **args)
174 return httpoldcallstream(self, cmd, **args)
General Comments 0
You need to be logged in to leave comments. Login now