##// END OF EJS Templates
streamclone: teach canperformstreamclone to be bundle2 aware...
Gregory Szorc -
r26467:ff2c8923 default
parent child Browse files
Show More
@@ -1,260 +1,279 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``pullop`` is the pull operation; its ``repo``, ``remote``, ``heads``,
    ``streamclonerequested``, ``canusebundle2`` and (when bundle2 is usable)
    ``remotebundle2caps`` attributes are consulted.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be set by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    bundle2supported = False
    if pullop.canusebundle2:
        if 'v1' in pullop.remotebundle2caps.get('stream', []):
            bundle2supported = True
        # Otherwise the server doesn't support bundle2 stream clone or
        # doesn't support the versions we support. Fall back and possibly
        # allow legacy.

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # NOTE: the symmetric check, which would keep the bundle2 code path from
    # attempting a stream clone the server does not support, is left
    # disabled in this revision:
    #
    #   elif not bailifbundle2supported and not bundle2supported:
    #       return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested

    # If we don't have a preference, let the server decide for us. This
    # likely only comes into play in LANs.
    if streamrequested is None:
        # The server can advertise whether to prefer streaming clone.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    requirements = set()
    if remote.capable('stream'):
        requirements.add('revlogv1')
    else:
        streamreqs = remote.capable('streamreqs')
        # This is weird and shouldn't happen with modern servers.
        if not streamreqs:
            return False, None

        streamreqs = set(streamreqs.split(','))
        # Server requires something we don't support. Bail.
        if streamreqs - repo.supportedformats:
            return False, None
        requirements = streamreqs

    return True, requirements
73
92
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.

    NOTE(review): the call below leaves ``bailifbundle2supported`` at its
    default of False, so the bundle2 bail-out described above is not
    actually requested from this function yet -- confirm whether that is
    intended before relying on it.

    Raises ``error.ResponseError`` if the first line of the server response
    is not an integer, and ``util.Abort`` if the server reports a non-zero
    status code.
    """
    supported, requirements = canperformstreamclone(pullop)

    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = None
    if remote.capable('branchmap'):
        rbranchmap = remote.branchmap()

    fp = remote.stream_out()
    # The wire protocol prepends a status line to the stream; consumev1()
    # does not handle it, so it is read and checked here.
    l = fp.readline()
    try:
        resp = int(l)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), l)
    if resp == 1:
        raise util.Abort(_('operation forbidden by server'))
    elif resp == 2:
        raise util.Abort(_('locking the remote repository failed'))
    elif resp != 0:
        raise util.Abort(_('the server sent an unknown error code'))

    lock = repo.lock()
    try:
        consumev1(repo, fp)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
    finally:
        lock.release()
129
148
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server.

    Reads the boolean ``server.uncompressed`` config option (default True),
    with untrusted configuration sources allowed.
    """
    return ui.configbool('server', 'uncompressed', True, untrusted=True)
133
152
# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    """Return the result of walking ``repo``'s store."""
    return repo.store.walk()
137
156
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This is a generator of raw chunks that constitute a streaming clone.

    The stream begins with a line of 2 space-delimited integers containing the
    number of entries and total bytes size.

    Next are N entries, one for each file being transferred. Each file entry
    starts as a line with the file name and integer size delimited by a null
    byte. The raw file data follows. Following the raw file data is the next
    file entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError. The lock is held
    only while the store is scanned; it is released before any file data is
    emitted.
    """
    entries = []
    total_bytes = 0
    # Get consistent snapshot of repo, lock during scan.
    lock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files are not transferred.
            if size:
                entries.append((name, size))
                total_bytes += size
    finally:
        lock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(entries), total_bytes))
    yield '%d %d\n' % (len(entries), total_bytes)

    svfs = repo.svfs
    oldaudit = svfs.mustaudit
    debugflag = repo.ui.debugflag
    # Auditing is switched off while sending and restored in the ``finally``
    # clause below.
    svfs.mustaudit = False

    try:
        for name, size in entries:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            fp = svfs(name)
            try:
                if size <= 65536:
                    # Small file: emit it as a single chunk.
                    yield fp.read(size)
                else:
                    # Only ``size`` bytes (as measured during the scan) are
                    # sent, even if the file has grown since.
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk
            finally:
                # Close explicitly in both branches so handles for large
                # files are not left to the garbage collector.
                fp.close()
    finally:
        svfs.mustaudit = oldaudit
198
217
def consumev1(repo, fp):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.

    ``fp`` is a file-like object positioned at the "<files> <bytes>" header
    line. Files are written into the repository store inside a transaction
    while the repository lock is held.

    Raises ``error.ResponseError`` if the header line or a file entry line
    cannot be parsed.
    """
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        l = fp.readline()
        try:
            total_files, total_bytes = map(int, l.split(' ', 1))
        except (ValueError, TypeError):
            raise error.ResponseError(
                _('unexpected response from remote server:'), l)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                try:
                    for chunk in util.filechunkiter(fp, limit=size):
                        handled_bytes += len(chunk)
                        repo.ui.progress(_('clone'), handled_bytes,
                                         total=total_bytes)
                        ofp.write(chunk)
                finally:
                    # Close even when reading or writing a chunk fails so
                    # the output handle is not leaked.
                    ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Guard against a zero or negative interval before dividing.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()
General Comments 0
You need to be logged in to leave comments. Login now