##// END OF EJS Templates
streamclone: reimplement nested context manager...
Augie Fackler -
r39793:97f2992c default
parent child Browse files
Show More
@@ -1,647 +1,647 b''
# streamclone.py - producing and consuming streaming repository data
#
# Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import os
11 import os
12 import struct
12 import struct
13 import warnings
14
13
15 from .i18n import _
14 from .i18n import _
16 from . import (
15 from . import (
17 branchmap,
16 branchmap,
18 cacheutil,
17 cacheutil,
19 error,
18 error,
20 phases,
19 phases,
21 pycompat,
20 pycompat,
22 store,
21 store,
23 util,
22 util,
24 )
23 )
25
24
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Can the peer do a bundle2-based stream clone ("stream" capability,
    # version "v2")? If the peer lacks bundle2 stream clone support (or the
    # versions we support), fall back and possibly allow legacy.
    bundle2supported = (pullop.canusebundle2 and
                        'v2' in pullop.remotebundle2caps.get('stream', []))

    # Ensures legacy code path uses available bundle2, and that bundle2
    # doesn't try to do a stream clone if it isn't supported.
    if bundle2supported and not bundle2:
        return False, None
    if bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(['revlogv1'])

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but server has them '
            'disabled\n'))
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    missingreqs = streamreqs - repo.supportedformats
    if missingreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but client is missing '
            'requirements: %s\n') % ', '.join(sorted(missingreqs)))
        pullop.repo.ui.warn(
            _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
              'for more information)\n'))
        return False, None

    return True, streamreqs
107
106
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand('branchmap', {}).result()

    repo.ui.status(_('streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand('stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.

    # First line of the response is a status code.
    line = fp.readline()
    try:
        resp = int(line)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)
    if resp != 0:
        if resp == 1:
            msg = _('operation forbidden by server')
        elif resp == 2:
            msg = _('locking the remote repository failed')
        else:
            msg = _('the server sent an unknown error code')
        raise error.Abort(msg)

    # Second line declares the payload size: "<filecount> <bytecount>".
    line = fp.readline()
    try:
        filecount, bytecount = map(int, line.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements)
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
179
178
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default: require an explicit opt-in when any
    # secret changesets exist.
    if phases.hassecret(repo):
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
192
191
193 # This is it's own function so extensions can override it.
192 # This is it's own function so extensions can override it.
194 def _walkstreamfiles(repo):
193 def _walkstreamfiles(repo):
195 return repo.store.walk()
194 return repo.store.walk()
196
195
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    filelist = []
    totalsize = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if not size:
                continue
            filelist.append((name, size))
            totalsize += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(filelist), totalsize))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def datagen():
        # Emitted lazily, after the lock above has been released.
        for name, size in filelist:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fh:
                if size <= 65536:
                    # small file: one read is cheaper than chunking
                    yield fh.read(size)
                else:
                    for piece in util.filechunkiter(fh, limit=size):
                        yield piece

    return len(filelist), totalsize, datagen()
248
247
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, datait = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for piece in datait:
        yield piece
275
274
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        # Magic plus compression identifier.
        yield 'HGS1'
        yield compression

        filecount, bytecount, datait = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Fixed-size header: counts, then the requirements string with its
        # NUL terminator included in the declared length.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)

        for piece in datait:
            progress.increment(step=len(piece))
            yield piece

        progress.complete()

    return requirements, gen()
330
329
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        progress = repo.ui.makeprogress(_('clone'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    line = fp.readline()
                    try:
                        name, size = line.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), line)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as dest:
                        for piece in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(piece))
                            dest.write(piece)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # guard the rate computation below against a zero/negative clock delta
        if elapsed <= 0:
            elapsed = 0.001
        progress.complete()
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
394
393
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    ``fp`` is assumed to be positioned at the 2 byte compression
    identifier (i.e. the 4 byte magic has already been consumed).

    Returns a (filecount, bytecount, requirements) tuple.
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big endian counts, then a 16-bit length-prefixed,
    # NUL-terminated requirements string.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    reqlen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(reqlen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
412
411
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # Raw store files are written straight to disk, so this is only safe
    # on a repository with no existing revisions.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
431
430
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the 2 byte compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Write the bundle content into ``repo``."""
        return applybundlev1(repo, self._fh)
443
442
444 # type of file to stream
443 # type of file to stream
445 _fileappend = 0 # append only file
444 _fileappend = 0 # append only file
446 _filefull = 1 # full snapshot file
445 _filefull = 1 # full snapshot file
447
446
448 # Source of the file
447 # Source of the file
449 _srcstore = 's' # store (svfs)
448 _srcstore = 's' # store (svfs)
450 _srccache = 'c' # cache (cache)
449 _srccache = 'c' # cache (cache)
451
450
452 # This is it's own function so extensions can override it.
451 # This is it's own function so extensions can override it.
453 def _walkstreamfullstorefiles(repo):
452 def _walkstreamfullstorefiles(repo):
454 """list snapshot file from the store"""
453 """list snapshot file from the store"""
455 fnames = []
454 fnames = []
456 if not repo.publishing():
455 if not repo.publishing():
457 fnames.append('phaseroots')
456 fnames.append('phaseroots')
458 return fnames
457 return fnames
459
458
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files"""
    src, name, ftype, data = entry
    if ftype == _filefull:
        # Full snapshot files are replaced by a temporary copy of their
        # content (``copy`` comes from ``maketempcopies()``).
        data = copy(vfsmap[src].join(name))
    return (src, name, ftype, data)
