##// END OF EJS Templates
merge with stable
Matt Mackall -
r28532:ed75909c merge default
parent child Browse files
Show More
@@ -1,382 +1,383 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import struct
10 import struct
11 import time
11 import time
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 branchmap,
15 branchmap,
16 error,
16 error,
17 store,
17 store,
18 util,
18 util,
19 )
19 )
20
20
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Report whether a streaming clone can be performed as part of pull.

    ``bailifbundle2supported`` makes this function return False whenever
    bundle2 stream clones are supported; only the legacy stream clone code
    path should pass it.

    Returns a 2-tuple ``(supported, requirements)``. ``supported`` is True
    when streaming clone is possible and False otherwise. ``requirements``
    is the set of repo requirements advertised by the remote, or ``None``
    when stream clone isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Does the remote speak a bundle2 stream clone version we understand?
    bundle2supported = (pullop.canusebundle2 and
                        'v1' in pullop.remotebundle2caps.get('stream', []))
    # If not, the server either lacks bundle2 stream clone support entirely
    # or only offers versions we don't know. Fall through and possibly allow
    # the legacy mechanism.

    # Ensure the legacy code path defers to bundle2 when it is available.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # Without an explicit client preference, let the server decide for us.
    # This likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        requirements = set(['revlogv1'])
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements
93
93
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones run as part of pull, before all other operations.

    No legacy stream clone is attempted when a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save the remote branchmap up front. It is used later to speed up
    # branchcache creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()
    line = fp.readline()
    try:
        resp = int(line)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)
    # Non-zero response codes signal server-side refusal.
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line carries "<filecount> <bytecount>".
    line = fp.readline()
    try:
        filecount, bytecount = map(int, line.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
155
155
def allowservergeneration(ui):
    """Whether the server is allowed to generate streaming clone data."""
    # Read with untrusted=True so untrusted configuration is consulted too.
    return ui.configbool('server', 'uncompressed', True, untrusted=True)
159
159
160 # This is it's own function so extensions can override it.
160 # This is it's own function so extensions can override it.
161 def _walkstreamfiles(repo):
161 def _walkstreamfiles(repo):
162 return repo.store.walk()
162 return repo.store.walk()
163
163
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    totalbytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                totalbytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), totalbytes))

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Auditing every path is expensive and unnecessary here; restore the old
    # setting in the generator's finally block.
    svfs.mustaudit = False

    def emitrevlogdata():
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    # Small file: a single bounded read with the handle
                    # closed promptly.
                    with svfs(name, 'rb') as fp:
                        yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(svfs(name), limit=size):
                        yield chunk
        finally:
            svfs.mustaudit = oldaudit

    return len(entries), totalbytes, emitrevlogdata()
