##// END OF EJS Templates
streamclone: add explicit check for empty local repo...
Gregory Szorc -
r26447:591088f7 default
parent child Browse files
Show More
@@ -1,278 +1,282 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
def canperformstreamclone(repo, remote, heads, streamrequested=None):
    """Whether it is possible to perform a streaming clone as part of pull.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    # A stream clone replaces the whole store, so the destination
    # repository must be empty.
    if len(repo):
        return False, None

    # Partial pulls (explicit heads) can never use the stream path.
    if heads:
        return False, None

    # With no explicit client preference, defer to the server's advertised
    # preference. This likely only comes into play on LANs.
    if streamrequested is None:
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # The client has to support every requirement the server advertises.
    #
    # Servers expose requirements via two capabilities: the value-less
    # "stream" capability (advertised if and only if the sole requirement
    # is "revlogv1") or "streamreqs", whose value is a comma-delimited
    # list of requirements.
    if remote.capable('stream'):
        return True, set(('revlogv1',))

    streamreqs = remote.capable('streamreqs')
    # Modern servers always advertise one of the two capabilities;
    # neither being present is weird and shouldn't happen.
    if not streamreqs:
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    if streamreqs - repo.supportedformats:
        return False, None

    return True, streamreqs
64 68
def maybeperformstreamclone(repo, remote, heads, stream):
    """Run a streaming clone from ``remote`` if one is possible.

    This is a no-op when ``canperformstreamclone()`` reports that stream
    clone cannot be used; otherwise the full store is fetched via
    ``streamin()``.
    """
    supported, requirements = canperformstreamclone(
        repo, remote, heads, streamrequested=stream)
    if supported:
        streamin(repo, remote, requirements)
72 76
def allowservergeneration(ui):
    """Whether streaming clones are allowed from the server."""
    # Consult even untrusted config so repository owners can opt out.
    allowed = ui.configbool('server', 'uncompressed', True, untrusted=True)
    return allowed
76 80
77 81 # This is it's own function so extensions can override it.
78 82 def _walkstreamfiles(repo):
79 83 return repo.store.walk()
80 84
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This is a generator of raw chunks that constitute a streaming clone.

    The stream begins with a line of 2 space-delimited integers containing the
    number of entries and total bytes size.

    Next, are N entries for each file being transferred. Each file entry starts
    as a line with the file name and integer size delimited by a null byte.
    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    toserve = []
    totalbytes = 0
    # Scan under the repo lock so the file list and sizes form a
    # consistent snapshot of the store.
    scanlock = repo.lock()
    try:
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files carry no payload and are omitted entirely.
            if size:
                toserve.append((name, size))
                totalbytes += size
    finally:
        scanlock.release()

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(toserve), totalbytes))
    yield '%d %d\n' % (len(toserve), totalbytes)

    vfs = repo.svfs
    savedaudit = vfs.mustaudit
    debugflag = repo.ui.debugflag
    # Names come straight from the store walk, so skip per-open path
    # auditing for speed; the prior setting is restored below.
    vfs.mustaudit = False

    try:
        for name, size in toserve:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            if size <= 65536:
                # Small file: read it in one shot and close eagerly.
                fh = vfs(name)
                try:
                    payload = fh.read(size)
                finally:
                    fh.close()
                yield payload
            else:
                # Large file: stream it out in bounded chunks.
                for piece in util.filechunkiter(vfs(name), limit=size):
                    yield piece
    finally:
        vfs.mustaudit = savedaudit
141 145
def consumev1(repo, fp):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "streamout" and applies it to the specified
    repository.

    Like "streamout," the status line added by the wire protocol is not handled
    by this function.
    """
    # Hold the repo lock for the whole apply: store files are written
    # directly below, bypassing the usual changegroup machinery.
    lock = repo.lock()
    try:
        repo.ui.status(_('streaming all changes\n'))
        l = fp.readline()
        # Header line: "<total files> <total bytes>".
        try:
            total_files, total_bytes = map(int, l.split(' ', 1))
        except (ValueError, TypeError):
            raise error.ResponseError(
                _('unexpected response from remote server:'), l)
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (total_files, util.bytecount(total_bytes)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=total_bytes)
        start = time.time()

        tr = repo.transaction(_('clone'))
        try:
            for i in xrange(total_files):
                # XXX doesn't support '\n' or '\r' in filenames
                l = fp.readline()
                # Per-file header line: "<name>\0<size>".
                try:
                    name, size = l.split('\0', 1)
                    size = int(size)
                except (ValueError, TypeError):
                    raise error.ResponseError(
                        _('unexpected response from remote server:'), l)
                if repo.ui.debugflag:
                    repo.ui.debug('adding %s (%s)\n' %
                                  (name, util.bytecount(size)))
                # for backwards compat, name was partially encoded
                ofp = repo.svfs(store.decodedir(name), 'w')
                # Copy exactly ``size`` bytes from the stream into the
                # store file, updating progress as we go.
                for chunk in util.filechunkiter(fp, limit=size):
                    handled_bytes += len(chunk)
                    repo.ui.progress(_('clone'), handled_bytes,
                                     total=total_bytes)
                    ofp.write(chunk)
                ofp.close()
            tr.close()
        finally:
            tr.release()

        # Writing straight to files circumvented the inmemory caches
        repo.invalidate()

        elapsed = time.time() - start
        # Avoid a zero/negative interval so the rate division below stays
        # well-defined.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(total_bytes), elapsed,
                        util.bytecount(total_bytes / elapsed)))
    finally:
        lock.release()
