##// END OF EJS Templates
streamclone: comment why path auditing is disabled in generatev1()...
Yuya Nishihara -
r33411:50b49bb0 default
parent child Browse files
Show More
@@ -1,412 +1,414 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import struct
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 phases,
17 17 store,
18 18 util,
19 19 )
20 20
def canperformstreamclone(pullop, bailifbundle2supported=False):
    """Whether it is possible to perform a streaming clone as part of pull.

    ``bailifbundle2supported`` will cause the function to return False if
    bundle2 stream clones are supported. It should only be called by the
    legacy stream clone code path.

    Returns a tuple of (supported, requirements). ``supported`` is True if
    streaming clone is supported and False otherwise. ``requirements`` is
    a set of repo requirements from the remote, or ``None`` if stream clone
    isn't supported.
    """
    repo = pullop.repo
    remote = pullop.remote

    # Does the server offer a bundle2 stream clone in a version we speak?
    # If not, fall back and possibly allow legacy stream clone.
    bundle2supported = (pullop.canusebundle2 and
                        'v1' in pullop.remotebundle2caps.get('stream', []))

    # Ensures legacy code path uses available bundle2.
    if bailifbundle2supported and bundle2supported:
        return False, None
    # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
    #elif not bailifbundle2supported and not bundle2supported:
    #    return False, None

    # Streaming clone only works on empty repositories.
    if len(repo):
        return False, None

    # Streaming clone only works if all data is being requested.
    if pullop.heads:
        return False, None

    streamrequested = pullop.streamclonerequested
    if streamrequested is None:
        # No explicit client preference: honor the server's advertised
        # 'stream-preferred' capability. This likely only comes into play
        # in LANs.
        streamrequested = remote.capable('stream-preferred')

    if not streamrequested:
        return False, None

    # In order for stream clone to work, the client has to support all the
    # requirements advertised by the server.
    #
    # The server advertises its requirements via the "stream" and "streamreqs"
    # capability. "stream" (a value-less capability) is advertised if and only
    # if the only requirement is "revlogv1." Else, the "streamreqs" capability
    # is advertised and contains a comma-delimited list of requirements.
    if remote.capable('stream'):
        return True, set(('revlogv1',))

    streamreqs = remote.capable('streamreqs')
    # This is weird and shouldn't happen with modern servers.
    if not streamreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but server has them '
            'disabled\n'))
        return False, None

    streamreqs = set(streamreqs.split(','))
    # Server requires something we don't support. Bail.
    missingreqs = streamreqs - repo.supportedformats
    if missingreqs:
        pullop.repo.ui.warn(_(
            'warning: stream clone requested but client is missing '
            'requirements: %s\n') % ', '.join(sorted(missingreqs)))
        pullop.repo.ui.warn(
            _('(see https://www.mercurial-scm.org/wiki/MissingRequirement '
              'for more information)\n'))
        return False, None

    return True, streamreqs
103 103
def maybeperformlegacystreamclone(pullop):
    """Possibly perform a legacy stream clone operation.

    Legacy stream clones are performed as part of pull but before all other
    operations.

    A legacy stream clone will not be performed if a bundle2 stream clone is
    supported.
    """
    supported, requirements = canperformstreamclone(pullop)
    if not supported:
        return

    repo = pullop.repo
    remote = pullop.remote

    # Save remote branchmap. We will use it later to speed up branchcache
    # creation.
    rbranchmap = remote.branchmap() if remote.capable('branchmap') else None

    repo.ui.status(_('streaming all changes\n'))

    fp = remote.stream_out()

    # The first line of the response is an integer status code.
    line = fp.readline()
    try:
        status = int(line)
    except ValueError:
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)
    if status == 1:
        raise error.Abort(_('operation forbidden by server'))
    if status == 2:
        raise error.Abort(_('locking the remote repository failed'))
    if status != 0:
        raise error.Abort(_('the server sent an unknown error code'))

    # The second line carries the file count and total byte count.
    line = fp.readline()
    try:
        filecount, bytecount = map(int, line.split(' ', 1))
    except (ValueError, TypeError):
        raise error.ResponseError(
            _('unexpected response from remote server:'), line)

    with repo.lock():
        consumev1(repo, fp, filecount, bytecount)

        # new requirements = old non-format requirements +
        #                    new format-related
        #                    requirements from the streamed-in repository
        repo.requirements = requirements | (
                repo.requirements - repo.supportedformats)
        repo._applyopenerreqs()
        repo._writerequirements()

        if rbranchmap:
            branchmap.replacecache(repo, rbranchmap)

        repo.invalidate()
