##// END OF EJS Templates
streamclone: move "streaming all changes" message location...
Gregory Szorc -
r26470:4b5647d9 default
parent child Browse files
Show More
@@ -1,291 +1,292 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``pullop`` is the pull operation; its ``repo``, ``remote``,
    ``canusebundle2``, ``remotebundle2caps``, ``heads`` and
    ``streamclonerequested`` attributes are consulted.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # else
            # Server doesn't support bundle2 stream clone or doesn't support
            # the versions we support. Fall back and possibly allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements
92
92
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    Returns ``None``. Raises ``error.ResponseError`` when the status line or
    the "<filecount> <bytecount>" header sent by the remote cannot be parsed,
    and ``util.Abort`` when the remote reports a non-zero status code.
    """
    # NOTE(review): ``bailifbundle2supported`` is left at its default (False)
    # here, so this call never triggers the bundle2 bail-out described in the
    # docstring above - confirm whether that is intended.
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()

    # First line: integer status code from the server.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise util.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise util.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise util.Abort(_('the server sent an unknown error code'))

    # Second line: "<filecount> <bytecount>" header.
    l = fp.readline()
    try:
        filecount, bytecount = map(int, l.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)

    lock = repo.lock()
    try:
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
    finally:
        lock.release()
155
157
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server.

    Returns the value of the ``server.uncompressed`` boolean config option,
    read with a default of True and with ``untrusted=True``.
    """
    return ui.configbool('server', 'uncompressed', True, untrusted=True)
159
161
# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    """Return the result of walking ``repo.store``.

    ``generatev1()`` iterates the result as (name, ename, size) entries.
    """
    return repo.store.walk()
163
165
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.

    While the data iterator is running, ``repo.svfs.mustaudit`` is set to
    False; the previous value is restored when the iterator finishes or is
    closed.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files are not transferred.
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # Flip the audit flag only once iteration actually starts: the
        # ``finally`` below runs only for a generator that has been started,
        # so toggling it before the generator is created would leave the
        # flag off for good if the iterator were never consumed.
        oldaudit = svfs.mustaudit
        svfs.mustaudit = False
        try:
            for name, size in entries:
                if debugflag:
                    repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
                # partially encode name over the wire for backwards compat
                yield '%s\0%d\n' % (store.encodedir(name), size)
                if size <= 65536:
                    # Small file: read it in one go and close the handle
                    # before handing the data to the consumer.
                    fp = svfs(name)
                    try:
                        data = fp.read(size)
                    finally:
                        fp.close()
                    yield data
                else:
                    # Large file: stream it in chunks, making sure the
                    # handle is closed even if the consumer stops early.
                    fp = svfs(name)
                    try:
                        for chunk in util.filechunkiter(fp, limit=size):
                            yield chunk
                    finally:
                        fp.close()
        finally:
            svfs.mustaudit = oldaudit

    return len(entries), total_bytes, emitrevlogdata()
225
227
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.

    This is a generator: the first item yielded is the
    "<filecount> <bytecount>" line, followed by every chunk produced by the
    ``generatev1()`` data iterator.
    """
    filecount, bytecount, it = generatev1(repo)
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in it:
        yield chunk
236
238
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.

    ``fp`` is read for ``filecount`` file entries; each entry is a
    "<name>\\0<size>" line followed by ``size`` bytes of raw data.
    ``bytecount`` is only used for progress and status reporting.

    Raises ``error.ResponseError`` if a file entry line cannot be parsed.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(filecount):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                # Close the output file even if reading from the stream or
                # writing to the store fails part-way through.
                try:
                    for chunk in util.filechunkiter(fp, limit=size):
                        handled_bytes += len(chunk)
                        repo.ui.progress(_('clone'), handled_bytes,
                                         total=bytecount)
                        ofp.write(chunk)
                finally:
                    ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Guard against a zero or negative elapsed time before dividing.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
    finally:
        lock.release()
General Comments 0
You need to be logged in to leave comments. Login now