##// END OF EJS Templates
streamclone: teach canperformstreamclone to be bundle2 aware...
Gregory Szorc -
r26467:ff2c8923 default
parent child Browse files
Show More
@@ -1,260 +1,279 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Decide whether a streaming clone can be performed as part of a pull.

    ``bailifbundle2supported`` makes the function answer "not supported"
    when the remote advertises a bundle2 stream clone version we understand
    (currently ``v1``). It is meant for the legacy stream clone code path.

    Returns a ``(supported, requirements)`` tuple. ``supported`` is True when
    a streaming clone is possible and False otherwise. ``requirements`` is
    the set of repo requirements from the remote, or ``None`` when stream
    clone isn't supported.
    """
    unsupported = (False, None)

    repo = pullop.repo
    remote = pullop.remote

    # bundle2 stream clone is usable only if bundle2 itself is usable and
    # the server advertises a stream version we know about. Otherwise we
    # fall back and possibly allow the legacy mechanism.
    bundle2supported = bool(
        pullop.canusebundle2
        and 'v1' in pullop.remotebundle2caps.get('stream', []))

    # Ensures the legacy code path defers to an available bundle2.
    if bundle2supported and bailifbundle2supported:
        return unsupported
    # The mirror-image check (refusing when the caller is not the legacy
    # path and bundle2 stream clone is unavailable) is left disabled.

    # Streaming clone only works on empty repositories, and only when all
    # data is being requested (no explicit heads).
    if len(repo) or pullop.heads:
        return unsupported

    wantstream = pullop.streamclonerequested
    if wantstream is None:
        # No local preference: let the server decide. It can advertise that
        # it prefers streaming clones, which likely only matters on LANs.
        wantstream = remote.capable('stream-preferred')
    if not wantstream:
        return unsupported

    # The client has to support every requirement the server advertises.
    #
    # "stream" (a value-less capability) is advertised if and only if the
    # sole requirement is "revlogv1". Otherwise "streamreqs" is advertised
    # and holds a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(['revlogv1'])

    advertised = remote.capable('streamreqs')
    if not advertised:
        # This is weird and shouldn't happen with modern servers.
        return unsupported

    requirements = set(advertised.split(','))
    if requirements - repo.supportedformats:
        # Server requires something we don't support. Bail.
        return unsupported

    return True, requirements
73 92
def maybeperformlegacystreamclone(pullop):
    """Run a legacy stream clone for ``pullop`` when one is possible.

    Legacy stream clones happen as part of pull, ahead of every other pull
    step. The function returns without doing anything when
    ``canperformstreamclone`` reports that streaming is not supported.

    The intent is that a legacy stream clone is skipped when a bundle2
    stream clone is supported.
    """
    # NOTE(review): canperformstreamclone() is called with its default
    # bailifbundle2supported=False, so the bundle2 bail-out is not active
    # on this path -- confirm whether that is intentional.
    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Grab the remote branchmap up front; it is used further down to speed
    # up branchcache creation.
    rbranchmap = remote.branchmap() if remote.capable('branchmap') else None

    fp = remote.stream_out()
    statusline = fp.readline()
    try:
        status = int(statusline)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), statusline)

    # The first line is a status code: 0 means success, anything else is
    # an error reported by the server.
    if status != 0:
        if status == 1:
            raise util.Abort(_('operation forbidden by server'))
        if status == 2:
            raise util.Abort(_('locking the remote repository failed'))
        raise util.Abort(_('the server sent an unknown error code'))

    lock = repo.lock()
    try:
        consumev1(repo, fp)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # (the latter describe the repository that was just streamed in)
        nonformat = repo.requirements - repo.supportedformats
        repo.requirements = requirements | nonformat
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
    finally:
        lock.release()
129 148
def allowservergeneration(ui):
    """Report whether the server may generate streaming clones.

    Looks up the boolean ``server.uncompressed`` option through
    ``ui.configbool``, using ``True`` as the default and passing
    ``untrusted=True``.
    """
    allowed = ui.configbool('server', 'uncompressed', True, untrusted=True)
    return allowed
133 152
134 153 # This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    """Return the result of walking ``repo``'s store.

    Kept as a separate function so extensions can override it.
    """
    repostore = repo.store
    return repostore.walk()
137 156
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This is a generator of raw chunks that constitute a streaming clone.

    The stream begins with a line of 2 space-delimited integers containing
    the number of entries and total bytes size.

    Next are N entries, one for each file being transferred. Each file entry
    starts as a line with the file name and integer size delimited by a null
    byte. The raw file data follows. Following the raw file data is the next
    file entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not
    responsible for adding it.

    This function obtains a repository lock while scanning the store so a
    consistent view is captured. It therefore may raise LockError. Because
    this is a generator, the lock is taken when iteration starts, not when
    the function is called. The lock is released before any file data is
    read.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files are not transferred at all.
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))
    yield '%d %d\n' % (len(entries), total_bytes)

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Auditing is switched off while streaming and restored afterwards,
    # even if the consumer abandons the generator early.
    svfs.mustaudit = False

    try:
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size <= 65536:
                # Small file: read it in one go and close before yielding.
                fp = svfs(name)
                try:
                    data = fp.read(size)
                finally:
                    fp.close()
                yield data
            else:
                # Large file: stream it in chunks. Close the handle
                # explicitly rather than relying on garbage collection,
                # including when the generator is closed mid-file.
                fp = svfs(name)
                try:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk
                finally:
                    fp.close()
    finally:
        svfs.mustaudit = oldaudit
198 217
def consumev1(repo, fp):
    """Apply the contents from version 1 of a streaming clone file handle.

    This reads the stream format described by ``generatev1`` from ``fp`` and
    writes each transferred file into the store of ``repo``.

    As with ``generatev1``, the status line added by the wire protocol is
    not handled by this function; ``fp`` must be positioned at the
    "<files> <bytes>" header line.

    Raises ``error.ResponseError`` if the header line or a file entry line
    cannot be parsed.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        l = fp.readline()
        try:
            total_files, total_bytes = map(int, l.split(' ', 1))
        except (ValueError, TypeError):
            raise error.ResponseError(
                _('unexpected response from remote server:'), l)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                try:
                    for chunk in util.filechunkiter(fp, limit=size):
                        handled_bytes += len(chunk)
                        repo.ui.progress(_('clone'), handled_bytes,
                                         total=total_bytes)
                        ofp.write(chunk)
                finally:
                    # Always close the output file, including when reading
                    # from the stream or writing fails part-way through.
                    ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Guard against a zero or negative interval before dividing.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()
General Comments 0
You need to be logged in to leave comments. Login now