##// END OF EJS Templates
streamclone: move "streaming all changes" message location...
Gregory Szorc -
r26470:4b5647d9 default
parent child Browse files
Show More
@@ -1,291 +1,292 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 20 def canperformstreamclone(pullop, bailifbundle2supported=False):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 ``bailifbundle2supported`` will cause the function to return False if
24 24 bundle2 stream clones are supported. It should only be called by the
25 25 legacy stream clone code path.
26 26
27 27 Returns a tuple of (supported, requirements). ``supported`` is True if
28 28 streaming clone is supported and False otherwise. ``requirements`` is
29 29 a set of repo requirements from the remote, or ``None`` if stream clone
30 30 isn't supported.
31 31 """
32 32 repo = pullop.repo
33 33 remote = pullop.remote
34 34
35 35 bundle2supported = False
36 36 if pullop.canusebundle2:
37 37 if 'v1' in pullop.remotebundle2caps.get('stream', []):
38 38 bundle2supported = True
39 39 # else
40 40 # Server doesn't support bundle2 stream clone or doesn't support
41 41 # the versions we support. Fall back and possibly allow legacy.
42 42
43 43 # Ensures legacy code path uses available bundle2.
44 44 if bailifbundle2supported and bundle2supported:
45 45 return False, None
46 46 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 47 #elif not bailifbundle2supported and not bundle2supported:
48 48 # return False, None
49 49
50 50 # Streaming clone only works on empty repositories.
51 51 if len(repo):
52 52 return False, None
53 53
54 54 # Streaming clone only works if all data is being requested.
55 55 if pullop.heads:
56 56 return False, None
57 57
58 58 streamrequested = pullop.streamclonerequested
59 59
60 60 # If we don't have a preference, let the server decide for us. This
61 61 # likely only comes into play in LANs.
62 62 if streamrequested is None:
63 63 # The server can advertise whether to prefer streaming clone.
64 64 streamrequested = remote.capable('stream-preferred')
65 65
66 66 if not streamrequested:
67 67 return False, None
68 68
69 69 # In order for stream clone to work, the client has to support all the
70 70 # requirements advertised by the server.
71 71 #
72 72 # The server advertises its requirements via the "stream" and "streamreqs"
73 73 # capability. "stream" (a value-less capability) is advertised if and only
74 74 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 75 # is advertised and contains a comma-delimited list of requirements.
76 76 requirements = set()
77 77 if remote.capable('stream'):
78 78 requirements.add('revlogv1')
79 79 else:
80 80 streamreqs = remote.capable('streamreqs')
81 81 # This is weird and shouldn't happen with modern servers.
82 82 if not streamreqs:
83 83 return False, None
84 84
85 85 streamreqs = set(streamreqs.split(','))
86 86 # Server requires something we don't support. Bail.
87 87 if streamreqs - repo.supportedformats:
88 88 return False, None
89 89 requirements = streamreqs
90 90
91 91 return True, requirements
92 92
93 93 def maybeperformlegacystreamclone(pullop):
94 94 """Possibly perform a legacy stream clone operation.
95 95
96 96 Legacy stream clones are performed as part of pull but before all other
97 97 operations.
98 98
99 99 A legacy stream clone will not be performed if a bundle2 stream clone is
100 100 supported.
101 101 """
102 102 supported, requirements = canperformstreamclone(pullop)
103 103
104 104 if not supported:
105 105 return
106 106
107 107 repo = pullop.repo
108 108 remote = pullop.remote
109 109
110 110 # Save remote branchmap. We will use it later to speed up branchcache
111 111 # creation.
112 112 rbranchmap = None
113 113 if remote.capable('branchmap'):
114 114 rbranchmap = remote.branchmap()
115 115
116 repo.ui.status(_('streaming all changes\n'))
117
116 118 fp = remote.stream_out()
117 119 l = fp.readline()
118 120 try:
119 121 resp = int(l)
120 122 except ValueError:
121 123 raise error.ResponseError(
122 124 _('unexpected response from remote server:'), l)
123 125 if resp == 1:
124 126 raise util.Abort(_('operation forbidden by server'))
125 127 elif resp == 2:
126 128 raise util.Abort(_('locking the remote repository failed'))
127 129 elif resp != 0:
128 130 raise util.Abort(_('the server sent an unknown error code'))
129 131
130 132 l = fp.readline()
131 133 try:
132 134 filecount, bytecount = map(int, l.split(' ', 1))
133 135 except (ValueError, TypeError):
134 136 raise error.ResponseError(
135 137 _('unexpected response from remote server:'), l)
136 138
137 139 lock = repo.lock()
138 140 try:
139 141 consumev1(repo, fp, filecount, bytecount)
140 142
141 143 # new requirements = old non-format requirements +
142 144 # new format-related remote requirements
143 145 # requirements from the streamed-in repository
144 146 repo.requirements = requirements | (
145 147 repo.requirements - repo.supportedformats)
146 148 repo._applyopenerreqs()
147 149 repo._writerequirements()
148 150
149 151 if rbranchmap:
150 152 branchmap.replacecache(repo, rbranchmap)
151 153
152 154 repo.invalidate()
153 155 finally:
154 156 lock.release()
155 157
156 158 def allowservergeneration(ui):
157 159 """Whether streaming clones are allowed from the server."""
158 160 return ui.configbool('server', 'uncompressed', True, untrusted=True)
159 161
160 162 # This is it's own function so extensions can override it.
161 163 def _walkstreamfiles(repo):
162 164 return repo.store.walk()
163 165
164 166 def generatev1(repo):
165 167 """Emit content for version 1 of a streaming clone.
166 168
167 169 This returns a 3-tuple of (file count, byte size, data iterator).
168 170
169 171 The data iterator consists of N entries for each file being transferred.
170 172 Each file entry starts as a line with the file name and integer size
171 173 delimited by a null byte.
172 174
173 175 The raw file data follows. Following the raw file data is the next file
174 176 entry, or EOF.
175 177
176 178 When used on the wire protocol, an additional line indicating protocol
177 179 success will be prepended to the stream. This function is not responsible
178 180 for adding it.
179 181
180 182 This function will obtain a repository lock to ensure a consistent view of
181 183 the store is captured. It therefore may raise LockError.
182 184 """
183 185 entries = []
184 186 total_bytes = 0
185 187 # Get consistent snapshot of repo, lock during scan.
186 188 lock = repo.lock()
187 189 try:
188 190 repo.ui.debug('scanning\n')
189 191 for name, ename, size in _walkstreamfiles(repo):
190 192 if size:
191 193 entries.append((name, size))
192 194 total_bytes += size
193 195 finally:
194 196 lock.release()
195 197
196 198 repo.ui.debug('%d files, %d bytes to transfer\n' %
197 199 (len(entries), total_bytes))
198 200
199 201 svfs = repo.svfs
200 202 oldaudit = svfs.mustaudit
201 203 debugflag = repo.ui.debugflag
202 204 svfs.mustaudit = False
203 205
204 206 def emitrevlogdata():
205 207 try:
206 208 for name, size in entries:
207 209 if debugflag:
208 210 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
209 211 # partially encode name over the wire for backwards compat
210 212 yield '%s\0%d\n' % (store.encodedir(name), size)
211 213 if size <= 65536:
212 214 fp = svfs(name)
213 215 try:
214 216 data = fp.read(size)
215 217 finally:
216 218 fp.close()
217 219 yield data
218 220 else:
219 221 for chunk in util.filechunkiter(svfs(name), limit=size):
220 222 yield chunk
221 223 finally:
222 224 svfs.mustaudit = oldaudit
223 225
224 226 return len(entries), total_bytes, emitrevlogdata()
225 227
226 228 def generatev1wireproto(repo):
227 229 """Emit content for version 1 of streaming clone suitable for the wire.
228 230
229 231 This is the data output from ``generatev1()`` with a header line
230 232 indicating file count and byte size.
231 233 """
232 234 filecount, bytecount, it = generatev1(repo)
233 235 yield '%d %d\n' % (filecount, bytecount)
234 236 for chunk in it:
235 237 yield chunk
236 238
237 239 def consumev1(repo, fp, filecount, bytecount):
238 240 """Apply the contents from version 1 of a streaming clone file handle.
239 241
240 242 This takes the output from "streamout" and applies it to the specified
241 243 repository.
242 244
243 245 Like "streamout," the status line added by the wire protocol is not handled
244 246 by this function.
245 247 """
246 248 lock = repo.lock()
247 249 try:
248 repo.ui.status(_('streaming all changes\n'))
249 250 repo.ui.status(_('%d files to transfer, %s of data\n') %
250 251 (filecount, util.bytecount(bytecount)))
251 252 handled_bytes = 0
252 253 repo.ui.progress(_('clone'), 0, total=bytecount)
253 254 start = time.time()
254 255
255 256 tr = repo.transaction(_('clone'))
256 257 try:
257 258 for i in xrange(filecount):
258 259 # XXX doesn't support '\n' or '\r' in filenames
259 260 l = fp.readline()
260 261 try:
261 262 name, size = l.split('\0', 1)
262 263 size = int(size)
263 264 except (ValueError, TypeError):
264 265 raise error.ResponseError(
265 266 _('unexpected response from remote server:'), l)
266 267 if repo.ui.debugflag:
267 268 repo.ui.debug('adding %s (%s)\n' %
268 269 (name, util.bytecount(size)))
269 270 # for backwards compat, name was partially encoded
270 271 ofp = repo.svfs(store.decodedir(name), 'w')
271 272 for chunk in util.filechunkiter(fp, limit=size):
272 273 handled_bytes += len(chunk)
273 274 repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
274 275 ofp.write(chunk)
275 276 ofp.close()
276 277 tr.close()
277 278 finally:
278 279 tr.release()
279 280
280 281 # Writing straight to files circumvented the inmemory caches
281 282 repo.invalidate()
282 283
283 284 elapsed = time.time() - start
284 285 if elapsed <= 0:
285 286 elapsed = 0.001
286 287 repo.ui.progress(_('clone'), None)
287 288 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
288 289 (util.bytecount(bytecount), elapsed,
289 290 util.bytecount(bytecount / elapsed)))
290 291 finally:
291 292 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now