##// END OF EJS Templates
streamclone: refactor canperformstreamclone to accept a pullop...
Gregory Szorc -
r26466:3515db5a default
parent child Browse files
Show More
@@ -1,257 +1,260 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 def canperformstreamclone(repo, remote, heads, streamrequested=None):
20 def canperformstreamclone(pullop):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 Returns a tuple of (supported, requirements). ``supported`` is True if
24 24 streaming clone is supported and False otherwise. ``requirements`` is
25 25 a set of repo requirements from the remote, or ``None`` if stream clone
26 26 isn't supported.
27 27 """
28 repo = pullop.repo
29 remote = pullop.remote
30
28 31 # Streaming clone only works on empty repositories.
29 32 if len(repo):
30 33 return False, None
31 34
32 35 # Streaming clone only works if all data is being requested.
33 if heads:
36 if pullop.heads:
34 37 return False, None
35 38
39 streamrequested = pullop.streamclonerequested
40
36 41 # If we don't have a preference, let the server decide for us. This
37 42 # likely only comes into play in LANs.
38 43 if streamrequested is None:
39 44 # The server can advertise whether to prefer streaming clone.
40 45 streamrequested = remote.capable('stream-preferred')
41 46
42 47 if not streamrequested:
43 48 return False, None
44 49
45 50 # In order for stream clone to work, the client has to support all the
46 51 # requirements advertised by the server.
47 52 #
48 53 # The server advertises its requirements via the "stream" and "streamreqs"
49 54 # capability. "stream" (a value-less capability) is advertised if and only
50 55 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
51 56 # is advertised and contains a comma-delimited list of requirements.
52 57 requirements = set()
53 58 if remote.capable('stream'):
54 59 requirements.add('revlogv1')
55 60 else:
56 61 streamreqs = remote.capable('streamreqs')
57 62 # This is weird and shouldn't happen with modern servers.
58 63 if not streamreqs:
59 64 return False, None
60 65
61 66 streamreqs = set(streamreqs.split(','))
62 67 # Server requires something we don't support. Bail.
63 68 if streamreqs - repo.supportedformats:
64 69 return False, None
65 70 requirements = streamreqs
66 71
67 72 return True, requirements
68 73
69 74 def maybeperformlegacystreamclone(pullop):
70 75 """Possibly perform a legacy stream clone operation.
71 76
72 77 Legacy stream clones are performed as part of pull but before all other
73 78 operations.
74 79
75 80 A legacy stream clone will not be performed if a bundle2 stream clone is
76 81 supported.
77 82 """
78 repo = pullop.repo
79 remote = pullop.remote
80
81 r = canperformstreamclone(repo, remote, pullop.heads,
82 streamrequested=pullop.streamclonerequested)
83 supported, requirements = r
83 supported, requirements = canperformstreamclone(pullop)
84 84
85 85 if not supported:
86 86 return
87 87
88 repo = pullop.repo
89 remote = pullop.remote
90
88 91 # Save remote branchmap. We will use it later to speed up branchcache
89 92 # creation.
90 93 rbranchmap = None
91 94 if remote.capable('branchmap'):
92 95 rbranchmap = remote.branchmap()
93 96
94 97 fp = remote.stream_out()
95 98 l = fp.readline()
96 99 try:
97 100 resp = int(l)
98 101 except ValueError:
99 102 raise error.ResponseError(
100 103 _('unexpected response from remote server:'), l)
101 104 if resp == 1:
102 105 raise util.Abort(_('operation forbidden by server'))
103 106 elif resp == 2:
104 107 raise util.Abort(_('locking the remote repository failed'))
105 108 elif resp != 0:
106 109 raise util.Abort(_('the server sent an unknown error code'))
107 110
108 111 lock = repo.lock()
109 112 try:
110 113 consumev1(repo, fp)
111 114
112 115 # new requirements = old non-format requirements +
113 116 # new format-related remote requirements
114 117 # requirements from the streamed-in repository
115 118 repo.requirements = requirements | (
116 119 repo.requirements - repo.supportedformats)
117 120 repo._applyopenerreqs()
118 121 repo._writerequirements()
119 122
120 123 if rbranchmap:
121 124 branchmap.replacecache(repo, rbranchmap)
122 125
123 126 repo.invalidate()
124 127 finally:
125 128 lock.release()
126 129
127 130 def allowservergeneration(ui):
128 131 """Whether streaming clones are allowed from the server."""
129 132 return ui.configbool('server', 'uncompressed', True, untrusted=True)
130 133
131 134 # This is it's own function so extensions can override it.
132 135 def _walkstreamfiles(repo):
133 136 return repo.store.walk()
134 137
135 138 def generatev1(repo):
136 139 """Emit content for version 1 of a streaming clone.
137 140
138 141 This is a generator of raw chunks that constitute a streaming clone.
139 142
140 143 The stream begins with a line of 2 space-delimited integers containing the
141 144 number of entries and total bytes size.
142 145
143 146 Next, are N entries for each file being transferred. Each file entry starts
144 147 as a line with the file name and integer size delimited by a null byte.
145 148 The raw file data follows. Following the raw file data is the next file
146 149 entry, or EOF.
147 150
148 151 When used on the wire protocol, an additional line indicating protocol
149 152 success will be prepended to the stream. This function is not responsible
150 153 for adding it.
151 154
152 155 This function will obtain a repository lock to ensure a consistent view of
153 156 the store is captured. It therefore may raise LockError.
154 157 """
155 158 entries = []
156 159 total_bytes = 0
157 160 # Get consistent snapshot of repo, lock during scan.
158 161 lock = repo.lock()
159 162 try:
160 163 repo.ui.debug('scanning\n')
161 164 for name, ename, size in _walkstreamfiles(repo):
162 165 if size:
163 166 entries.append((name, size))
164 167 total_bytes += size
165 168 finally:
166 169 lock.release()
167 170
168 171 repo.ui.debug('%d files, %d bytes to transfer\n' %
169 172 (len(entries), total_bytes))
170 173 yield '%d %d\n' % (len(entries), total_bytes)
171 174
172 175 svfs = repo.svfs
173 176 oldaudit = svfs.mustaudit
174 177 debugflag = repo.ui.debugflag
175 178 svfs.mustaudit = False
176 179
177 180 try:
178 181 for name, size in entries:
179 182 if debugflag:
180 183 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
181 184 # partially encode name over the wire for backwards compat
182 185 yield '%s\0%d\n' % (store.encodedir(name), size)
183 186 if size <= 65536:
184 187 fp = svfs(name)
185 188 try:
186 189 data = fp.read(size)
187 190 finally:
188 191 fp.close()
189 192 yield data
190 193 else:
191 194 for chunk in util.filechunkiter(svfs(name), limit=size):
192 195 yield chunk
193 196 finally:
194 197 svfs.mustaudit = oldaudit
195 198
196 199 def consumev1(repo, fp):
197 200 """Apply the contents from version 1 of a streaming clone file handle.
198 201
199 202 This takes the output from "streamout" and applies it to the specified
200 203 repository.
201 204
202 205 Like "streamout," the status line added by the wire protocol is not handled
203 206 by this function.
204 207 """
205 208 lock = repo.lock()
206 209 try:
207 210 repo.ui.status(_('streaming all changes\n'))
208 211 l = fp.readline()
209 212 try:
210 213 total_files, total_bytes = map(int, l.split(' ', 1))
211 214 except (ValueError, TypeError):
212 215 raise error.ResponseError(
213 216 _('unexpected response from remote server:'), l)
214 217 repo.ui.status(_('%d files to transfer, %s of data\n') %
215 218 (total_files, util.bytecount(total_bytes)))
216 219 handled_bytes = 0
217 220 repo.ui.progress(_('clone'), 0, total=total_bytes)
218 221 start = time.time()
219 222
220 223 tr = repo.transaction(_('clone'))
221 224 try:
222 225 for i in xrange(total_files):
223 226 # XXX doesn't support '\n' or '\r' in filenames
224 227 l = fp.readline()
225 228 try:
226 229 name, size = l.split('\0', 1)
227 230 size = int(size)
228 231 except (ValueError, TypeError):
229 232 raise error.ResponseError(
230 233 _('unexpected response from remote server:'), l)
231 234 if repo.ui.debugflag:
232 235 repo.ui.debug('adding %s (%s)\n' %
233 236 (name, util.bytecount(size)))
234 237 # for backwards compat, name was partially encoded
235 238 ofp = repo.svfs(store.decodedir(name), 'w')
236 239 for chunk in util.filechunkiter(fp, limit=size):
237 240 handled_bytes += len(chunk)
238 241 repo.ui.progress(_('clone'), handled_bytes,
239 242 total=total_bytes)
240 243 ofp.write(chunk)
241 244 ofp.close()
242 245 tr.close()
243 246 finally:
244 247 tr.release()
245 248
246 249 # Writing straight to files circumvented the inmemory caches
247 250 repo.invalidate()
248 251
249 252 elapsed = time.time() - start
250 253 if elapsed <= 0:
251 254 elapsed = 0.001
252 255 repo.ui.progress(_('clone'), None)
253 256 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
254 257 (util.bytecount(total_bytes), elapsed,
255 258 util.bytecount(total_bytes / elapsed)))
256 259 finally:
257 260 lock.release()
General Comments 0
You need to be logged in to leave comments. Login now