##// END OF EJS Templates
streamclone: move payload header line consumption...
Gregory Szorc -
r26468:19bbd53a default
parent child Browse files
Show More
@@ -1,279 +1,279 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 20 def canperformstreamclone(pullop, bailifbundle2supported=False):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 ``bailifbundle2supported`` will cause the function to return False if
24 24 bundle2 stream clones are supported. It should only be called by the
25 25 legacy stream clone code path.
26 26
27 27 Returns a tuple of (supported, requirements). ``supported`` is True if
28 28 streaming clone is supported and False otherwise. ``requirements`` is
29 29 a set of repo requirements from the remote, or ``None`` if stream clone
30 30 isn't supported.
31 31 """
32 32 repo = pullop.repo
33 33 remote = pullop.remote
34 34
35 35 bundle2supported = False
36 36 if pullop.canusebundle2:
37 37 if 'v1' in pullop.remotebundle2caps.get('stream', []):
38 38 bundle2supported = True
39 39 # else
40 40 # Server doesn't support bundle2 stream clone or doesn't support
41 41 # the versions we support. Fall back and possibly allow legacy.
42 42
43 43 # Ensures legacy code path uses available bundle2.
44 44 if bailifbundle2supported and bundle2supported:
45 45 return False, None
46 46 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 47 #elif not bailifbundle2supported and not bundle2supported:
48 48 # return False, None
49 49
50 50 # Streaming clone only works on empty repositories.
51 51 if len(repo):
52 52 return False, None
53 53
54 54 # Streaming clone only works if all data is being requested.
55 55 if pullop.heads:
56 56 return False, None
57 57
58 58 streamrequested = pullop.streamclonerequested
59 59
60 60 # If we don't have a preference, let the server decide for us. This
61 61 # likely only comes into play in LANs.
62 62 if streamrequested is None:
63 63 # The server can advertise whether to prefer streaming clone.
64 64 streamrequested = remote.capable('stream-preferred')
65 65
66 66 if not streamrequested:
67 67 return False, None
68 68
69 69 # In order for stream clone to work, the client has to support all the
70 70 # requirements advertised by the server.
71 71 #
72 72 # The server advertises its requirements via the "stream" and "streamreqs"
73 73 # capability. "stream" (a value-less capability) is advertised if and only
74 74 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 75 # is advertised and contains a comma-delimited list of requirements.
76 76 requirements = set()
77 77 if remote.capable('stream'):
78 78 requirements.add('revlogv1')
79 79 else:
80 80 streamreqs = remote.capable('streamreqs')
81 81 # This is weird and shouldn't happen with modern servers.
82 82 if not streamreqs:
83 83 return False, None
84 84
85 85 streamreqs = set(streamreqs.split(','))
86 86 # Server requires something we don't support. Bail.
87 87 if streamreqs - repo.supportedformats:
88 88 return False, None
89 89 requirements = streamreqs
90 90
91 91 return True, requirements
92 92
93 93 def maybeperformlegacystreamclone(pullop):
94 94 """Possibly perform a legacy stream clone operation.
95 95
96 96 Legacy stream clones are performed as part of pull but before all other
97 97 operations.
98 98
99 99 A legacy stream clone will not be performed if a bundle2 stream clone is
100 100 supported.
101 101 """
102 102 supported, requirements = canperformstreamclone(pullop)
103 103
104 104 if not supported:
105 105 return
106 106
107 107 repo = pullop.repo
108 108 remote = pullop.remote
109 109
110 110 # Save remote branchmap. We will use it later to speed up branchcache
111 111 # creation.
112 112 rbranchmap = None
113 113 if remote.capable('branchmap'):
114 114 rbranchmap = remote.branchmap()
115 115
116 116 fp = remote.stream_out()
117 117 l = fp.readline()
118 118 try:
119 119 resp = int(l)
120 120 except ValueError:
121 121 raise error.ResponseError(
122 122 _('unexpected response from remote server:'), l)
123 123 if resp == 1:
124 124 raise util.Abort(_('operation forbidden by server'))
125 125 elif resp == 2:
126 126 raise util.Abort(_('locking the remote repository failed'))
127 127 elif resp != 0:
128 128 raise util.Abort(_('the server sent an unknown error code'))
129 129
130 l = fp.readline()
131 try:
132 filecount, bytecount = map(int, l.split(' ', 1))
133 except (ValueError, TypeError):
134 raise error.ResponseError(
135 _('unexpected response from remote server:'), l)
136
130 137 lock = repo.lock()
131 138 try:
132 consumev1(repo, fp)
139 consumev1(repo, fp, filecount, bytecount)
133 140
134 141 # new requirements = old non-format requirements +
135 142 # new format-related remote requirements
136 143 # requirements from the streamed-in repository
137 144 repo.requirements = requirements | (
138 145 repo.requirements - repo.supportedformats)
139 146 repo._applyopenerreqs()
140 147 repo._writerequirements()
141 148
142 149 if rbranchmap:
143 150 branchmap.replacecache(repo, rbranchmap)
144 151
145 152 repo.invalidate()
146 153 finally:
147 154 lock.release()
148 155
149 156 def allowservergeneration(ui):
150 157 """Whether streaming clones are allowed from the server."""
151 158 return ui.configbool('server', 'uncompressed', True, untrusted=True)
152 159
153 160 # This is it's own function so extensions can override it.
154 161 def _walkstreamfiles(repo):
155 162 return repo.store.walk()
156 163
157 164 def generatev1(repo):
158 165 """Emit content for version 1 of a streaming clone.
159 166
160 167 This is a generator of raw chunks that constitute a streaming clone.
161 168
162 169 The stream begins with a line of 2 space-delimited integers containing the
163 170 number of entries and total bytes size.
164 171
165 172 Next, are N entries for each file being transferred. Each file entry starts
166 173 as a line with the file name and integer size delimited by a null byte.
167 174 The raw file data follows. Following the raw file data is the next file
168 175 entry, or EOF.
169 176
170 177 When used on the wire protocol, an additional line indicating protocol
171 178 success will be prepended to the stream. This function is not responsible
172 179 for adding it.
173 180
174 181 This function will obtain a repository lock to ensure a consistent view of
175 182 the store is captured. It therefore may raise LockError.
176 183 """
177 184 entries = []
178 185 total_bytes = 0
179 186 # Get consistent snapshot of repo, lock during scan.
180 187 lock = repo.lock()
181 188 try:
182 189 repo.ui.debug('scanning\n')
183 190 for name, ename, size in _walkstreamfiles(repo):
184 191 if size:
185 192 entries.append((name, size))
186 193 total_bytes += size
187 194 finally:
188 195 lock.release()
189 196
190 197 repo.ui.debug('%d files, %d bytes to transfer\n' %
191 198 (len(entries), total_bytes))
192 199 yield '%d %d\n' % (len(entries), total_bytes)
193 200
194 201 svfs = repo.svfs
195 202 oldaudit = svfs.mustaudit
196 203 debugflag = repo.ui.debugflag
197 204 svfs.mustaudit = False
198 205
199 206 try:
200 207 for name, size in entries:
201 208 if debugflag:
202 209 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
203 210 # partially encode name over the wire for backwards compat
204 211 yield '%s\0%d\n' % (store.encodedir(name), size)
205 212 if size <= 65536:
206 213 fp = svfs(name)
207 214 try:
208 215 data = fp.read(size)
209 216 finally:
210 217 fp.close()
211 218 yield data
212 219 else:
213 220 for chunk in util.filechunkiter(svfs(name), limit=size):
214 221 yield chunk
215 222 finally:
216 223 svfs.mustaudit = oldaudit
217 224
218 def consumev1(repo, fp):
225 def consumev1(repo, fp, filecount, bytecount):
219 226 """Apply the contents from version 1 of a streaming clone file handle.
220 227
221 228 This takes the output from "streamout" and applies it to the specified
222 229 repository.
223 230
224 231 Like "streamout," the status line added by the wire protocol is not handled
225 232 by this function.
226 233 """
227 234 lock = repo.lock()
228 235 try:
229 236 repo.ui.status(_('streaming all changes\n'))
230 l = fp.readline()
231 try:
232 total_files, total_bytes = map(int, l.split(' ', 1))
233 except (ValueError, TypeError):
234 raise error.ResponseError(
235 _('unexpected response from remote server:'), l)
236 237 repo.ui.status(_('%d files to transfer, %s of data\n') %
237 (total_files, util.bytecount(total_bytes)))
238 (filecount, util.bytecount(bytecount)))
238 239 handled_bytes = 0
239 repo.ui.progress(_('clone'), 0, total=total_bytes)
240 repo.ui.progress(_('clone'), 0, total=bytecount)
240 241 start = time.time()
241 242
242 243 tr = repo.transaction(_('clone'))
243 244 try:
244 for i in xrange(total_files):
245 for i in xrange(filecount):
245 246 # XXX doesn't support '\n' or '\r' in filenames
246 247 l = fp.readline()
247 248 try:
248 249 name, size = l.split('\0', 1)
249 250 size = int(size)
250 251 except (ValueError, TypeError):
251 252 raise error.ResponseError(
252 253 _('unexpected response from remote server:'), l)
253 254 if repo.ui.debugflag:
254 255 repo.ui.debug('adding %s (%s)\n' %
255 256 (name, util.bytecount(size)))
256 257 # for backwards compat, name was partially encoded
257 258 ofp = repo.svfs(store.decodedir(name), 'w')
258 259 for chunk in util.filechunkiter(fp, limit=size):
259 260 handled_bytes += len(chunk)
260 repo.ui.progress(_('clone'), handled_bytes,
261 total=total_bytes)
261 repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
262 262 ofp.write(chunk)
263 263 ofp.close()
264 264 tr.close()
265 265 finally:
266 266 tr.release()
267 267
268 268 # Writing straight to files circumvented the inmemory caches
269 269 repo.invalidate()
270 270
271 271 elapsed = time.time() - start
272 272 if elapsed <= 0:
273 273 elapsed = 0.001
274 274 repo.ui.progress(_('clone'), None)
275 275 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
276 (util.bytecount(total_bytes), elapsed,
277 util.bytecount(total_bytes / elapsed)))
276 (util.bytecount(bytecount), elapsed,
277 util.bytecount(bytecount / elapsed)))
278 278 finally:
279 279 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now