##// END OF EJS Templates
largefiles: getlfile must hit end of HTTP chunked streams to reuse connections...
Mads Kiilerich -
r19006:0b3b8422 default
parent child Browse files
Show More
@@ -1,167 +1,174
1 1 # Copyright 2011 Fog Creek Software
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 import os
7 7 import urllib2
8 8
9 9 from mercurial import error, httppeer, util, wireproto
10 10 from mercurial.wireproto import batchable, future
11 11 from mercurial.i18n import _
12 12
13 13 import lfutil
14 14
15 15 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
16 16 '\n\nPlease enable it in your Mercurial config '
17 17 'file.\n')
18 18
19 19 # these will all be replaced by largefiles.uisetup
20 20 capabilitiesorig = None
21 21 ssholdcallstream = None
22 22 httpoldcallstream = None
23 23
24 24 def putlfile(repo, proto, sha):
25 25 '''Put a largefile into a repository's local store and into the
26 26 user cache.'''
27 27 proto.redirect()
28 28
29 29 path = lfutil.storepath(repo, sha)
30 30 util.makedirs(os.path.dirname(path))
31 31 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
32 32
33 33 try:
34 34 try:
35 35 proto.getfile(tmpfp)
36 36 tmpfp._fp.seek(0)
37 37 if sha != lfutil.hexsha1(tmpfp._fp):
38 38 raise IOError(0, _('largefile contents do not match hash'))
39 39 tmpfp.close()
40 40 lfutil.linktousercache(repo, sha)
41 41 except IOError, e:
42 42 repo.ui.warn(_('largefiles: failed to put %s into store: %s') %
43 43 (sha, e.strerror))
44 44 return wireproto.pushres(1)
45 45 finally:
46 46 tmpfp.discard()
47 47
48 48 return wireproto.pushres(0)
49 49
50 50 def getlfile(repo, proto, sha):
51 51 '''Retrieve a largefile from the repository-local cache or system
52 52 cache.'''
53 53 filename = lfutil.findfile(repo, sha)
54 54 if not filename:
55 55 raise util.Abort(_('requested largefile %s not present in cache') % sha)
56 56 f = open(filename, 'rb')
57 57 length = os.fstat(f.fileno())[6]
58 58
59 59 # Since we can't set an HTTP content-length header here, and
60 60 # Mercurial core provides no way to give the length of a streamres
61 61 # (and reading the entire file into RAM would be ill-advised), we
62 62 # just send the length on the first line of the response, like the
63 63 # ssh proto does for string responses.
64 64 def generator():
65 65 yield '%d\n' % length
66 66 for chunk in f:
67 67 yield chunk
68 68 return wireproto.streamres(generator())
69 69
70 70 def statlfile(repo, proto, sha):
71 71 '''Return '2\n' if the largefile is missing, '0\n' if it seems to be in
72 72 good condition.
73 73
74 74 The value 1 is reserved for mismatched checksum, but that is too expensive
75 75 to be verified on every stat and must be caught be running 'hg verify'
76 76 server side.'''
77 77 filename = lfutil.findfile(repo, sha)
78 78 if not filename:
79 79 return '2\n'
80 80 return '0\n'
81 81
82 82 def wirereposetup(ui, repo):
83 83 class lfileswirerepository(repo.__class__):
84 84 def putlfile(self, sha, fd):
85 85 # unfortunately, httprepository._callpush tries to convert its
86 86 # input file-like into a bundle before sending it, so we can't use
87 87 # it ...
88 88 if issubclass(self.__class__, httppeer.httppeer):
89 89 res = None
90 90 try:
91 91 res = self._call('putlfile', data=fd, sha=sha,
92 92 headers={'content-type':'application/mercurial-0.1'})
93 93 d, output = res.split('\n', 1)
94 94 for l in output.splitlines(True):
95 95 self.ui.warn(_('remote: '), l, '\n')
96 96 return int(d)
97 97 except (ValueError, urllib2.HTTPError):
98 98 self.ui.warn(_('unexpected putlfile response: %s') % res)
99 99 return 1
100 100 # ... but we can't use sshrepository._call because the data=
101 101 # argument won't get sent, and _callpush does exactly what we want
102 102 # in this case: send the data straight through
103 103 else:
104 104 try:
105 105 ret, output = self._callpush("putlfile", fd, sha=sha)
106 106 if ret == "":
107 107 raise error.ResponseError(_('putlfile failed:'),
108 108 output)
109 109 return int(ret)
110 110 except IOError:
111 111 return 1
112 112 except ValueError:
113 113 raise error.ResponseError(
114 114 _('putlfile failed (unexpected response):'), ret)
115 115
116 116 def getlfile(self, sha):
117 117 """returns an iterable with the chunks of the file with sha sha"""
118 118 stream = self._callstream("getlfile", sha=sha)
119 119 length = stream.readline()
120 120 try:
121 121 length = int(length)
122 122 except ValueError:
123 123 self._abort(error.ResponseError(_("unexpected response:"),
124 124 length))
125 125
126 126 # SSH streams will block if reading more than length
127 127 for chunk in util.filechunkiter(stream, 128 * 1024, length):
128 128 yield chunk
129 # HTTP streams must hit the end to process the last empty
130 # chunk of Chunked-Encoding so the connection can be reused.
131 if issubclass(self.__class__, httppeer.httppeer):
132 chunk = stream.read(1)
133 if chunk:
134 self._abort(error.ResponseError(_("unexpected response:"),
135 chunk))
129 136
130 137 @batchable
131 138 def statlfile(self, sha):
132 139 f = future()
133 140 result = {'sha': sha}
134 141 yield result, f
135 142 try:
136 143 yield int(f.value)
137 144 except (ValueError, urllib2.HTTPError):
138 145 # If the server returns anything but an integer followed by a
139 146 # newline, newline, it's not speaking our language; if we get
140 147 # an HTTP error, we can't be sure the largefile is present;
141 148 # either way, consider it missing.
142 149 yield 2
143 150
144 151 repo.__class__ = lfileswirerepository
145 152
146 153 # advertise the largefiles=serve capability
147 154 def capabilities(repo, proto):
148 155 return capabilitiesorig(repo, proto) + ' largefiles=serve'
149 156
150 157 def heads(repo, proto):
151 158 if lfutil.islfilesrepo(repo):
152 159 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
153 160 return wireproto.heads(repo, proto)
154 161
155 162 def sshrepocallstream(self, cmd, **args):
156 163 if cmd == 'heads' and self.capable('largefiles'):
157 164 cmd = 'lheads'
158 165 if cmd == 'batch' and self.capable('largefiles'):
159 166 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
160 167 return ssholdcallstream(self, cmd, **args)
161 168
162 169 def httprepocallstream(self, cmd, **args):
163 170 if cmd == 'heads' and self.capable('largefiles'):
164 171 cmd = 'lheads'
165 172 if cmd == 'batch' and self.capable('largefiles'):
166 173 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
167 174 return httpoldcallstream(self, cmd, **args)
General Comments 0
You need to be logged in to leave comments. Login now