##// END OF EJS Templates
streamclone: move payload header line consumption...
Gregory Szorc -
r26468:19bbd53a default
parent child Browse files
Show More
@@ -1,279 +1,279 b''
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
20 def canperformstreamclone(pullop, bailifbundle2supported=False):
20 def canperformstreamclone(pullop, bailifbundle2supported=False):
21 """Whether it is possible to perform a streaming clone as part of pull.
21 """Whether it is possible to perform a streaming clone as part of pull.
22
22
23 ``bailifbundle2supported`` will cause the function to return False if
23 ``bailifbundle2supported`` will cause the function to return False if
24 bundle2 stream clones are supported. It should only be called by the
24 bundle2 stream clones are supported. It should only be called by the
25 legacy stream clone code path.
25 legacy stream clone code path.
26
26
27 Returns a tuple of (supported, requirements). ``supported`` is True if
27 Returns a tuple of (supported, requirements). ``supported`` is True if
28 streaming clone is supported and False otherwise. ``requirements`` is
28 streaming clone is supported and False otherwise. ``requirements`` is
29 a set of repo requirements from the remote, or ``None`` if stream clone
29 a set of repo requirements from the remote, or ``None`` if stream clone
30 isn't supported.
30 isn't supported.
31 """
31 """
32 repo = pullop.repo
32 repo = pullop.repo
33 remote = pullop.remote
33 remote = pullop.remote
34
34
35 bundle2supported = False
35 bundle2supported = False
36 if pullop.canusebundle2:
36 if pullop.canusebundle2:
37 if 'v1' in pullop.remotebundle2caps.get('stream', []):
37 if 'v1' in pullop.remotebundle2caps.get('stream', []):
38 bundle2supported = True
38 bundle2supported = True
39 # else
39 # else
40 # Server doesn't support bundle2 stream clone or doesn't support
40 # Server doesn't support bundle2 stream clone or doesn't support
41 # the versions we support. Fall back and possibly allow legacy.
41 # the versions we support. Fall back and possibly allow legacy.
42
42
43 # Ensures legacy code path uses available bundle2.
43 # Ensures legacy code path uses available bundle2.
44 if bailifbundle2supported and bundle2supported:
44 if bailifbundle2supported and bundle2supported:
45 return False, None
45 return False, None
46 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
46 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 #elif not bailifbundle2supported and not bundle2supported:
47 #elif not bailifbundle2supported and not bundle2supported:
48 # return False, None
48 # return False, None
49
49
50 # Streaming clone only works on empty repositories.
50 # Streaming clone only works on empty repositories.
51 if len(repo):
51 if len(repo):
52 return False, None
52 return False, None
53
53
54 # Streaming clone only works if all data is being requested.
54 # Streaming clone only works if all data is being requested.
55 if pullop.heads:
55 if pullop.heads:
56 return False, None
56 return False, None
57
57
58 streamrequested = pullop.streamclonerequested
58 streamrequested = pullop.streamclonerequested
59
59
60 # If we don't have a preference, let the server decide for us. This
60 # If we don't have a preference, let the server decide for us. This
61 # likely only comes into play in LANs.
61 # likely only comes into play in LANs.
62 if streamrequested is None:
62 if streamrequested is None:
63 # The server can advertise whether to prefer streaming clone.
63 # The server can advertise whether to prefer streaming clone.
64 streamrequested = remote.capable('stream-preferred')
64 streamrequested = remote.capable('stream-preferred')
65
65
66 if not streamrequested:
66 if not streamrequested:
67 return False, None
67 return False, None
68
68
69 # In order for stream clone to work, the client has to support all the
69 # In order for stream clone to work, the client has to support all the
70 # requirements advertised by the server.
70 # requirements advertised by the server.
71 #
71 #
72 # The server advertises its requirements via the "stream" and "streamreqs"
72 # The server advertises its requirements via the "stream" and "streamreqs"
73 # capability. "stream" (a value-less capability) is advertised if and only
73 # capability. "stream" (a value-less capability) is advertised if and only
74 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
74 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 # is advertised and contains a comma-delimited list of requirements.
75 # is advertised and contains a comma-delimited list of requirements.
76 requirements = set()
76 requirements = set()
77 if remote.capable('stream'):
77 if remote.capable('stream'):
78 requirements.add('revlogv1')
78 requirements.add('revlogv1')
79 else:
79 else:
80 streamreqs = remote.capable('streamreqs')
80 streamreqs = remote.capable('streamreqs')
81 # This is weird and shouldn't happen with modern servers.
81 # This is weird and shouldn't happen with modern servers.
82 if not streamreqs:
82 if not streamreqs:
83 return False, None
83 return False, None
84
84
85 streamreqs = set(streamreqs.split(','))
85 streamreqs = set(streamreqs.split(','))
86 # Server requires something we don't support. Bail.
86 # Server requires something we don't support. Bail.
87 if streamreqs - repo.supportedformats:
87 if streamreqs - repo.supportedformats:
88 return False, None
88 return False, None
89 requirements = streamreqs
89 requirements = streamreqs
90
90
91 return True, requirements
91 return True, requirements
92
92
93 def maybeperformlegacystreamclone(pullop):
93 def maybeperformlegacystreamclone(pullop):
94 """Possibly perform a legacy stream clone operation.
94 """Possibly perform a legacy stream clone operation.
95
95
96 Legacy stream clones are performed as part of pull but before all other
96 Legacy stream clones are performed as part of pull but before all other
97 operations.
97 operations.
98
98
99 A legacy stream clone will not be performed if a bundle2 stream clone is
99 A legacy stream clone will not be performed if a bundle2 stream clone is
100 supported.
100 supported.
101 """
101 """
102 supported, requirements = canperformstreamclone(pullop)
102 supported, requirements = canperformstreamclone(pullop)
103
103
104 if not supported:
104 if not supported:
105 return
105 return
106
106
107 repo = pullop.repo
107 repo = pullop.repo
108 remote = pullop.remote
108 remote = pullop.remote
109
109
110 # Save remote branchmap. We will use it later to speed up branchcache
110 # Save remote branchmap. We will use it later to speed up branchcache
111 # creation.
111 # creation.
112 rbranchmap = None
112 rbranchmap = None
113 if remote.capable('branchmap'):
113 if remote.capable('branchmap'):
114 rbranchmap = remote.branchmap()
114 rbranchmap = remote.branchmap()
115
115
116 fp = remote.stream_out()
116 fp = remote.stream_out()
117 l = fp.readline()
117 l = fp.readline()
118 try:
118 try:
119 resp = int(l)
119 resp = int(l)
120 except ValueError:
120 except ValueError:
121 raise error.ResponseError(
121 raise error.ResponseError(
122 _('unexpected response from remote server:'), l)
122 _('unexpected response from remote server:'), l)
123 if resp == 1:
123 if resp == 1:
124 raise util.Abort(_('operation forbidden by server'))
124 raise util.Abort(_('operation forbidden by server'))
125 elif resp == 2:
125 elif resp == 2:
126 raise util.Abort(_('locking the remote repository failed'))
126 raise util.Abort(_('locking the remote repository failed'))
127 elif resp != 0:
127 elif resp != 0:
128 raise util.Abort(_('the server sent an unknown error code'))
128 raise util.Abort(_('the server sent an unknown error code'))
129
129
130 l = fp.readline()
131 try:
132 filecount, bytecount = map(int, l.split(' ', 1))
133 except (ValueError, TypeError):
134 raise error.ResponseError(
135 _('unexpected response from remote server:'), l)
136
130 lock = repo.lock()
137 lock = repo.lock()
131 try:
138 try:
132 consumev1(repo, fp)
139 consumev1(repo, fp, filecount, bytecount)
133
140
134 # new requirements = old non-format requirements +
141 # new requirements = old non-format requirements +
135 # new format-related remote requirements
142 # new format-related remote requirements
136 # requirements from the streamed-in repository
143 # requirements from the streamed-in repository
137 repo.requirements = requirements | (
144 repo.requirements = requirements | (
138 repo.requirements - repo.supportedformats)
145 repo.requirements - repo.supportedformats)
139 repo._applyopenerreqs()
146 repo._applyopenerreqs()
140 repo._writerequirements()
147 repo._writerequirements()
141
148
142 if rbranchmap:
149 if rbranchmap:
143 branchmap.replacecache(repo, rbranchmap)
150 branchmap.replacecache(repo, rbranchmap)
144
151
145 repo.invalidate()
152 repo.invalidate()
146 finally:
153 finally:
147 lock.release()
154 lock.release()
148
155
149 def allowservergeneration(ui):
156 def allowservergeneration(ui):
150 """Whether streaming clones are allowed from the server."""
157 """Whether streaming clones are allowed from the server."""
151 return ui.configbool('server', 'uncompressed', True, untrusted=True)
158 return ui.configbool('server', 'uncompressed', True, untrusted=True)
152
159
153 # This is it's own function so extensions can override it.
160 # This is it's own function so extensions can override it.
154 def _walkstreamfiles(repo):
161 def _walkstreamfiles(repo):
155 return repo.store.walk()
162 return repo.store.walk()
156
163
157 def generatev1(repo):
164 def generatev1(repo):
158 """Emit content for version 1 of a streaming clone.
165 """Emit content for version 1 of a streaming clone.
159
166
160 This is a generator of raw chunks that constitute a streaming clone.
167 This is a generator of raw chunks that constitute a streaming clone.
161
168
162 The stream begins with a line of 2 space-delimited integers containing the
169 The stream begins with a line of 2 space-delimited integers containing the
163 number of entries and total bytes size.
170 number of entries and total bytes size.
164
171
165 Next, are N entries for each file being transferred. Each file entry starts
172 Next, are N entries for each file being transferred. Each file entry starts
166 as a line with the file name and integer size delimited by a null byte.
173 as a line with the file name and integer size delimited by a null byte.
167 The raw file data follows. Following the raw file data is the next file
174 The raw file data follows. Following the raw file data is the next file
168 entry, or EOF.
175 entry, or EOF.
169
176
170 When used on the wire protocol, an additional line indicating protocol
177 When used on the wire protocol, an additional line indicating protocol
171 success will be prepended to the stream. This function is not responsible
178 success will be prepended to the stream. This function is not responsible
172 for adding it.
179 for adding it.
173
180
174 This function will obtain a repository lock to ensure a consistent view of
181 This function will obtain a repository lock to ensure a consistent view of
175 the store is captured. It therefore may raise LockError.
182 the store is captured. It therefore may raise LockError.
176 """
183 """
177 entries = []
184 entries = []
178 total_bytes = 0
185 total_bytes = 0
179 # Get consistent snapshot of repo, lock during scan.
186 # Get consistent snapshot of repo, lock during scan.
180 lock = repo.lock()
187 lock = repo.lock()
181 try:
188 try:
182 repo.ui.debug('scanning\n')
189 repo.ui.debug('scanning\n')
183 for name, ename, size in _walkstreamfiles(repo):
190 for name, ename, size in _walkstreamfiles(repo):
184 if size:
191 if size:
185 entries.append((name, size))
192 entries.append((name, size))
186 total_bytes += size
193 total_bytes += size
187 finally:
194 finally:
188 lock.release()
195 lock.release()
189
196
190 repo.ui.debug('%d files, %d bytes to transfer\n' %
197 repo.ui.debug('%d files, %d bytes to transfer\n' %
191 (len(entries), total_bytes))
198 (len(entries), total_bytes))
192 yield '%d %d\n' % (len(entries), total_bytes)
199 yield '%d %d\n' % (len(entries), total_bytes)
193
200
194 svfs = repo.svfs
201 svfs = repo.svfs
195 oldaudit = svfs.mustaudit
202 oldaudit = svfs.mustaudit
196 debugflag = repo.ui.debugflag
203 debugflag = repo.ui.debugflag
197 svfs.mustaudit = False
204 svfs.mustaudit = False
198
205
199 try:
206 try:
200 for name, size in entries:
207 for name, size in entries:
201 if debugflag:
208 if debugflag:
202 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
209 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
203 # partially encode name over the wire for backwards compat
210 # partially encode name over the wire for backwards compat
204 yield '%s\0%d\n' % (store.encodedir(name), size)
211 yield '%s\0%d\n' % (store.encodedir(name), size)
205 if size <= 65536:
212 if size <= 65536:
206 fp = svfs(name)
213 fp = svfs(name)
207 try:
214 try:
208 data = fp.read(size)
215 data = fp.read(size)
209 finally:
216 finally:
210 fp.close()
217 fp.close()
211 yield data
218 yield data
212 else:
219 else:
213 for chunk in util.filechunkiter(svfs(name), limit=size):
220 for chunk in util.filechunkiter(svfs(name), limit=size):
214 yield chunk
221 yield chunk
215 finally:
222 finally:
216 svfs.mustaudit = oldaudit
223 svfs.mustaudit = oldaudit
217
224
218 def consumev1(repo, fp):
225 def consumev1(repo, fp, filecount, bytecount):
219 """Apply the contents from version 1 of a streaming clone file handle.
226 """Apply the contents from version 1 of a streaming clone file handle.
220
227
221 This takes the output from "streamout" and applies it to the specified
228 This takes the output from "streamout" and applies it to the specified
222 repository.
229 repository.
223
230
224 Like "streamout," the status line added by the wire protocol is not handled
231 Like "streamout," the status line added by the wire protocol is not handled
225 by this function.
232 by this function.
226 """
233 """
227 lock = repo.lock()
234 lock = repo.lock()
228 try:
235 try:
229 repo.ui.status(_('streaming all changes\n'))
236 repo.ui.status(_('streaming all changes\n'))
230 l = fp.readline()
231 try:
232 total_files, total_bytes = map(int, l.split(' ', 1))
233 except (ValueError, TypeError):
234 raise error.ResponseError(
235 _('unexpected response from remote server:'), l)
236 repo.ui.status(_('%d files to transfer, %s of data\n') %
237 repo.ui.status(_('%d files to transfer, %s of data\n') %
237 (total_files, util.bytecount(total_bytes)))
238 (filecount, util.bytecount(bytecount)))
238 handled_bytes = 0
239 handled_bytes = 0
239 repo.ui.progress(_('clone'), 0, total=total_bytes)
240 repo.ui.progress(_('clone'), 0, total=bytecount)
240 start = time.time()
241 start = time.time()
241
242
242 tr = repo.transaction(_('clone'))
243 tr = repo.transaction(_('clone'))
243 try:
244 try:
244 for i in xrange(total_files):
245 for i in xrange(filecount):
245 # XXX doesn't support '\n' or '\r' in filenames
246 # XXX doesn't support '\n' or '\r' in filenames
246 l = fp.readline()
247 l = fp.readline()
247 try:
248 try:
248 name, size = l.split('\0', 1)
249 name, size = l.split('\0', 1)
249 size = int(size)
250 size = int(size)
250 except (ValueError, TypeError):
251 except (ValueError, TypeError):
251 raise error.ResponseError(
252 raise error.ResponseError(
252 _('unexpected response from remote server:'), l)
253 _('unexpected response from remote server:'), l)
253 if repo.ui.debugflag:
254 if repo.ui.debugflag:
254 repo.ui.debug('adding %s (%s)\n' %
255 repo.ui.debug('adding %s (%s)\n' %
255 (name, util.bytecount(size)))
256 (name, util.bytecount(size)))
256 # for backwards compat, name was partially encoded
257 # for backwards compat, name was partially encoded
257 ofp = repo.svfs(store.decodedir(name), 'w')
258 ofp = repo.svfs(store.decodedir(name), 'w')
258 for chunk in util.filechunkiter(fp, limit=size):
259 for chunk in util.filechunkiter(fp, limit=size):
259 handled_bytes += len(chunk)
260 handled_bytes += len(chunk)
260 repo.ui.progress(_('clone'), handled_bytes,
261 repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
261 total=total_bytes)
262 ofp.write(chunk)
262 ofp.write(chunk)
263 ofp.close()
263 ofp.close()
264 tr.close()
264 tr.close()
265 finally:
265 finally:
266 tr.release()
266 tr.release()
267
267
268 # Writing straight to files circumvented the inmemory caches
268 # Writing straight to files circumvented the inmemory caches
269 repo.invalidate()
269 repo.invalidate()
270
270
271 elapsed = time.time() - start
271 elapsed = time.time() - start
272 if elapsed <= 0:
272 if elapsed <= 0:
273 elapsed = 0.001
273 elapsed = 0.001
274 repo.ui.progress(_('clone'), None)
274 repo.ui.progress(_('clone'), None)
275 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
275 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
276 (util.bytecount(total_bytes), elapsed,
276 (util.bytecount(bytecount), elapsed,
277 util.bytecount(total_bytes / elapsed)))
277 util.bytecount(bytecount / elapsed)))
278 finally:
278 finally:
279 lock.release()
279 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now