##// END OF EJS Templates
streamclone: move streamin() into maybeperformstreamclone()...
Gregory Szorc -
r26459:3b28ffde default
parent child Browse files
Show More
@@ -1,287 +1,283 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 Returns a tuple of (supported, requirements). ``supported`` is True if
24 24 streaming clone is supported and False otherwise. ``requirements`` is
25 25 a set of repo requirements from the remote, or ``None`` if stream clone
26 26 isn't supported.
27 27 """
28 28 # Streaming clone only works on empty repositories.
29 29 if len(repo):
30 30 return False, None
31 31
32 32 # Streaming clone only works if all data is being requested.
33 33 if heads:
34 34 return False, None
35 35
36 36 # If we don't have a preference, let the server decide for us. This
37 37 # likely only comes into play in LANs.
38 38 if streamrequested is None:
39 39 # The server can advertise whether to prefer streaming clone.
40 40 streamrequested = remote.capable('stream-preferred')
41 41
42 42 if not streamrequested:
43 43 return False, None
44 44
45 45 # In order for stream clone to work, the client has to support all the
46 46 # requirements advertised by the server.
47 47 #
48 48 # The server advertises its requirements via the "stream" and "streamreqs"
49 49 # capability. "stream" (a value-less capability) is advertised if and only
50 50 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
51 51 # is advertised and contains a comma-delimited list of requirements.
52 52 requirements = set()
53 53 if remote.capable('stream'):
54 54 requirements.add('revlogv1')
55 55 else:
56 56 streamreqs = remote.capable('streamreqs')
57 57 # This is weird and shouldn't happen with modern servers.
58 58 if not streamreqs:
59 59 return False, None
60 60
61 61 streamreqs = set(streamreqs.split(','))
62 62 # Server requires something we don't support. Bail.
63 63 if streamreqs - repo.supportedformats:
64 64 return False, None
65 65 requirements = streamreqs
66 66
67 67 return True, requirements
68 68
69 69 def maybeperformstreamclone(pullop):
70 70 repo = pullop.repo
71 71 remote = pullop.remote
72 72
73 73 r = canperformstreamclone(repo, remote, pullop.heads,
74 74 streamrequested=pullop.streamclonerequested)
75 75 supported, requirements = r
76 76
77 77 if not supported:
78 78 return
79 79
80 streamin(repo, remote, requirements)
80 # Save remote branchmap. We will use it later to speed up branchcache
81 # creation.
82 rbranchmap = None
83 if remote.capable('branchmap'):
84 rbranchmap = remote.branchmap()
85
86 fp = remote.stream_out()
87 l = fp.readline()
88 try:
89 resp = int(l)
90 except ValueError:
91 raise error.ResponseError(
92 _('unexpected response from remote server:'), l)
93 if resp == 1:
94 raise util.Abort(_('operation forbidden by server'))
95 elif resp == 2:
96 raise util.Abort(_('locking the remote repository failed'))
97 elif resp != 0:
98 raise util.Abort(_('the server sent an unknown error code'))
99
100 applyremotedata(repo, requirements, rbranchmap, fp)
81 101
82 102 def allowservergeneration(ui):
83 103 """Whether streaming clones are allowed from the server."""
84 104 return ui.configbool('server', 'uncompressed', True, untrusted=True)
85 105
86 106 # This is it's own function so extensions can override it.
87 107 def _walkstreamfiles(repo):
88 108 return repo.store.walk()
89 109
90 110 def generatev1(repo):
91 111 """Emit content for version 1 of a streaming clone.
92 112
93 113 This is a generator of raw chunks that constitute a streaming clone.
94 114
95 115 The stream begins with a line of 2 space-delimited integers containing the
96 116 number of entries and total bytes size.
97 117
98 118 Next, are N entries for each file being transferred. Each file entry starts
99 119 as a line with the file name and integer size delimited by a null byte.
100 120 The raw file data follows. Following the raw file data is the next file
101 121 entry, or EOF.
102 122
103 123 When used on the wire protocol, an additional line indicating protocol
104 124 success will be prepended to the stream. This function is not responsible
105 125 for adding it.
106 126
107 127 This function will obtain a repository lock to ensure a consistent view of
108 128 the store is captured. It therefore may raise LockError.
109 129 """
110 130 entries = []
111 131 total_bytes = 0
112 132 # Get consistent snapshot of repo, lock during scan.
113 133 lock = repo.lock()
114 134 try:
115 135 repo.ui.debug('scanning\n')
116 136 for name, ename, size in _walkstreamfiles(repo):
117 137 if size:
118 138 entries.append((name, size))
119 139 total_bytes += size
120 140 finally:
121 141 lock.release()
122 142
123 143 repo.ui.debug('%d files, %d bytes to transfer\n' %
124 144 (len(entries), total_bytes))
125 145 yield '%d %d\n' % (len(entries), total_bytes)
126 146
127 147 svfs = repo.svfs
128 148 oldaudit = svfs.mustaudit
129 149 debugflag = repo.ui.debugflag
130 150 svfs.mustaudit = False
131 151
132 152 try:
133 153 for name, size in entries:
134 154 if debugflag:
135 155 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
136 156 # partially encode name over the wire for backwards compat
137 157 yield '%s\0%d\n' % (store.encodedir(name), size)
138 158 if size <= 65536:
139 159 fp = svfs(name)
140 160 try:
141 161 data = fp.read(size)
142 162 finally:
143 163 fp.close()
144 164 yield data
145 165 else:
146 166 for chunk in util.filechunkiter(svfs(name), limit=size):
147 167 yield chunk
148 168 finally:
149 169 svfs.mustaudit = oldaudit
150 170
151 171 def consumev1(repo, fp):
152 172 """Apply the contents from version 1 of a streaming clone file handle.
153 173
154 174 This takes the output from "streamout" and applies it to the specified
155 175 repository.
156 176
157 177 Like "streamout," the status line added by the wire protocol is not handled
158 178 by this function.
159 179 """
160 180 lock = repo.lock()
161 181 try:
162 182 repo.ui.status(_('streaming all changes\n'))
163 183 l = fp.readline()
164 184 try:
165 185 total_files, total_bytes = map(int, l.split(' ', 1))
166 186 except (ValueError, TypeError):
167 187 raise error.ResponseError(
168 188 _('unexpected response from remote server:'), l)
169 189 repo.ui.status(_('%d files to transfer, %s of data\n') %
170 190 (total_files, util.bytecount(total_bytes)))
171 191 handled_bytes = 0
172 192 repo.ui.progress(_('clone'), 0, total=total_bytes)
173 193 start = time.time()
174 194
175 195 tr = repo.transaction(_('clone'))
176 196 try:
177 197 for i in xrange(total_files):
178 198 # XXX doesn't support '\n' or '\r' in filenames
179 199 l = fp.readline()
180 200 try:
181 201 name, size = l.split('\0', 1)
182 202 size = int(size)
183 203 except (ValueError, TypeError):
184 204 raise error.ResponseError(
185 205 _('unexpected response from remote server:'), l)
186 206 if repo.ui.debugflag:
187 207 repo.ui.debug('adding %s (%s)\n' %
188 208 (name, util.bytecount(size)))
189 209 # for backwards compat, name was partially encoded
190 210 ofp = repo.svfs(store.decodedir(name), 'w')
191 211 for chunk in util.filechunkiter(fp, limit=size):
192 212 handled_bytes += len(chunk)
193 213 repo.ui.progress(_('clone'), handled_bytes,
194 214 total=total_bytes)
195 215 ofp.write(chunk)
196 216 ofp.close()
197 217 tr.close()
198 218 finally:
199 219 tr.release()
200 220
201 221 # Writing straight to files circumvented the inmemory caches
202 222 repo.invalidate()
203 223
204 224 elapsed = time.time() - start
205 225 if elapsed <= 0:
206 226 elapsed = 0.001
207 227 repo.ui.progress(_('clone'), None)
208 228 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
209 229 (util.bytecount(total_bytes), elapsed,
210 230 util.bytecount(total_bytes / elapsed)))
211 231 finally:
212 232 lock.release()
213 233
214 def streamin(repo, remote, remotereqs):
215 # Save remote branchmap. We will use it later
216 # to speed up branchcache creation
217 rbranchmap = None
218 if remote.capable("branchmap"):
219 rbranchmap = remote.branchmap()
220
221 fp = remote.stream_out()
222 l = fp.readline()
223 try:
224 resp = int(l)
225 except ValueError:
226 raise error.ResponseError(
227 _('unexpected response from remote server:'), l)
228 if resp == 1:
229 raise util.Abort(_('operation forbidden by server'))
230 elif resp == 2:
231 raise util.Abort(_('locking the remote repository failed'))
232 elif resp != 0:
233 raise util.Abort(_('the server sent an unknown error code'))
234
235 applyremotedata(repo, remotereqs, rbranchmap, fp)
236 return len(repo.heads()) + 1
237
238 234 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
239 235 """Apply stream clone data to a repository.
240 236
241 237 "remotereqs" is a set of requirements to handle the incoming data.
242 238 "remotebranchmap" is the result of a branchmap lookup on the remote. It
243 239 can be None.
244 240 "fp" is a file object containing the raw stream data, suitable for
245 241 feeding into consumev1().
246 242 """
247 243 lock = repo.lock()
248 244 try:
249 245 consumev1(repo, fp)
250 246
251 247 # new requirements = old non-format requirements +
252 248 # new format-related remote requirements
253 249 # requirements from the streamed-in repository
254 250 repo.requirements = remotereqs | (
255 251 repo.requirements - repo.supportedformats)
256 252 repo._applyopenerreqs()
257 253 repo._writerequirements()
258 254
259 255 if remotebranchmap:
260 256 rbheads = []
261 257 closed = []
262 258 for bheads in remotebranchmap.itervalues():
263 259 rbheads.extend(bheads)
264 260 for h in bheads:
265 261 r = repo.changelog.rev(h)
266 262 b, c = repo.changelog.branchinfo(r)
267 263 if c:
268 264 closed.append(h)
269 265
270 266 if rbheads:
271 267 rtiprev = max((int(repo.changelog.rev(node))
272 268 for node in rbheads))
273 269 cache = branchmap.branchcache(remotebranchmap,
274 270 repo[rtiprev].node(),
275 271 rtiprev,
276 272 closednodes=closed)
277 273 # Try to stick it as low as possible
278 274 # filter above served are unlikely to be fetch from a clone
279 275 for candidate in ('base', 'immutable', 'served'):
280 276 rview = repo.filtered(candidate)
281 277 if cache.validfor(rview):
282 278 repo._branchcaches[candidate] = cache
283 279 cache.write(rview)
284 280 break
285 281 repo.invalidate()
286 282 finally:
287 283 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now