217
218
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` prefixed with a header
    line carrying the file count and byte size.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
228
229
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the
    length of the requirements string, including a trailing NUL. The
    following N bytes are the requirements string, which is ASCII containing
    a comma-delimited list of repo requirements that are needed to support
    the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        # Magic + compression identifier.
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        seen = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            seen += len(chunk)
            repo.ui.progress(_('bundle'), seen, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, gen()
284
285
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handledbytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = time.time()

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handledbytes += len(chunk)
                            repo.ui.progress(_('clone'), handledbytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

            # Writing straight to files circumvented the inmemory caches
            repo.invalidate()

        elapsed = time.time() - start
        # Guard against a zero/negative interval when computing the rate.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
334
335
def readbundle1header(fp):
    """Parse the header of a stream clone bundle v1 file handle.

    Assumes the 4-byte magic has already been consumed; ``fp`` is positioned
    at the 2-byte compression identifier.

    Returns (filecount, bytecount, requirements set).
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big-endian counters, then a 16-bit requirements length.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
352
353
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file
    handle is at the 2 byte compression identifier.
    """
    # Stream clone bundles replace the store wholesale; refuse on anything
    # that already has revisions.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
371
372
class streamcloneapplier(object):
    """Manage applying streaming clone bundles.

    ``applybundlev1()`` is wrapped in a dedicated type so bundle readers can
    perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped bundle to ``repo``."""
        return applybundlev1(repo, self._fh)
@@ -1,52 +1,92 b''
1 #require serve
1 #require serve
2
2
3 Initialize repository
3 Initialize repository
4 the status call is to check for issue5130
4 the status call is to check for issue5130
5
5
6 $ hg init server
6 $ hg init server
7 $ cd server
7 $ cd server
8 $ touch foo
8 $ touch foo
9 $ hg -q commit -A -m initial
9 $ hg -q commit -A -m initial
10 >>> for i in range(1024):
10 >>> for i in range(1024):
11 ... with open(str(i), 'wb') as fh:
11 ... with open(str(i), 'wb') as fh:
12 ... fh.write(str(i))
12 ... fh.write(str(i))
13 $ hg -q commit -A -m 'add a lot of files'
13 $ hg -q commit -A -m 'add a lot of files'
14 $ hg st
14 $ hg st
15 $ hg serve -p $HGPORT -d --pid-file=hg.pid
15 $ hg serve -p $HGPORT -d --pid-file=hg.pid
16 $ cat hg.pid >> $DAEMON_PIDS
16 $ cat hg.pid >> $DAEMON_PIDS
17 $ cd ..
17 $ cd ..
18
18
19 Basic clone
19 Basic clone
20
20
21 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1
21 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1
22 streaming all changes
22 streaming all changes
23 1027 files to transfer, 96.3 KB of data
23 1027 files to transfer, 96.3 KB of data
24 transferred 96.3 KB in * seconds (*/sec) (glob)
24 transferred 96.3 KB in * seconds (*/sec) (glob)
25 searching for changes
25 searching for changes
26 no changes found
26 no changes found
27
27
28 Clone with background file closing enabled
28 Clone with background file closing enabled
29
29
30 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --uncompressed -U http://localhost:$HGPORT clone-background | grep -v adding
30 $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --uncompressed -U http://localhost:$HGPORT clone-background | grep -v adding
31 using http://localhost:$HGPORT/
31 using http://localhost:$HGPORT/
32 sending capabilities command
32 sending capabilities command
33 sending branchmap command
33 sending branchmap command
34 streaming all changes
34 streaming all changes
35 sending stream_out command
35 sending stream_out command
36 1027 files to transfer, 96.3 KB of data
36 1027 files to transfer, 96.3 KB of data
37 starting 4 threads for background file closing
37 starting 4 threads for background file closing
38 transferred 96.3 KB in * seconds (*/sec) (glob)
38 transferred 96.3 KB in * seconds (*/sec) (glob)
39 query 1; heads
39 query 1; heads
40 sending batch command
40 sending batch command
41 searching for changes
41 searching for changes
42 all remote heads known locally
42 all remote heads known locally
43 no changes found
43 no changes found
44 sending getbundle command
44 sending getbundle command
45 bundle2-input-bundle: with-transaction
45 bundle2-input-bundle: with-transaction
46 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
46 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
47 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
47 bundle2-input-part: "listkeys" (params: 1 mandatory) supported
48 bundle2-input-bundle: 1 parts total
48 bundle2-input-bundle: 1 parts total
49 checking for updated bookmarks
49 checking for updated bookmarks
50 preparing listkeys for "phases"
50 preparing listkeys for "phases"
51 sending listkeys command
51 sending listkeys command
52 received listkey for "phases": 58 bytes
52 received listkey for "phases": 58 bytes
53
54
55 Stream clone while repo is changing:
56
57 $ mkdir changing
58 $ cd changing
59
60 extension for delaying the server process so we reliably can modify the repo
61 while cloning
62
63 $ cat > delayer.py <<EOF
64 > import time
65 > from mercurial import extensions, scmutil
66 > def __call__(orig, self, path, *args, **kwargs):
67 > if path == 'data/f1.i':
68 > time.sleep(2)
69 > return orig(self, path, *args, **kwargs)
70 > extensions.wrapfunction(scmutil.vfs, '__call__', __call__)
71 > EOF
72
73 prepare repo with small and big file to cover both code paths in emitrevlogdata
74
75 $ hg init repo
76 $ touch repo/f1
77 $ $TESTDIR/seq.py 50000 > repo/f2
78 $ hg -R repo ci -Aqm "0"
79 $ hg -R repo serve -p $HGPORT1 -d --pid-file=hg.pid --config extensions.delayer=delayer.py
80 $ cat hg.pid >> $DAEMON_PIDS
81
82 clone while modifying the repo between stat'ing files under the write lock and
83 actually serving file content
84
85 $ hg clone -q --uncompressed -U http://localhost:$HGPORT1 clone &
86 $ sleep 1
87 $ echo >> repo/f1
88 $ echo >> repo/f2
89 $ hg -R repo ci -m "1"
90 $ wait
91 $ hg -R clone id
92 000000000000
General Comments 0
You need to be logged in to leave comments. Login now