165 165
def allowservergeneration(repo):
    """Whether streaming clones are allowed from the server."""
    if not repo.ui.configbool('server', 'uncompressed', True, untrusted=True):
        return False

    # The way stream clone works makes it impossible to hide secret changesets.
    # So don't allow this by default.
    if phases.hassecret(repo):
        return repo.ui.configbool('server', 'uncompressedallowsecret')

    return True
178 178
# This is its own function so extensions can override it.
def _walkstreamfiles(repo):
    # Yields 3-item entries unpacked as (name, ename, size) by generatev1().
    return repo.store.walk()
182 182
def generatev1(repo):
    """Emit content for version 1 of a streaming clone.

    This returns a 3-tuple of (file count, byte size, data iterator).

    The data iterator consists of N entries for each file being transferred.
    Each file entry starts as a line with the file name and integer size
    delimited by a null byte.

    The raw file data follows. Following the raw file data is the next file
    entry, or EOF.

    When used on the wire protocol, an additional line indicating protocol
    success will be prepended to the stream. This function is not responsible
    for adding it.

    This function will obtain a repository lock to ensure a consistent view of
    the store is captured. It therefore may raise LockError.
    """
    files = []
    totalsize = 0
    # Get consistent snapshot of repo, lock during scan.
    with repo.lock():
        repo.ui.debug('scanning\n')
        for name, ename, size in _walkstreamfiles(repo):
            # Empty files carry no payload; skip them entirely.
            if size:
                files.append((name, size))
                totalsize += size

    repo.ui.debug('%d files, %d bytes to transfer\n' %
                  (len(files), totalsize))

    svfs = repo.svfs
    debugflag = repo.ui.debugflag

    def emitrevlogdata():
        # Lazily stream each file's header line followed by its raw data.
        for name, size in files:
            if debugflag:
                repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
            # partially encode name over the wire for backwards compat
            yield '%s\0%d\n' % (store.encodedir(name), size)
            # auditing at this stage is both pointless (paths are already
            # trusted by the local repo) and expensive
            with svfs(name, 'rb', auditpath=False) as fp:
                if size <= 65536:
                    # Small file: a single read beats the chunk iterator.
                    yield fp.read(size)
                else:
                    for chunk in util.filechunkiter(fp, limit=size):
                        yield chunk

    return len(files), totalsize, emitrevlogdata()
232 234
def generatev1wireproto(repo):
    """Emit content for version 1 of streaming clone suitable for the wire.

    This is the data output from ``generatev1()`` with a header line
    indicating file count and byte size.
    """
    filecount, bytecount, data = generatev1(repo)
    # Header line precedes the raw stream data.
    yield '%d %d\n' % (filecount, bytecount)
    for chunk in data:
        yield chunk
243 245
def generatebundlev1(repo, compression='UN'):
    """Emit content for version 1 of a stream clone bundle.

    The first 4 bytes of the output ("HGS1") denote this as stream clone
    bundle version 1.

    The next 2 bytes indicate the compression type. Only "UN" is currently
    supported.

    The next 16 bytes are two 64-bit big endian unsigned integers indicating
    file count and byte count, respectively.

    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
    of the requirements string, including a trailing \0. The following N bytes
    are the requirements string, which is ASCII containing a comma-delimited
    list of repo requirements that are needed to support the data.

    The remaining content is the output of ``generatev1()`` (which may be
    compressed in the future).

    Returns a tuple of (requirements, data generator).
    """
    if compression != 'UN':
        raise ValueError('we do not support the compression argument yet')

    requirements = repo.requirements & repo.supportedformats
    requires = ','.join(sorted(requirements))

    def stream():
        # Bundle magic, then the (currently always uncompressed)
        # compression identifier.
        yield 'HGS1'
        yield compression

        filecount, bytecount, it = generatev1(repo)
        repo.ui.status(_('writing %d bytes for %d files\n') %
                       (bytecount, filecount))

        # Big endian file/byte counts, then the NUL-terminated requirements
        # string prefixed by its 16-bit length (which includes the NUL).
        yield struct.pack('>QQ', filecount, bytecount)
        yield struct.pack('>H', len(requires) + 1)
        yield requires + '\0'

        # This is where we'll add compression in the future.
        assert compression == 'UN'

        progress = 0
        repo.ui.progress(_('bundle'), 0, total=bytecount, unit=_('bytes'))

        for chunk in it:
            progress += len(chunk)
            repo.ui.progress(_('bundle'), progress, total=bytecount,
                             unit=_('bytes'))
            yield chunk

        repo.ui.progress(_('bundle'), None)

    return requirements, stream()
