##// END OF EJS Templates
streamclone: pass narrowing related info in _walkstreamfiles()...
Pulkit Goyal -
r40375:f0e8f277 default
parent child Browse files
Show More
@@ -1,655 +1,660 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import contextlib
10 import contextlib
11 import os
11 import os
12 import struct
12 import struct
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 branchmap,
16 branchmap,
17 cacheutil,
17 cacheutil,
18 error,
18 error,
19 narrowspec,
19 phases,
20 phases,
20 pycompat,
21 pycompat,
21 repository,
22 repository,
22 store,
23 store,
23 util,
24 util,
24 )
25 )
25
26
def canperformstreamclone(pullop, bundle2=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bundle2`` will cause the function to consider stream clone through
    bundle2 and only through bundle2.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Determine whether the remote can perform a stream clone over bundle2.
    # If the server lacks bundle2 stream clone (or the versions we support),
    # we fall back and possibly allow the legacy path instead.
    bundle2supported = (pullop.canusebundle2
                       and 'v2' in pullop.remotebundle2caps.get('stream', []))

    # Ensures legacy code path uses available bundle2.
    if bundle2supported and not bundle2:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    if bundle2 and not bundle2supported:
        return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    if streamrequested is None:
        # No local preference; let the server decide for us. This likely
        # only comes into play in LANs, where the server can advertise
        # whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, {'revlogv1'}

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but server has them '
            'disabled\n'))
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    missingreqs = streamreqs - repo.supportedformats
    if missingreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but client is missing '
            'requirements: %s\n') % ', '.join(sorted(missingreqs)))
        pullop.repo.ui.warn(
            _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
              'for more information)\n'))
        return False, None

    return True, streamreqs
