streamclone: refactor canperformstreamclone to accept a pullop...
Author: Gregory Szorc
Revision: r26466:3515db5a (default branch)
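In short, the changeset stops passing the pull operation's fields (repo, remote, heads, streamrequested) to canperformstreamclone individually and instead hands it the pullop object itself, so future inputs can be read off pullop without touching every caller. The before/after sketch below is illustrative only; the attribute names repo, remote, heads and streamclonerequested are taken from the diff, and the surrounding exchange/pull plumbing is assumed rather than shown.

    # Hypothetical call site before the refactor: the caller unpacks pullop itself.
    supported, requirements = canperformstreamclone(
        pullop.repo, pullop.remote, pullop.heads,
        streamrequested=pullop.streamclonerequested)

    # After the refactor: pass the pull operation whole and let the function
    # take what it needs from it.
    supported, requirements = canperformstreamclone(pullop)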
@@ -1,257 +1,260 @@
 # streamclone.py - producing and consuming streaming repository data
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import time

 from .i18n import _
 from . import (
     branchmap,
     error,
     store,
     util,
 )

-def canperformstreamclone(repo, remote, heads, streamrequested=None):
+def canperformstreamclone(pullop):
     """Whether it is possible to perform a streaming clone as part of pull.

     Returns a tuple of (supported, requirements). ``supported`` is True if
     streaming clone is supported and False otherwise. ``requirements`` is
     a set of repo requirements from the remote, or ``None`` if stream clone
     isn't supported.
     """
+    repo = pullop.repo
+    remote = pullop.remote
+
     # Streaming clone only works on empty repositories.
     if len(repo):
         return False, None

     # Streaming clone only works if all data is being requested.
-    if heads:
+    if pullop.heads:
         return False, None

+    streamrequested = pullop.streamclonerequested
+
     # If we don't have a preference, let the server decide for us. This
     # likely only comes into play in LANs.
     if streamrequested is None:
         # The server can advertise whether to prefer streaming clone.
         streamrequested = remote.capable('stream-preferred')

     if not streamrequested:
         return False, None

     # In order for stream clone to work, the client has to support all the
     # requirements advertised by the server.
     #
     # The server advertises its requirements via the "stream" and "streamreqs"
     # capability. "stream" (a value-less capability) is advertised if and only
     # if the only requirement is "revlogv1." Else, the "streamreqs" capability
     # is advertised and contains a comma-delimited list of requirements.
     requirements = set()
     if remote.capable('stream'):
         requirements.add('revlogv1')
     else:
         streamreqs = remote.capable('streamreqs')
         # This is weird and shouldn't happen with modern servers.
         if not streamreqs:
             return False, None

         streamreqs = set(streamreqs.split(','))
         # Server requires something we don't support. Bail.
         if streamreqs - repo.supportedformats:
             return False, None
         requirements = streamreqs

     return True, requirements

 def maybeperformlegacystreamclone(pullop):
     """Possibly perform a legacy stream clone operation.

     Legacy stream clones are performed as part of pull but before all other
     operations.

     A legacy stream clone will not be performed if a bundle2 stream clone is
     supported.
     """
-    repo = pullop.repo
-    remote = pullop.remote
-
-    r = canperformstreamclone(repo, remote, pullop.heads,
-                              streamrequested=pullop.streamclonerequested)
-    supported, requirements = r
+    supported, requirements = canperformstreamclone(pullop)

     if not supported:
         return

+    repo = pullop.repo
+    remote = pullop.remote
+
     # Save remote branchmap. We will use it later to speed up branchcache
     # creation.
     rbranchmap = None
     if remote.capable('branchmap'):
         rbranchmap = remote.branchmap()

     fp = remote.stream_out()
     l = fp.readline()
     try:
         resp = int(l)
     except ValueError:
         raise error.ResponseError(
             _('unexpected response from remote server:'), l)
     if resp == 1:
         raise util.Abort(_('operation forbidden by server'))
     elif resp == 2:
         raise util.Abort(_('locking the remote repository failed'))
     elif resp != 0:
         raise util.Abort(_('the server sent an unknown error code'))

     lock = repo.lock()
     try:
         consumev1(repo, fp)

         # new requirements = old non-format requirements +
         #                    new format-related remote requirements
         # requirements from the streamed-in repository
         repo.requirements = requirements | (
                 repo.requirements - repo.supportedformats)
         repo._applyopenerreqs()
         repo._writerequirements()

         if rbranchmap:
             branchmap.replacecache(repo, rbranchmap)

         repo.invalidate()
     finally:
         lock.release()

 def allowservergeneration(ui):
     """Whether streaming clones are allowed from the server."""
     return ui.configbool('server', 'uncompressed', True, untrusted=True)

 # This is it's own function so extensions can override it.
 def _walkstreamfiles(repo):
     return repo.store.walk()

 def generatev1(repo):
     """Emit content for version 1 of a streaming clone.

     This is a generator of raw chunks that constitute a streaming clone.

     The stream begins with a line of 2 space-delimited integers containing the
     number of entries and total bytes size.

     Next, are N entries for each file being transferred. Each file entry starts
     as a line with the file name and integer size delimited by a null byte.
     The raw file data follows. Following the raw file data is the next file
     entry, or EOF.

     When used on the wire protocol, an additional line indicating protocol
     success will be prepended to the stream. This function is not responsible
     for adding it.

     This function will obtain a repository lock to ensure a consistent view of
     the store is captured. It therefore may raise LockError.
     """
     entries = []
     total_bytes = 0
     # Get consistent snapshot of repo, lock during scan.
     lock = repo.lock()
     try:
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
                 entries.append((name, size))
                 total_bytes += size
     finally:
         lock.release()

     repo.ui.debug('%d files, %d bytes to transfer\n' %
                   (len(entries), total_bytes))
     yield '%d %d\n' % (len(entries), total_bytes)

     svfs = repo.svfs
     oldaudit = svfs.mustaudit
     debugflag = repo.ui.debugflag
     svfs.mustaudit = False

     try:
         for name, size in entries:
             if debugflag:
                 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
             # partially encode name over the wire for backwards compat
             yield '%s\0%d\n' % (store.encodedir(name), size)
             if size <= 65536:
                 fp = svfs(name)
                 try:
                     data = fp.read(size)
                 finally:
                     fp.close()
                 yield data
             else:
                 for chunk in util.filechunkiter(svfs(name), limit=size):
                     yield chunk
     finally:
         svfs.mustaudit = oldaudit

 def consumev1(repo, fp):
     """Apply the contents from version 1 of a streaming clone file handle.

     This takes the output from "streamout" and applies it to the specified
     repository.

     Like "streamout," the status line added by the wire protocol is not handled
     by this function.
     """
     lock = repo.lock()
     try:
         repo.ui.status(_('streaming all changes\n'))
         l = fp.readline()
         try:
             total_files, total_bytes = map(int, l.split(' ', 1))
         except (ValueError, TypeError):
             raise error.ResponseError(
                 _('unexpected response from remote server:'), l)
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (total_files, util.bytecount(total_bytes)))
         handled_bytes = 0
         repo.ui.progress(_('clone'), 0, total=total_bytes)
         start = time.time()

         tr = repo.transaction(_('clone'))
         try:
             for i in xrange(total_files):
                 # XXX doesn't support '\n' or '\r' in filenames
                 l = fp.readline()
                 try:
                     name, size = l.split('\0', 1)
                     size = int(size)
                 except (ValueError, TypeError):
                     raise error.ResponseError(
                         _('unexpected response from remote server:'), l)
                 if repo.ui.debugflag:
                     repo.ui.debug('adding %s (%s)\n' %
                                   (name, util.bytecount(size)))
                 # for backwards compat, name was partially encoded
                 ofp = repo.svfs(store.decodedir(name), 'w')
                 for chunk in util.filechunkiter(fp, limit=size):
                     handled_bytes += len(chunk)
                     repo.ui.progress(_('clone'), handled_bytes,
                                      total=total_bytes)
                     ofp.write(chunk)
                 ofp.close()
             tr.close()
         finally:
             tr.release()

         # Writing straight to files circumvented the inmemory caches
         repo.invalidate()

         elapsed = time.time() - start
         if elapsed <= 0:
             elapsed = 0.001
         repo.ui.progress(_('clone'), None)
         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                        (util.bytecount(total_bytes), elapsed,
                         util.bytecount(total_bytes / elapsed)))
     finally:
         lock.release()
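The generatev1/consumev1 docstrings above describe the version 1 stream clone wire format: a header line of two space-delimited integers (entry count and total byte size), then for each file a line with the store-encoded name and size separated by a NUL byte, followed by exactly that many bytes of raw store data. The reader below is a minimal, self-contained sketch of that framing for illustration only; it is not Mercurial's own consumer (consumev1 above writes into the repository store under a lock and transaction), it assumes a binary file object already positioned past the wire-protocol status line, and the helper name iter_v1_entries is hypothetical.

    import io

    def iter_v1_entries(fp, chunksize=65536):
        # Header: "<entry count> <total bytes>" on one line.
        header = fp.readline()
        total_files, total_bytes = map(int, header.split(b' ', 1))
        for _unused in range(total_files):
            # Each entry begins with the file name and its size, NUL-separated.
            # A real consumer would decode the name with store.decodedir().
            entry = fp.readline()
            name, size = entry.split(b'\0', 1)
            size = int(size)
            # The raw file data follows immediately; read exactly `size` bytes.
            remaining = size
            chunks = []
            while remaining:
                chunk = fp.read(min(chunksize, remaining))
                if not chunk:
                    raise ValueError('stream ended early for %r' % name)
                chunks.append(chunk)
                remaining -= len(chunk)
            yield name, size, b''.join(chunks)

    # Tiny synthetic stream: one 5-byte file named "data/foo.i".
    sample = b'1 5\n' + b'data/foo.i\x005\n' + b'hello'
    for name, size, data in iter_v1_entries(io.BytesIO(sample)):
        print(name, size, data)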