204 208
def streamin(repo, remote, remotereqs):
    """Fetch a stream clone from ``remote`` and apply it to ``repo``.

    ``remotereqs`` is the set of requirements advertised by the remote.
    Returns the number of heads in the resulting repository plus one.
    """
    # Save remote branchmap. We will use it later
    # to speed up branchcache creation
    rbranchmap = None
    if remote.capable("branchmap"):
        rbranchmap = remote.branchmap()

    fp = remote.stream_out()
    firstline = fp.readline()
    try:
        resp = int(firstline)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), firstline)
    # 0 means success; 1 and 2 are well-known failures; anything else
    # is a protocol violation.
    if resp != 0:
        known = {
            1: _('operation forbidden by server'),
            2: _('locking the remote repository failed'),
        }
        raise util.Abort(
            known.get(resp, _('the server sent an unknown error code')))

    applyremotedata(repo, remotereqs, rbranchmap, fp)
    return len(repo.heads()) + 1
228 232
def applyremotedata(repo, remotereqs, remotebranchmap, fp):
    """Apply stream clone data to a repository.

    "remotereqs" is a set of requirements to handle the incoming data.
    "remotebranchmap" is the result of a branchmap lookup on the remote. It
    can be None.
    "fp" is a file object containing the raw stream data, suitable for
    feeding into consumev1().
    """
    lock = repo.lock()
    try:
        consumev1(repo, fp)

        # new requirements = old non-format requirements +
        #                    new format-related remote requirements
        # requirements from the streamed-in repository
        repo.requirements = remotereqs | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        # Seed the local branch cache from the remote's branchmap so it
        # doesn't have to be recomputed from scratch after the clone.
        if remotebranchmap:
            rbheads = []
            closed = []
            for bheads in remotebranchmap.itervalues():
                rbheads.extend(bheads)
                for h in bheads:
                    r = repo.changelog.rev(h)
                    b, c = repo.changelog.branchinfo(r)
                    # Record heads whose branch is closed.
                    if c:
                        closed.append(h)

            if rbheads:
                # The cache tip is the highest revision among all heads.
                rtiprev = max((int(repo.changelog.rev(node))
                               for node in rbheads))
                cache = branchmap.branchcache(remotebranchmap,
                                              repo[rtiprev].node(),
                                              rtiprev,
                                              closednodes=closed)
                # Try to stick it as low as possible
                # filter above served are unlikely to be fetch from a clone
                for candidate in ('base', 'immutable', 'served'):
                    rview = repo.filtered(candidate)
                    if cache.validfor(rview):
                        repo._branchcaches[candidate] = cache
                        cache.write(rview)
                        break
        # Direct store writes bypassed in-memory caches; drop stale state.
        repo.invalidate()
    finally:
        lock.release()
General Comments 0
You need to be logged in to leave comments. Login now