107
108
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    from . import localrepo

    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        with remote.commandexecutor() as e:
            rbranchmap = e.callcommand('branchmap', {}).result()

    repo.ui.status(_('streaming all changes\n'))

    with remote.commandexecutor() as e:
        fp = e.callcommand('stream_out', {}).result()

    # TODO strictly speaking, this code should all be inside the context
    # manager because the context manager is supposed to ensure all wire state
    # is flushed when exiting. But the legacy peers don't do this, so it
    # doesn't matter.

    # First line of the response is a status code.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise error.Abort(_('operation forbidden by server'))
    if resp == 2:
        raise error.Abort(_('locking the remote repository failed'))
    if resp != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # Second line carries the payload size: "<filecount> <bytecount>".
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo.svfs.options = localrepo.resolvestorevfsoptions(
            repo.ui, repo.requirements, repo.features)
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
179
180
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
        return False

    if not repo.ui.configbool('server', 'uncompressed', untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret
    # changesets. So don't allow this by default; require an explicit
    # opt-in when secret changesets are present.
    if phases.hassecret(repo):
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
195
196
196 # This is it's own function so extensions can override it.
197 # This is it's own function so extensions can override it.
197 def _walkstreamfiles(repo):
198 def _walkstreamfiles(repo, matcher=None):
198 return repo.store.walk()
199 return repo.store.walk(matcher)
199
200
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            if size:
                entries.append((name, size))
                total_bytes += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # Lazily stream each entry: a "<name>\0<size>" header line followed
        # by the raw file content.
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small file: a single read is cheaper than chunking.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(entries), total_bytes, emitrevlogdata()
251
252
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with 2 header lines. The
    first line indicates overall success. The 2nd contains the file count and
    byte size of payload.

    The success line contains "0" for success, "1" for stream generation not
    allowed, and "2" for error locking the repository (possibly indicating
    a permissions error for the server process).
    """
    if not allowservergeneration(repo):
        yield '1\n'
        return

    try:
        filecount, bytecount, dataiter = generatev1(repo)
    except error.LockError:
        yield '2\n'
        return

    # Indicates successful response.
    yield '0\n'
    yield '%d %d\n' % (filecount, bytecount)
    for piece in dataiter:
        yield piece
278
279
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def gen():
        # Magic + compression marker.
        yield 'HGS1'
        yield compression

        filecount, bytecount, dataiter = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Fixed-size header: counts, then the requirements string.
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = repo.ui.makeprogress(_('bundle'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)

        for chunk in dataiter:
            progress.increment(step=len(chunk))
            yield chunk

        progress.complete()

    return requirements, gen()
333
334
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        progress = repo.ui.makeprogress(_('clone'), total=bytecount,
                                        unit=_('bytes'))
        progress.update(0)
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                for i in pycompat.xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # Guard against a zero (or negative, on clock skew) duration so
            # the rate computation below never divides by zero.
            elapsed = 0.001
        progress.complete()
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
397
398
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    ``fp`` is expected to be positioned just past the 4 byte magic, i.e. at
    the 2 byte compression identifier.

    Returns a 3-tuple of (filecount, bytecount, requirements set).
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big endian counters, then a 16-bit requirements length.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    # The requirements string must be \0 terminated.
    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    requirements = set(requires.rstrip('\0').split(','))

    return filecount, bytecount, requirements
415
416
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # Stream clone bundles overwrite the store wholesale; refuse to clobber
    # an existing repository.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    missingreqs = requirements - repo.supportedformats
    if missingreqs:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(missingreqs)))

    consumev1(repo, fp, filecount, bytecount)
434
435
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the compression identifier.
        self._fh = fh

    def apply(self, repo):
        """Apply the wrapped bundle to ``repo``."""
        return applybundlev1(repo, self._fh)
446
447
447 # type of file to stream
448 # type of file to stream
448 _fileappend = 0 # append only file
449 _fileappend = 0 # append only file
449 _filefull = 1 # full snapshot file
450 _filefull = 1 # full snapshot file
450
451
451 # Source of the file
452 # Source of the file
452 _srcstore = 's' # store (svfs)
453 _srcstore = 's' # store (svfs)
453 _srccache = 'c' # cache (cache)
454 _srccache = 'c' # cache (cache)
454
455
455 # This is it's own function so extensions can override it.
456 # This is it's own function so extensions can override it.
456 def _walkstreamfullstorefiles(repo):
457 def _walkstreamfullstorefiles(repo):
457 """list snapshot file from the store"""
458 """list snapshot file from the store"""
458 fnames = []
459 fnames = []
459 if not repo.publishing():
460 if not repo.publishing():
460 fnames.append('phaseroots')
461 fnames.append('phaseroots')
461 return fnames
462 return fnames
462
463
def _filterfull(entry, copy, vfsmap):
    """actually copy the snapshot files

    Append-only entries pass through unchanged; for full-snapshot entries
    the data slot is replaced by ``copy()`` applied to the file's path.
    """
    src, name, ftype, data = entry
    if ftype != _filefull:
        return entry
    return (src, name, ftype, copy(vfsmap[src].join(name)))
469
470
@contextlib.contextmanager
def maketempcopies():
    """return a function to temporary copy file

    Every copy made through the yielded function is deleted when the
    context exits.
    """
    files = []
    try:
        def copy(src):
            fd, dst = pycompat.mkstemp()
            os.close(fd)
            files.append(dst)
            util.copyfiles(src, dst, hardlink=True)
            return dst
        yield copy
    finally:
        # Best-effort cleanup of every temporary copy we handed out.
        for tmp in files:
            util.tryunlink(tmp)
485
486
def _makemap(repo):
    """Return the mapping from stream source markers to vfs objects."""
    mapping = {
        _srcstore: repo.svfs,
        _srccache: repo.cachevfs,
    }
    # repo.vfs is deliberately left out: it holds files such as .hg/hgrc
    # that must never be overwritten by streamed data.
    assert repo.vfs not in mapping.values()

    return mapping
497
498
def _emit2(repo, entries, totalfilesize):
    """Generate the raw byte chunks of a version-2 stream bundle.

    The first value yielded is ``None``, signalling that snapshot copies
    have been taken and the caller may release the repository lock.
    """
    vfsmap = _makemap(repo)
    progress = repo.ui.makeprogress(_('bundle'), total=totalfilesize,
                                    unit=_('bytes'))
    progress.update(0)
    with maketempcopies() as copyfile, progress:
        # snapshot files are copied while the repository lock is still held
        entries = [_filterfull(e, copyfile, vfsmap) for e in entries]
        yield None  # lock on the repository may be released past this point
        bytessent = 0

        for src, name, ftype, data in entries:
            vfs = vfsmap[src]
            yield src
            yield util.uvarintencode(len(name))
            if ftype == _fileappend:
                # append-only file: stream at most the recorded size
                fp = vfs(name)
                size = data
            elif ftype == _filefull:
                # snapshot file: stream the temporary copy made above
                fp = open(data, 'rb')
                size = util.fstat(fp).st_size
            try:
                yield util.uvarintencode(size)
                yield name
                if size <= 65536:
                    # small file: one read is cheaper than chunked iteration
                    chunks = (fp.read(size),)
                else:
                    chunks = util.filechunkiter(fp, limit=size)
                for chunk in chunks:
                    bytessent += len(chunk)
                    progress.update(bytessent)
                    yield chunk
            finally:
                fp.close()
533
534
def generatev2(repo, includes, excludes):
    """Emit content for version 2 of a streaming clone.

    The data stream consists of the following entries:
    1) A char representing the file destination (eg: store or cache)
    2) A varint containing the length of the filename
    3) A varint containing the length of file data
    4) N bytes containing the filename (the internal, store-agnostic form)
    5) N bytes containing the file data

    Returns a 3-tuple of (file count, file size, data iterator).
    """

    # temporarily raise error until we add storage level logic
    if includes or excludes:
        raise error.Abort(_("server does not support narrow stream clones"))

    with repo.lock():

        repo.ui.debug('scanning\n')

        matcher = None
        if includes or excludes:
            matcher = narrowspec.match(repo.root, includes, excludes)

        entries = []
        totalfilesize = 0
        # revlog data first, restricted to the narrow matcher when present
        for name, ename, size in _walkstreamfiles(repo, matcher):
            if size:
                totalfilesize += size
                entries.append((_srcstore, name, _fileappend, size))
        # then snapshot-only store files (e.g. phaseroots)
        for name in _walkstreamfullstorefiles(repo):
            if repo.svfs.exists(name):
                totalfilesize += repo.svfs.lstat(name).st_size
                entries.append((_srcstore, name, _filefull, None))
        # finally, caches worth shipping to the client
        for name in cacheutil.cachetocopy(repo):
            if repo.cachevfs.exists(name):
                totalfilesize += repo.cachevfs.lstat(name).st_size
                entries.append((_srccache, name, _filefull, None))

        chunks = _emit2(repo, entries, totalfilesize)
        # drive the generator until snapshot copies are taken, so the lock
        # can be released before the stream is actually consumed
        first = next(chunks)
        assert first is None

    return len(entries), totalfilesize, chunks
575
580
@contextlib.contextmanager
def nested(*ctxs):
    """Enter all of ``ctxs`` left-to-right as one context manager."""
    first, rest = ctxs[0], ctxs[1:]
    with first:
        if not rest:
            yield
        else:
            with nested(*rest):
                yield
586
591
def consumev2(repo, fp, filecount, filesize):
    """Apply the contents from a version 2 streaming clone.

    Data is read from an object that only needs to provide a ``read(size)``
    method.

    The stream is expected to contain exactly ``filecount`` entries, each
    laid out as: 1 source byte, varint name length, varint data length,
    the name, then the file data (see generatev2 for the writer side).
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(filesize)))

        start = util.timer()
        progress = repo.ui.makeprogress(_('clone'), total=filesize,
                                        unit=_('bytes'))
        progress.update(0)

        # one-byte source marker -> destination vfs (store or cache)
        vfsmap = _makemap(repo)

        with repo.transaction('clone'):
            # close written files in the background for throughput
            ctxs = (vfs.backgroundclosing(repo.ui)
                    for vfs in vfsmap.values())
            with nested(*ctxs):
                for i in range(filecount):
                    # per-entry header: src byte, then two varints
                    src = util.readexactly(fp, 1)
                    vfs = vfsmap[src]
                    namelen = util.uvarintdecodestream(fp)
                    datalen = util.uvarintdecodestream(fp)

                    name = util.readexactly(fp, namelen)

                    if repo.ui.debugflag:
                        repo.ui.debug('adding [%s] %s (%s)\n' %
                                      (src, name, util.bytecount(datalen)))

                    # stream exactly datalen bytes into the target file
                    with vfs(name, 'w') as ofp:
                        for chunk in util.filechunkiter(fp, limit=datalen):
                            progress.increment(step=len(chunk))
                            ofp.write(chunk)

            # force @filecache properties to be reloaded from
            # streamclone-ed file at next access
            repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        if elapsed <= 0:
            # avoid division by zero in the rate computation below
            elapsed = 0.001
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(progress.pos), elapsed,
                        util.bytecount(progress.pos / elapsed)))
        progress.complete()
636
641
def applybundlev2(repo, fp, filecount, filesize, requirements):
    """Apply a version-2 stream clone bundle read from ``fp``.

    Aborts if the remote ``requirements`` include anything this repository
    does not support; otherwise consumes the stream and updates the local
    requirements and store options to match the streamed-in repository.
    """
    from . import localrepo

    unsupported = [r for r in requirements if r not in repo.supported]
    if unsupported:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(unsupported)))

    consumev2(repo, fp, filecount, filesize)

    # new requirements = old non-format requirements +
    #                    new format-related remote requirements
    #                    requirements from the streamed-in repository
    repo.requirements = set(requirements) | (
        repo.requirements - repo.supportedformats)
    repo.svfs.options = localrepo.resolvestorevfsoptions(
        repo.ui, repo.requirements, repo.features)
    repo._writerequirements()
General Comments 0
You need to be logged in to leave comments. Login now