merge with stable
Matt Mackall
r28532:ed75909c merge default
@@ -1,382 +1,383 @@
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import struct
import time

from .i18n import _
from . import (
    branchmap,
    error,
    store,
    util,
)

def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements
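The requirement negotiation at the end of this function is the subtle part. As a minimal standalone sketch of the same logic (a plain dict stands in for the real peer API; `negotiate` and `caps` are hypothetical names, not Mercurial API):

def negotiate(caps, supportedformats):
    # 'stream' alone advertises exactly one requirement: revlogv1.
    if 'stream' in caps:
        return {'revlogv1'}
    streamreqs = caps.get('streamreqs')
    if not streamreqs:
        return None          # old/odd server: no stream clone support
    streamreqs = set(streamreqs.split(','))
    if streamreqs - supportedformats:
        return None          # server needs a format this client lacks
    return streamreqs

assert negotiate({'stream': True}, set()) == {'revlogv1'}
assert negotiate({'streamreqs': 'revlogv1,generaldelta'},
                 {'revlogv1', 'generaldelta'}) == {'revlogv1', 'generaldelta'}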
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        #                    (i.e. requirements from the streamed-in repository)
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
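The preamble parsed above is just two text lines: a numeric status code, then the file and byte counts separated by a space. A minimal sketch against an in-memory stand-in for the server stream (the payload bytes are made up):

import io

fp = io.BytesIO(b'0\n1027 98610\n')   # hypothetical server preamble
resp = int(fp.readline())             # 0 = OK, 1 = forbidden, 2 = lock failed
filecount, bytecount = map(int, fp.readline().split(b' ', 1))
assert (resp, filecount, bytecount) == (0, 1027, 98610)
# Everything remaining in fp would be the payload handed to consumev1().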
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server."""
    return ui.configbool('server', 'uncompressed', True, untrusted=True)

# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    return repo.store.walk()

def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries, one for each file being
    transferred. Each file entry starts as a line with the file name and
    integer size delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
-                   yield svfs.read(name)
+                   with svfs(name, 'rb') as fp:
+                       yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()
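`generatev1()` frames each file as '<name>\0<size>\n' followed by exactly `size` raw bytes. A simplified consumer of that framing (ignoring the locking, vfs, and progress handling that `consumev1()` later in this file adds) could look like:

def readentries(fp):
    """Yield (name, data) pairs from a v1 data stream until EOF."""
    while True:
        header = fp.readline()              # '<name>\0<size>\n'
        if not header:
            break                           # clean EOF between entries
        name, size = header.rstrip('\n').split('\0', 1)
        yield name, fp.read(int(size))      # exactly <size> bytes follow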
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk

def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes are a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing \0. The following
    N bytes are the requirements string, which is ASCII containing a
    comma-delimited list of repo requirements that are needed to support the
    data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()
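The HGS1 header layout described in the docstring can be checked with a quick round-trip using the same `struct` formats. A self-contained sketch with made-up counts (Python 2 string semantics, matching the module itself):

import struct

requires = 'revlogv1,generaldelta'
header = ('HGS1' + 'UN'
          + struct.pack('>QQ', 3, 4096)            # file count, byte count
          + struct.pack('>H', len(requires) + 1)   # length includes the \0
          + requires + '\0')

# Parsing it back mirrors readbundle1header() later in this file.
assert header[0:4] == 'HGS1' and header[4:6] == 'UN'
filecount, bytecount = struct.unpack('>QQ', header[6:22])
requireslen = struct.unpack('>H', header[22:24])[0]
assert header[24:24 + requireslen] == requires + '\0'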
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = time.time()

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

        # Writing straight to files circumvented the in-memory caches
        repo.invalidate()

        elapsed = time.time() - start
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))

def readbundle1header(fp):
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements

def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file
    handle is at the 2 byte compression identifier.
    """
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)

class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        self._fh = fh

    def apply(self, repo):
        return applybundlev1(repo, self._fh)
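The only behavioral change in this hunk is in `emitrevlogdata()`: the small-file branch now opens the file and reads exactly `size` bytes instead of calling `svfs.read(name)`, which reads to EOF. The difference matters when a file grows between the locked `scanning` phase (where `size` was recorded) and the unlocked streaming phase: an unbounded read would emit more bytes than the '%s\0%d\n' header announced and desynchronize every following entry. A toy illustration of the bounded read:

import io

announced = 5                                  # size recorded during the scan
f = io.BytesIO(b'hello, appended after scan')  # file grew afterwards
assert f.read(announced) == b'hello'           # bounded read keeps frames aligned
f.seek(0)
assert len(f.read()) > announced               # read-to-EOF would overrun the frame

The large-file branch was already safe because `util.filechunkiter(svfs(name), limit=size)` caps what it yields; the new test hunk below exercises both branches.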
@@ -1,52 +1,92 @@
#require serve

Initialize repository
the status call is to check for issue5130

  $ hg init server
  $ cd server
  $ touch foo
  $ hg -q commit -A -m initial
  >>> for i in range(1024):
  ...     with open(str(i), 'wb') as fh:
  ...         fh.write(str(i))
  $ hg -q commit -A -m 'add a lot of files'
  $ hg st
  $ hg serve -p $HGPORT -d --pid-file=hg.pid
  $ cat hg.pid >> $DAEMON_PIDS
  $ cd ..

Basic clone

  $ hg clone --uncompressed -U http://localhost:$HGPORT clone1
  streaming all changes
  1027 files to transfer, 96.3 KB of data
  transferred 96.3 KB in * seconds (*/sec) (glob)
  searching for changes
  no changes found

Clone with background file closing enabled

  $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --uncompressed -U http://localhost:$HGPORT clone-background | grep -v adding
  using http://localhost:$HGPORT/
  sending capabilities command
  sending branchmap command
  streaming all changes
  sending stream_out command
  1027 files to transfer, 96.3 KB of data
  starting 4 threads for background file closing
  transferred 96.3 KB in * seconds (*/sec) (glob)
  query 1; heads
  sending batch command
  searching for changes
  all remote heads known locally
  no changes found
  sending getbundle command
  bundle2-input-bundle: with-transaction
  bundle2-input-part: "listkeys" (params: 1 mandatory) supported
  bundle2-input-part: "listkeys" (params: 1 mandatory) supported
  bundle2-input-bundle: 1 parts total
  checking for updated bookmarks
  preparing listkeys for "phases"
  sending listkeys command
  received listkey for "phases": 58 bytes
+
+
+Stream clone while repo is changing:
+
+  $ mkdir changing
+  $ cd changing
+
+extension for delaying the server process so we can reliably modify the repo
+while cloning
+
+  $ cat > delayer.py <<EOF
+  > import time
+  > from mercurial import extensions, scmutil
+  > def __call__(orig, self, path, *args, **kwargs):
+  >     if path == 'data/f1.i':
+  >         time.sleep(2)
+  >     return orig(self, path, *args, **kwargs)
+  > extensions.wrapfunction(scmutil.vfs, '__call__', __call__)
+  > EOF
+
+prepare a repo with a small and a big file to cover both code paths in
+emitrevlogdata
+
+  $ hg init repo
+  $ touch repo/f1
+  $ $TESTDIR/seq.py 50000 > repo/f2
+  $ hg -R repo ci -Aqm "0"
+  $ hg -R repo serve -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py
+  $ cat hg.pid >> $DAEMON_PIDS
+
+clone while modifying the repo between statting a file with the write lock
+held and actually serving the file content
+
+  $ hg clone -q --uncompressed -U http://localhost:$HGPORT1 clone &
+  $ sleep 1
+  $ echo >> repo/f1
+  $ echo >> repo/f2
+  $ hg -R repo ci -m "1"
+  $ wait
+  $ hg -R clone id
+  000000000000
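The test appends to both `f1` (small-file branch) and `f2` (large-file branch, served through `util.filechunkiter(svfs(name), limit=size)`). A simplified reimplementation of that helper (a sketch for illustration only; the real one lives in `mercurial.util`) shows how `limit` enforces the announced size even when the underlying file has grown:

def filechunkiter(f, size=65536, limit=None):
    """Yield chunks from f, stopping once `limit` bytes have been produced."""
    while limit is None or limit > 0:
        want = size if limit is None else min(size, limit)
        chunk = f.read(want)
        if not chunk:
            break                 # EOF before the limit was reached
        if limit is not None:
            limit -= len(chunk)
        yield chunk

The final `hg -R clone id` printing `000000000000` is expected: the clone was made with `-U`, so no working copy is checked out and the working directory parent is the null revision. The point of the test is that the clone completes with a consistent store despite the concurrent commit.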