299 301
def consumev1(repo, fp, filecount, bytecount):
    """Apply the contents from version 1 of a streaming clone file handle.

    This takes the output from "stream_out" and applies it to the specified
    repository.

    Like "stream_out," the status line added by the wire protocol is not
    handled by this function.
    """
    with repo.lock():
        repo.ui.status(_('%d files to transfer, %s of data\n') %
                       (filecount, util.bytecount(bytecount)))
        handled_bytes = 0
        repo.ui.progress(_('clone'), 0, total=bytecount, unit=_('bytes'))
        start = util.timer()

        # TODO: get rid of (potential) inconsistency
        #
        # If transaction is started and any @filecache property is
        # changed at this point, it causes inconsistency between
        # in-memory cached property and streamclone-ed file on the
        # disk. Nested transaction prevents transaction scope "clone"
        # below from writing in-memory changes out at the end of it,
        # even though in-memory changes are discarded at the end of it
        # regardless of transaction nesting.
        #
        # But transaction nesting can't be simply prohibited, because
        # nesting occurs also in ordinary case (e.g. enabling
        # clonebundles).

        with repo.transaction('clone'):
            with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
                # Each file entry is a "name\0size\n" header line followed
                # by exactly ``size`` bytes of raw file data.
                for i in xrange(filecount):
                    # XXX doesn't support '\n' or '\r' in filenames
                    l = fp.readline()
                    try:
                        name, size = l.split('\0', 1)
                        size = int(size)
                    except (ValueError, TypeError):
                        raise error.ResponseError(
                            _('unexpected response from remote server:'), l)
                    if repo.ui.debugflag:
                        repo.ui.debug('adding %s (%s)\n' %
                                      (name, util.bytecount(size)))
                    # for backwards compat, name was partially encoded
                    path = store.decodedir(name)
                    # backgroundclose defers fp close to a worker thread so
                    # the consume loop isn't blocked on file closing.
                    with repo.svfs(path, 'w', backgroundclose=True) as ofp:
                        for chunk in util.filechunkiter(fp, limit=size):
                            handled_bytes += len(chunk)
                            repo.ui.progress(_('clone'), handled_bytes,
                                             total=bytecount, unit=_('bytes'))
                            ofp.write(chunk)

        # force @filecache properties to be reloaded from
        # streamclone-ed file at next access
        repo.invalidate(clearfilecache=True)

        elapsed = util.timer() - start
        # Guard against a zero/negative duration so the rate division and
        # log message below are always well defined.
        if elapsed <= 0:
            elapsed = 0.001
        repo.ui.progress(_('clone'), None)
        repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
                       (util.bytecount(bytecount), elapsed,
                        util.bytecount(bytecount / elapsed)))
364 366
def readbundle1header(fp):
    """Parse the header of a version 1 stream clone bundle.

    Assumes ``fp`` is positioned at the 2-byte compression identifier
    (i.e. the 4-byte "HGS1" magic has already been consumed).

    Returns a tuple of (filecount, bytecount, requirements set). Raises
    ``error.Abort`` on unsupported compression or a malformed requirements
    field.
    """
    compression = fp.read(2)
    if compression != 'UN':
        raise error.Abort(_('only uncompressed stream clone bundles are '
                            'supported; got %s') % compression)

    # Two 64-bit big endian counts, then a length-prefixed, NUL-terminated
    # requirements string.
    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
    requireslen = struct.unpack('>H', fp.read(2))[0]
    requires = fp.read(requireslen)

    if not requires.endswith('\0'):
        raise error.Abort(_('malformed stream clone bundle: '
                            'requirements not properly encoded'))

    return filecount, bytecount, set(requires.rstrip('\0').split(','))
382 384
def applybundlev1(repo, fp):
    """Apply the content from a stream clone bundle version 1.

    We assume the 4 byte header has been read and validated and the file handle
    is at the 2 byte compression identifier.
    """
    # Stream clones can only populate empty repositories.
    if len(repo):
        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
                            'repo'))

    filecount, bytecount, requirements = readbundle1header(fp)
    # Refuse to apply data whose requirements this client can't satisfy.
    unsupported = requirements - repo.supportedformats
    if unsupported:
        raise error.Abort(_('unable to apply stream clone: '
                            'unsupported format: %s') %
                          ', '.join(sorted(unsupported)))

    consumev1(repo, fp, filecount, bytecount)
401 403
class streamcloneapplier(object):
    """Class to manage applying streaming clone bundles.

    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
    readers to perform bundle type-specific functionality.
    """
    def __init__(self, fh):
        # File handle positioned at the 2-byte compression identifier
        # (per the applybundlev1() contract).
        self._fh = fh

    def apply(self, repo):
        # Delegate to applybundlev1() with the stored file handle.
        return applybundlev1(repo, self._fh)
General Comments 0
You need to be logged in to leave comments. Login now