##// END OF EJS Templates
streamclone: use read()...
Gregory Szorc -
r27632:9fea6b38 default
parent child Browse files
Show More
@@ -1,392 +1,387 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11 import time
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 branchmap,
16 16 error,
17 17 store,
18 18 util,
19 19 )
20 20
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the remote offers a bundle2-based stream clone with
    # a version we understand. If not, we may still fall back to the legacy
    # protocol below.
    bundle2supported = (pullop.canusebundle2 and
                       'v1' in pullop.remotebundle2caps.get('stream', []))

    # The legacy code path refuses to run when bundle2 can do the job.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works when fetching everything into an empty
    # repository.
    if len(repo) or pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested
    if streamrequested is None:
        # No client preference: let the server decide for us. This likely
        # only comes into play in LANs.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(('revlogv1',))

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    if streamreqs - repo.supportedformats:
        return False, None

    return True, streamreqs
93 93
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    Raises ``error.Abort`` if the server refuses the operation and
    ``error.ResponseError`` if the server's reply cannot be parsed.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    # First line of the response is an integer status code: 0 means OK,
    # 1 and 2 are known failure modes, anything else is unexpected.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line is "<filecount> <bytecount>"; the actual file data that
    # consumev1() reads follows it on the same stream.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    lock = repo.lock()
    try:
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        #                    (i.e. requirements from the streamed-in repo)
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        # Stream clone wrote directly to disk, so in-memory caches are stale.
        repo.invalidate()
    finally:
        lock.release()
158 158
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server.

    Controlled by the (untrusted) ``server.uncompressed`` config option,
    which defaults to True.
    """
    allowed = ui.configbool('server', 'uncompressed', True, untrusted=True)
    return allowed
162 162
163 163 # This is it's own function so extensions can override it.
164 164 def _walkstreamfiles(repo):
165 165 return repo.store.walk()
166 166
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Every name came from the store walk above and is already known-good,
    # so path auditing during the transfer is pure overhead.
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                # Small files are cheaper to slurp with a single read() than
                # to stream through a chunking file-object wrapper.
                if size <= 65536:
                    yield svfs.read(name)
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            # Restore auditing even if the consumer abandons the generator
            # mid-stream (GeneratorExit also reaches this finally).
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()
228 223
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    nfiles, nbytes, chunks = generatev1(repo)
    # Wire protocol header: "<filecount> <bytecount>\n".
    yield '%d %d\n' % (nfiles, nbytes)
    for data in chunks:
        yield data
239 234
def generatebundlev1(repo, compression='UN'):
    r"""Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def stream():
        # Magic and compression identifier.
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Fixed-width header: counts, then the length-prefixed,
        # NUL-terminated requirements string.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = repo.ui.progress
        progress(_('bundle'), 0, total=bytecount)
        seen = 0
        for chunk in it:
            seen += len(chunk)
            progress(_('bundle'), seen, total=bytecount)
            yield chunk
        progress(_('bundle'), None)

    return requirements, stream()
294 289
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.

    ``fp`` is a file-like object positioned at the start of the per-file
    entries; ``filecount`` and ``bytecount`` come from the stream header and
    drive the read loop and progress reporting respectively.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount)
        start = time.time()

        # All writes happen inside one transaction so a partial stream does
        # not leave the repo in a half-written state.
        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(filecount):
                # XXX doesn't support '\n' or '\r' in filenames
                # Each entry is "<name>\0<size>\n" followed by <size> raw
                # bytes of file data.
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Guard against a zero/negative clock delta before dividing below.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
    finally:
        lock.release()
349 344
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    ``fp`` must be positioned just past the 4-byte magic, i.e. at the 2-byte
    compression identifier; the magic is assumed to have been read and
    validated by the caller.
    """
    # Stream clone data can only be applied to a pristine repository.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    comp = fp.read(2)
    if comp != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % comp)

    # Header layout: two big-endian u64 counts, then a u16 length followed by
    # that many bytes of NUL-terminated requirements string.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    reqlen = struct.unpack('>H', fp.read(2))[0]
    rawreqs = fp.read(reqlen)
    if not rawreqs.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    missingreqs = set(rawreqs.rstrip('\0').split(',')) - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
381 376
class streamcloneapplier(object):
    """Wrapper that manages applying a streaming clone bundle.

    ``applybundlev1()`` is wrapped in a dedicated type so that bundle readers
    can perform bundle type-specific functionality.
    """

    def __init__(self, fh):
        # File handle positioned at the 2-byte compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Consume the wrapped bundle data into ``repo``."""
        return applybundlev1(repo, self._fh)
General Comments 0
You need to be logged in to leave comments. Login now