466
465
@contextlib.contextmanager
def maketempcopies():
    """return a function to temporary copy file"""
    tmpfiles = []
    try:
        def copy(src):
            # hardlink when possible to keep the copy cheap
            fd, dst = pycompat.mkstemp()
            os.close(fd)
            tmpfiles.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        # best-effort removal of every temporary file handed out above
        for tmp in tmpfiles:
            util.tryunlink(tmp)
482
481
def _makemap(repo):
    """make a (src -> vfs) map for the repo"""
    vfsmap = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # we keep repo.vfs out of the map on purpose: there are too many
    # dangers there (eg: .hg/hgrc)
    assert repo.vfs not in vfsmap.values()

    return vfsmap
494
493
def _emit2(repo, entries, totalfilesize):
    """actually emit the stream bundle

    ``entries`` is a list of ``(src, name, ftype, data)`` tuples as built by
    ``generatev2``. The generator first yields ``None`` once the temporary
    copies have been made (signalling that the repository lock may be
    released), then yields the encoded stream chunks.
    """
    vfsmap = _makemap(repo)
    progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
                                    unit=_('bytes'))
    progress.update(0)
    with maketempcopies() as copy, progress:
        # copy is delayed until we are in the try
        entries = [_filterfull(e, copy, vfsmap) for e in entries]
        yield None # this release the lock on the repository
        seen = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            # Per-entry wire format: source letter, filename length,
            # data length, filename, then the raw data.
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                # Append-only store file: read directly, ``data`` is the
                # size captured while the lock was held.
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                # Full snapshot: ``data`` is the temporary copy's path.
                fp = open(data, 'rb')
                size = util.fstat(fp).st_size
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    # Small file: one read avoids chunking overhead.
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    seen += len(chunk)
                    progress.update(seen)
                    yield chunk
            finally:
                fp.close()
530
529
def generatev2(repo):
    """Emit content for version 2 of a streaming clone.

    Each file in the data stream is encoded as:
    1) a char representing the file destination (eg: store or cache),
    2) a varint containing the length of the filename,
    3) a varint containing the length of file data,
    4) N bytes containing the filename (the internal, store-agnostic form),
    5) N bytes containing the file data.

    Returns a 3-tuple of (file count, file size, data iterator).
    """
    with repo.lock():
        repo.ui.debug('scanning\n')

        entries = []
        totalfilesize = 0

        # Regular store files are streamed as append-only data.
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((_srcstore, name, _fileappend, size))
                totalfilesize += size

        # Store files that must be shipped as full snapshots.
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))

        # Cache files worth copying to the clone.
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        # Advance the generator until it has made its temporary copies;
        # the first yielded value is None, after which the lock can be
        # released safely.
        first = next(chunks)
        assert first is None

        return len(entries), totalfilesize, chunks
568
567
@contextlib.contextmanager
def nested(*ctxs):
    """Enter the given context managers left to right, exit in reverse.

    Replacement for ``contextlib.nested``, which was deprecated and then
    removed from Python without a variadic substitute. Like the original,
    calling this with no arguments is a no-op (the previous recursive
    version raised ``IndexError`` on ``ctxs[0]`` in that case).
    """
    if not ctxs:
        # Nothing to manage; behave like an empty ``with`` block.
        yield
        return
    with ctxs[0]:
        rest = ctxs[1:]
        if rest:
            # Recurse to nest the remaining managers inside this one.
            with nested(*rest):
                yield
        else:
            yield
578
578
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.

    ``filecount`` and ``filesize`` come from the stream header and drive
    the read loop and progress reporting. Files are written inside a
    single transaction while the repository is locked.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        progress = repo.ui.makeprogress(_('clone'), total=filesize,
                                        unit=_('bytes'))
        progress.update(0)

        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            # Close written files in the background for throughput.
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # Per-entry wire format: source letter, filename
                    # length, data length, filename, raw data.
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # Avoid division by zero in the transfer-rate report below.
            elapsed = 0.001
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(progress.pos), elapsed,
                        util.bytecount(progress.pos / elapsed)))
        progress.complete()
628
628
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Validate, consume, and record a version 2 stream clone bundle."""
    # NOTE(review): deferred local import — presumably avoids an import
    # cycle with localrepo; confirm before moving to module level.
    from . import localrepo

    missingreqs = [r for r in requirements if r not in repo.supported]
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev2(repo, fp, filecount, filesize)

    # The new requirement set is the remote's format requirements plus
    # our own non-format requirements.
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats)
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements)
    repo._writerequirements()
General Comments 0
You need to be logged in to leave comments. Login now