stream: prefer keeping an open file handle to volatile file instead of copy...
marmoute
r52912:a47f09da default
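
The diff below reworks how stream clone handles volatile files: instead of copying every volatile file to a temporary backup up front, the new VolatileManager keeps an open file handle per preserved file and only spills some of them to temporary copies once too many handles are open. A minimal sketch of that strategy, using simplified, hypothetical names (not the actual Mercurial implementation):

import os
import tempfile


class OpenHandleBackups:
    """Keep volatile files readable via open handles, spilling to temporary
    copies only when too many handles accumulate (simplified sketch)."""

    MAX_OPEN = 100  # cap on simultaneously open handles, as in the diff below

    def __init__(self):
        self._fps = {}     # path -> (size, open file object)
        self._copies = {}  # path -> path of temporary on-disk copy

    def preserve(self, path):
        if len(self._fps) >= self.MAX_OPEN - 1:
            self._spill_half()
        fp = open(path, 'rb')
        fp.seek(0, os.SEEK_END)
        self._fps[path] = (fp.tell(), fp)

    def _spill_half(self):
        # write half of the tracked files to temporary copies and close them
        for path, (_size, fp) in sorted(self._fps.items())[: self.MAX_OPEN // 2]:
            fd, dst = tempfile.mkstemp(prefix=os.path.basename(path))
            with os.fdopen(fd, 'wb') as backup:
                fp.seek(0)
                backup.write(fp.read())
            fp.close()
            del self._fps[path]
            self._copies[path] = dst

    def open(self, path):
        # read back preserved content, from the handle or the spilled copy
        if path in self._fps:
            _size, fp = self._fps[path]
            fp.seek(0)
            return fp
        return open(self._copies.get(path, path), 'rb')
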
@@ -1,1178 +1,1238 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import annotations
9 9
10 10 import contextlib
11 11 import os
12 12 import struct
13 13
14 14 from .i18n import _
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 bundle2 as bundle2mod,
19 19 cacheutil,
20 20 error,
21 21 narrowspec,
22 22 phases,
23 23 pycompat,
24 24 requirements as requirementsmod,
25 25 scmutil,
26 26 store,
27 27 transaction,
28 28 util,
29 29 )
30 30 from .revlogutils import (
31 31 nodemap,
32 32 )
33 33
34 34
35 35 def new_stream_clone_requirements(default_requirements, streamed_requirements):
36 36 """determine the final set of requirement for a new stream clone
37 37
38 38 this method combines the "default" requirements that a new repository would
39 39 use with the constraints we get from the stream clone content. We keep local
40 40 configuration choices when possible.
41 41 """
42 42 requirements = set(default_requirements)
43 43 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
44 44 requirements.update(streamed_requirements)
45 45 return requirements
46 46
47 47
48 48 def streamed_requirements(repo):
49 49 """the set of requirement the new clone will have to support
50 50
51 51 This is used for advertising the stream options and to generate the actual
52 52 stream content."""
53 53 requiredformats = (
54 54 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
55 55 )
56 56 return requiredformats
57 57
58 58
59 59 def canperformstreamclone(pullop, bundle2=False):
60 60 """Whether it is possible to perform a streaming clone as part of pull.
61 61
62 62 ``bundle2`` will cause the function to consider stream clone through
63 63 bundle2 and only through bundle2.
64 64
65 65 Returns a tuple of (supported, requirements). ``supported`` is True if
66 66 streaming clone is supported and False otherwise. ``requirements`` is
67 67 a set of repo requirements from the remote, or ``None`` if stream clone
68 68 isn't supported.
69 69 """
70 70 repo = pullop.repo
71 71 remote = pullop.remote
72 72
73 73 # should we consider streaming clone at all ?
74 74 streamrequested = pullop.streamclonerequested
75 75 # If we don't have a preference, let the server decide for us. This
76 76 # likely only comes into play in LANs.
77 77 if streamrequested is None:
78 78 # The server can advertise whether to prefer streaming clone.
79 79 streamrequested = remote.capable(b'stream-preferred')
80 80 if not streamrequested:
81 81 return False, None
82 82
83 83 # Streaming clone only works on an empty destination repository
84 84 if len(repo):
85 85 return False, None
86 86
87 87 # Streaming clone only works if all data is being requested.
88 88 if pullop.heads:
89 89 return False, None
90 90
91 91 bundle2supported = False
92 92 if pullop.canusebundle2:
93 93 local_caps = bundle2mod.getrepocaps(repo, role=b'client')
94 94 local_supported = set(local_caps.get(b'stream', []))
95 95 remote_supported = set(pullop.remotebundle2caps.get(b'stream', []))
96 96 bundle2supported = bool(local_supported & remote_supported)
97 97 # else
98 98 # Server doesn't support bundle2 stream clone or doesn't support
99 99 # the versions we support. Fall back and possibly allow legacy.
100 100
101 101 # Ensures legacy code path uses available bundle2.
102 102 if bundle2supported and not bundle2:
103 103 return False, None
104 104 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
105 105 elif bundle2 and not bundle2supported:
106 106 return False, None
107 107
108 108 # In order for stream clone to work, the client has to support all the
109 109 # requirements advertised by the server.
110 110 #
111 111 # The server advertises its requirements via the "stream" and "streamreqs"
112 112 # capability. "stream" (a value-less capability) is advertised if and only
113 113 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
114 114 # is advertised and contains a comma-delimited list of requirements.
115 115 requirements = set()
116 116 if remote.capable(b'stream'):
117 117 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
118 118 else:
119 119 streamreqs = remote.capable(b'streamreqs')
120 120 # This is weird and shouldn't happen with modern servers.
121 121 if not streamreqs:
122 122 pullop.repo.ui.warn(
123 123 _(
124 124 b'warning: stream clone requested but server has them '
125 125 b'disabled\n'
126 126 )
127 127 )
128 128 return False, None
129 129
130 130 streamreqs = set(streamreqs.split(b','))
131 131 # Server requires something we don't support. Bail.
132 132 missingreqs = streamreqs - repo.supported
133 133 if missingreqs:
134 134 pullop.repo.ui.warn(
135 135 _(
136 136 b'warning: stream clone requested but client is missing '
137 137 b'requirements: %s\n'
138 138 )
139 139 % b', '.join(sorted(missingreqs))
140 140 )
141 141 pullop.repo.ui.warn(
142 142 _(
143 143 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
144 144 b'for more information)\n'
145 145 )
146 146 )
147 147 return False, None
148 148 requirements = streamreqs
149 149
150 150 return True, requirements
151 151
152 152
153 153 def maybeperformlegacystreamclone(pullop):
154 154 """Possibly perform a legacy stream clone operation.
155 155
156 156 Legacy stream clones are performed as part of pull but before all other
157 157 operations.
158 158
159 159 A legacy stream clone will not be performed if a bundle2 stream clone is
160 160 supported.
161 161 """
162 162 from . import localrepo
163 163
164 164 supported, requirements = canperformstreamclone(pullop)
165 165
166 166 if not supported:
167 167 return
168 168
169 169 repo = pullop.repo
170 170 remote = pullop.remote
171 171
172 172 # Save remote branchmap. We will use it later to speed up branchcache
173 173 # creation.
174 174 rbranchmap = None
175 175 if remote.capable(b'branchmap'):
176 176 with remote.commandexecutor() as e:
177 177 rbranchmap = e.callcommand(b'branchmap', {}).result()
178 178
179 179 repo.ui.status(_(b'streaming all changes\n'))
180 180
181 181 with remote.commandexecutor() as e:
182 182 fp = e.callcommand(b'stream_out', {}).result()
183 183
184 184 # TODO strictly speaking, this code should all be inside the context
185 185 # manager because the context manager is supposed to ensure all wire state
186 186 # is flushed when exiting. But the legacy peers don't do this, so it
187 187 # doesn't matter.
188 188 l = fp.readline()
189 189 try:
190 190 resp = int(l)
191 191 except ValueError:
192 192 raise error.ResponseError(
193 193 _(b'unexpected response from remote server:'), l
194 194 )
195 195 if resp == 1:
196 196 raise error.Abort(_(b'operation forbidden by server'))
197 197 elif resp == 2:
198 198 raise error.Abort(_(b'locking the remote repository failed'))
199 199 elif resp != 0:
200 200 raise error.Abort(_(b'the server sent an unknown error code'))
201 201
202 202 l = fp.readline()
203 203 try:
204 204 filecount, bytecount = map(int, l.split(b' ', 1))
205 205 except (ValueError, TypeError):
206 206 raise error.ResponseError(
207 207 _(b'unexpected response from remote server:'), l
208 208 )
209 209
210 210 with repo.lock():
211 211 consumev1(repo, fp, filecount, bytecount)
212 212 repo.requirements = new_stream_clone_requirements(
213 213 repo.requirements,
214 214 requirements,
215 215 )
216 216 repo.svfs.options = localrepo.resolvestorevfsoptions(
217 217 repo.ui, repo.requirements, repo.features
218 218 )
219 219 scmutil.writereporequirements(repo)
220 220 nodemap.post_stream_cleanup(repo)
221 221
222 222 if rbranchmap:
223 223 repo._branchcaches.replace(repo, rbranchmap)
224 224
225 225 repo.invalidate()
226 226
227 227
228 228 def allowservergeneration(repo):
229 229 """Whether streaming clones are allowed from the server."""
230 230 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
231 231 return False
232 232
233 233 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
234 234 return False
235 235
236 236 # The way stream clone works makes it impossible to hide secret changesets.
237 237 # So don't allow this by default.
238 238 secret = phases.hassecret(repo)
239 239 if secret:
240 240 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
241 241
242 242 return True
243 243
244 244
245 245 # This is its own function so extensions can override it.
246 246 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
247 247 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
248 248
249 249
250 250 def generatev1(repo):
251 251 """Emit content for version 1 of a streaming clone.
252 252
253 253 This returns a 3-tuple of (file count, byte size, data iterator).
254 254
255 255 The data iterator consists of N entries for each file being transferred.
256 256 Each file entry starts as a line with the file name and integer size
257 257 delimited by a null byte.
258 258
259 259 The raw file data follows. Following the raw file data is the next file
260 260 entry, or EOF.
261 261
262 262 When used on the wire protocol, an additional line indicating protocol
263 263 success will be prepended to the stream. This function is not responsible
264 264 for adding it.
265 265
266 266 This function will obtain a repository lock to ensure a consistent view of
267 267 the store is captured. It therefore may raise LockError.
268 268 """
269 269 entries = []
270 270 total_bytes = 0
271 271 # Get consistent snapshot of repo, lock during scan.
272 272 with repo.lock():
273 273 repo.ui.debug(b'scanning\n')
274 274 for entry in _walkstreamfiles(repo):
275 275 for f in entry.files():
276 276 file_size = f.file_size(repo.store.vfs)
277 277 if file_size:
278 278 entries.append((f.unencoded_path, file_size))
279 279 total_bytes += file_size
280 280 _test_sync_point_walk_1(repo)
281 281 _test_sync_point_walk_2(repo)
282 282
283 283 repo.ui.debug(
284 284 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
285 285 )
286 286
287 287 svfs = repo.svfs
288 288 debugflag = repo.ui.debugflag
289 289
290 290 def emitrevlogdata():
291 291 for name, size in entries:
292 292 if debugflag:
293 293 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
294 294 # partially encode name over the wire for backwards compat
295 295 yield b'%s\0%d\n' % (store.encodedir(name), size)
296 296 # auditing at this stage is both pointless (paths are already
297 297 # trusted by the local repo) and expensive
298 298 with svfs(name, b'rb', auditpath=False) as fp:
299 299 if size <= 65536:
300 300 yield fp.read(size)
301 301 else:
302 302 for chunk in util.filechunkiter(fp, limit=size):
303 303 yield chunk
304 304
305 305 return len(entries), total_bytes, emitrevlogdata()
306 306
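
To make the per-file framing described in generatev1's docstring concrete, here is a hedged sketch of how a single entry is laid out on the wire (the file name and data are made up for illustration):

from mercurial import store

name = b'data/foo.i'         # illustrative store path
data = b'\x00\x01\x02\x03'   # illustrative raw revlog bytes

# one v1 stream entry: "<encoded name>\0<size>\n" followed by the raw data
entry = b'%s\0%d\n' % (store.encodedir(name), len(data)) + data
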
307 307
308 308 def generatev1wireproto(repo):
309 309 """Emit content for version 1 of streaming clone suitable for the wire.
310 310
311 311 This is the data output from ``generatev1()`` with 2 header lines. The
312 312 first line indicates overall success. The 2nd contains the file count and
313 313 byte size of payload.
314 314
315 315 The success line contains "0" for success, "1" for stream generation not
316 316 allowed, and "2" for error locking the repository (possibly indicating
317 317 a permissions error for the server process).
318 318 """
319 319 if not allowservergeneration(repo):
320 320 yield b'1\n'
321 321 return
322 322
323 323 try:
324 324 filecount, bytecount, it = generatev1(repo)
325 325 except error.LockError:
326 326 yield b'2\n'
327 327 return
328 328
329 329 # Indicates successful response.
330 330 yield b'0\n'
331 331 yield b'%d %d\n' % (filecount, bytecount)
332 332 for chunk in it:
333 333 yield chunk
334 334
335 335
336 336 def generatebundlev1(repo, compression=b'UN'):
337 337 """Emit content for version 1 of a stream clone bundle.
338 338
339 339 The first 4 bytes of the output ("HGS1") denote this as stream clone
340 340 bundle version 1.
341 341
342 342 The next 2 bytes indicate the compression type. Only "UN" is currently
343 343 supported.
344 344
345 345 The next 16 bytes are two 64-bit big endian unsigned integers indicating
346 346 file count and byte count, respectively.
347 347
348 348 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
349 349 of the requirements string, including a trailing \0. The following N bytes
350 350 are the requirements string, which is ASCII containing a comma-delimited
351 351 list of repo requirements that are needed to support the data.
352 352
353 353 The remaining content is the output of ``generatev1()`` (which may be
354 354 compressed in the future).
355 355
356 356 Returns a tuple of (requirements, data generator).
357 357 """
358 358 if compression != b'UN':
359 359 raise ValueError(b'we do not support the compression argument yet')
360 360
361 361 requirements = streamed_requirements(repo)
362 362 requires = b','.join(sorted(requirements))
363 363
364 364 def gen():
365 365 yield b'HGS1'
366 366 yield compression
367 367
368 368 filecount, bytecount, it = generatev1(repo)
369 369 repo.ui.status(
370 370 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
371 371 )
372 372
373 373 yield struct.pack(b'>QQ', filecount, bytecount)
374 374 yield struct.pack(b'>H', len(requires) + 1)
375 375 yield requires + b'\0'
376 376
377 377 # This is where we'll add compression in the future.
378 378 assert compression == b'UN'
379 379
380 380 progress = repo.ui.makeprogress(
381 381 _(b'bundle'), total=bytecount, unit=_(b'bytes')
382 382 )
383 383 progress.update(0)
384 384
385 385 for chunk in it:
386 386 progress.increment(step=len(chunk))
387 387 yield chunk
388 388
389 389 progress.complete()
390 390
391 391 return requirements, gen()
392 392
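
For concreteness, a sketch of the bundle header layout documented above, assembled for a hypothetical bundle of 3 files and 1234 bytes with a single 'revlogv1' requirement:

import struct

filecount, bytecount = 3, 1234        # illustrative values
requires = b'revlogv1'                # comma-delimited requirements string

header = b''.join([
    b'HGS1',                                    # magic: stream clone bundle v1
    b'UN',                                      # compression type (uncompressed)
    struct.pack(b'>QQ', filecount, bytecount),  # 64-bit big endian counts
    struct.pack(b'>H', len(requires) + 1),      # requirements length incl. trailing NUL
    requires + b'\0',
])
assert len(header) == 4 + 2 + 16 + 2 + len(requires) + 1
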
393 393
394 394 def consumev1(repo, fp, filecount, bytecount):
395 395 """Apply the contents from version 1 of a streaming clone file handle.
396 396
397 397 This takes the output from "stream_out" and applies it to the specified
398 398 repository.
399 399
400 400 Like "stream_out," the status line added by the wire protocol is not
401 401 handled by this function.
402 402 """
403 403 with repo.lock():
404 404 repo.ui.status(
405 405 _(b'%d files to transfer, %s of data\n')
406 406 % (filecount, util.bytecount(bytecount))
407 407 )
408 408 progress = repo.ui.makeprogress(
409 409 _(b'clone'), total=bytecount, unit=_(b'bytes')
410 410 )
411 411 progress.update(0)
412 412 start = util.timer()
413 413
414 414 # TODO: get rid of (potential) inconsistency
415 415 #
416 416 # If transaction is started and any @filecache property is
417 417 # changed at this point, it causes inconsistency between
418 418 # in-memory cached property and streamclone-ed file on the
419 419 # disk. Nested transaction prevents transaction scope "clone"
420 420 # below from writing in-memory changes out at the end of it,
421 421 # even though in-memory changes are discarded at the end of it
422 422 # regardless of transaction nesting.
423 423 #
424 424 # But transaction nesting can't be simply prohibited, because
425 425 # nesting occurs also in ordinary case (e.g. enabling
426 426 # clonebundles).
427 427
428 428 with repo.transaction(b'clone'):
429 429 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
430 430 for i in range(filecount):
431 431 # XXX doesn't support '\n' or '\r' in filenames
432 432 if hasattr(fp, 'readline'):
433 433 l = fp.readline()
434 434 else:
435 435 # inline clonebundles use a chunkbuffer, so no readline
436 436 # --> this should be small anyway, the first line
437 437 # only contains the size of the bundle
438 438 l_buf = []
439 439 while not (l_buf and l_buf[-1] == b'\n'):
440 440 l_buf.append(fp.read(1))
441 441 l = b''.join(l_buf)
442 442 try:
443 443 name, size = l.split(b'\0', 1)
444 444 size = int(size)
445 445 except (ValueError, TypeError):
446 446 raise error.ResponseError(
447 447 _(b'unexpected response from remote server:'), l
448 448 )
449 449 if repo.ui.debugflag:
450 450 repo.ui.debug(
451 451 b'adding %s (%s)\n' % (name, util.bytecount(size))
452 452 )
453 453 # for backwards compat, name was partially encoded
454 454 path = store.decodedir(name)
455 455 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
456 456 for chunk in util.filechunkiter(fp, limit=size):
457 457 progress.increment(step=len(chunk))
458 458 ofp.write(chunk)
459 459
460 460 # force @filecache properties to be reloaded from
461 461 # streamclone-ed file at next access
462 462 repo.invalidate(clearfilecache=True)
463 463
464 464 elapsed = util.timer() - start
465 465 if elapsed <= 0:
466 466 elapsed = 0.001
467 467 progress.complete()
468 468 repo.ui.status(
469 469 _(b'transferred %s in %.1f seconds (%s/sec)\n')
470 470 % (
471 471 util.bytecount(bytecount),
472 472 elapsed,
473 473 util.bytecount(bytecount / elapsed),
474 474 )
475 475 )
476 476
477 477
478 478 def readbundle1header(fp):
479 479 compression = fp.read(2)
480 480 if compression != b'UN':
481 481 raise error.Abort(
482 482 _(
483 483 b'only uncompressed stream clone bundles are '
484 484 b'supported; got %s'
485 485 )
486 486 % compression
487 487 )
488 488
489 489 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
490 490 requireslen = struct.unpack(b'>H', fp.read(2))[0]
491 491 requires = fp.read(requireslen)
492 492
493 493 if not requires.endswith(b'\0'):
494 494 raise error.Abort(
495 495 _(
496 496 b'malformed stream clone bundle: '
497 497 b'requirements not properly encoded'
498 498 )
499 499 )
500 500
501 501 requirements = set(requires.rstrip(b'\0').split(b','))
502 502
503 503 return filecount, bytecount, requirements
504 504
505 505
506 506 def applybundlev1(repo, fp):
507 507 """Apply the content from a stream clone bundle version 1.
508 508
509 509 We assume the 4 byte header has been read and validated and the file handle
510 510 is at the 2 byte compression identifier.
511 511 """
512 512 if len(repo):
513 513 raise error.Abort(
514 514 _(b'cannot apply stream clone bundle on non-empty repo')
515 515 )
516 516
517 517 filecount, bytecount, requirements = readbundle1header(fp)
518 518 missingreqs = requirements - repo.supported
519 519 if missingreqs:
520 520 raise error.Abort(
521 521 _(b'unable to apply stream clone: unsupported format: %s')
522 522 % b', '.join(sorted(missingreqs))
523 523 )
524 524
525 525 consumev1(repo, fp, filecount, bytecount)
526 526 nodemap.post_stream_cleanup(repo)
527 527
528 528
529 529 class streamcloneapplier:
530 530 """Class to manage applying streaming clone bundles.
531 531
532 532 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
533 533 readers to perform bundle type-specific functionality.
534 534 """
535 535
536 536 def __init__(self, fh):
537 537 self._fh = fh
538 538
539 539 def apply(self, repo):
540 540 return applybundlev1(repo, self._fh)
541 541
542 542
543 543 # type of file to stream
544 544 _fileappend = 0 # append only file
545 545 _filefull = 1 # full snapshot file
546 546
547 547 # Source of the file
548 548 _srcstore = b's' # store (svfs)
549 549 _srccache = b'c' # cache (cache)
550 550
551 551
552 552 # This is its own function so extensions can override it.
553 553 def _walkstreamfullstorefiles(repo):
554 554 """list snapshot file from the store"""
555 555 fnames = []
556 556 if not repo.publishing():
557 557 fnames.append(b'phaseroots')
558 558 return fnames
559 559
560 560
561 561 def _filterfull(entry, copy, vfsmap):
562 562 """actually copy the snapshot files"""
563 563 src, name, ftype, data = entry
564 564 if ftype != _filefull:
565 565 return entry
566 566 return (src, name, ftype, copy(vfsmap[src].join(name)))
567 567
568 568
569 569 class VolatileManager:
570 """Manage temporary backup of volatile file during stream clone
570 """Manage temporary backups of volatile files during stream clone.
571 571
572 This should be used as a Python context, the copies will be discarded when
573 exiting the context.
572 This class will keep open file handles for the volatile files, writing the
573 smaller ones to disk if the number of open file handles grows too high.
574 574
575 A copy can be done by calling the object on the real path (encoded full
576 path)
575 This should be used as a Python context; the file handles and copies will
576 be discarded when exiting the context.
577 577
578 The backup path can be retrieved using the __getitem__ protocol, obj[path].
579 On file without backup, it will return the unmodified path. (equivalent to
580 `dict.get(x, x)`)
578 The preservation can be done by calling the object on the real path
579 (encoded full path).
580
581 Valid filehandles for any file should be retrieved by calling `open(path)`.
581 582 """
582 583
584 # arbitrarily picked as "it seemed fine" and much higher than the current
585 # usage.
586 MAX_OPEN = 100
587
583 588 def __init__(self):
589 self._counter = 0
590 self._volatile_fps = None
584 591 self._copies = None
585 592 self._dst_dir = None
586 593
587 594 def __enter__(self):
588 if self._copies is not None:
589 msg = "Copies context already open"
590 raise error.ProgrammingError(msg)
591 self._copies = {}
592 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
595 if self._counter == 0:
596 assert self._volatile_fps is None
597 self._volatile_fps = {}
598 self._counter += 1
593 599 return self
594 600
595 def __call__(self, src):
596 """create a backup of the file at src"""
597 prefix = os.path.basename(src)
598 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
599 os.close(fd)
600 self._copies[src] = dst
601 util.copyfiles(src, dst, hardlink=True)
602 return dst
603
604 @contextlib.contextmanager
605 def open(self, src):
606 actual_path = self._copies.get(src, src)
607 with open(actual_path, 'rb') as fp:
608 yield fp
609
610 601 def __exit__(self, *args, **kwars):
611 602 """discard all backups"""
603 self._counter -= 1
604 if self._counter == 0:
605 for _size, fp in self._volatile_fps.values():
606 fp.close()
607 self._volatile_fps = None
608 if self._copies is not None:
612 609 for tmp in self._copies.values():
613 610 util.tryunlink(tmp)
614 611 util.tryrmdir(self._dst_dir)
615 612 self._copies = None
616 613 self._dst_dir = None
614 assert self._volatile_fps is None
615 assert self._copies is None
616 assert self._dst_dir is None
617
618 def _init_tmp_copies(self):
619 """prepare a temporary directory to save volatile files
620
621 This will be used as a backup if we have too many files open"""
622 assert 0 < self._counter
623 assert self._copies is None
624 assert self._dst_dir is None
625 self._copies = {}
626 self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
627
628 def _flush_some_on_disk(self):
629 """move some of the open files to tempory files on disk"""
630 if self._copies is None:
631 self._init_tmp_copies()
632 flush_count = self.MAX_OPEN // 2
633 for src, (size, fp) in sorted(self._volatile_fps.items())[:flush_count]:
634 prefix = os.path.basename(src)
635 fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
636 self._copies[src] = dst
637 os.close(fd)
638 # we no longer hardlink, but on the other hand we rarely do this,
639 # and we only do it for the smallest files, and not at all in the
640 # common case.
641 with open(dst, 'wb') as bck:
642 fp.seek(0)
643 bck.write(fp.read())
644 del self._volatile_fps[src]
645 fp.close()
646
647 def _keep_one(self, src):
648 """preserve an open file handle for a given path"""
649 # store the file quickly to ensure we close it if any error happens
650 _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
651 fp.seek(0, os.SEEK_END)
652 size = fp.tell()
653 self._volatile_fps[src] = (size, fp)
654
655 def __call__(self, src):
656 """preserve the volatile file at src"""
657 assert 0 < self._counter
658 if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
659 self._flush_some_on_disk()
660 self._keep_one(src)
661
662 @contextlib.contextmanager
663 def open(self, src):
664 assert 0 < self._counter
665 entry = self._volatile_fps.get(src)
666 if entry is not None:
667 _size, fp = entry
668 fp.seek(0)
669 yield fp
670 else:
671 if self._copies is None:
672 actual_path = src
673 else:
674 actual_path = self._copies.get(src, src)
675 with open(actual_path, 'rb') as fp:
676 yield fp
617 677
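
A short usage sketch of the VolatileManager above, mirroring how the emitters below use it (the temporary file here is only for illustration): preserve a path while its content is still trusted, then read it back through open() even if the original changes afterwards.

import os
import tempfile

from mercurial.streamclone import VolatileManager

# create an illustrative "volatile" file
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'wb') as f:
    f.write(b'volatile content')

with VolatileManager() as volatiles:
    volatiles(path)                   # preserve: keep an open handle (or spill a copy)
    with volatiles.open(path) as fp:  # read the content back as it was preserved
        assert fp.read() == b'volatile content'
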
618 678
619 679 def _makemap(repo):
620 680 """make a (src -> vfs) map for the repo"""
621 681 vfsmap = {
622 682 _srcstore: repo.svfs,
623 683 _srccache: repo.cachevfs,
624 684 }
625 685 # we keep repo.vfs out of the map on purpose, there are too many dangers there
626 686 # (eg: .hg/hgrc)
627 687 assert repo.vfs not in vfsmap.values()
628 688
629 689 return vfsmap
630 690
631 691
632 692 def _emit2(repo, entries):
633 693 """actually emit the stream bundle"""
634 694 vfsmap = _makemap(repo)
635 695 # we keep repo.vfs out of the map on purpose, there are too many dangers there
636 696 # (eg: .hg/hgrc),
637 697 #
638 698 # this assert is duplicated (from _makemap) as authors might think this is
639 699 # fine, while this is really not fine.
640 700 if repo.vfs in vfsmap.values():
641 701 raise error.ProgrammingError(
642 702 b'repo.vfs must not be added to vfsmap for security reasons'
643 703 )
644 704
645 705 # translate the vfs once
646 706 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
647 707
648 708 max_linkrev = len(repo)
649 709 file_count = totalfilesize = 0
650 710 with util.nogc():
651 711 # record the expected size of every file
652 712 for k, vfs, e in entries:
653 713 for f in e.files():
654 714 file_count += 1
655 715 totalfilesize += f.file_size(vfs)
656 716
657 717 progress = repo.ui.makeprogress(
658 718 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
659 719 )
660 720 progress.update(0)
661 721 with VolatileManager() as volatiles, progress:
662 722 # make sure we preserve volatile files
663 723 for k, vfs, e in entries:
664 724 for f in e.files():
665 725 if f.is_volatile:
666 726 volatiles(vfs.join(f.unencoded_path))
667 727 # the first yield releases the lock on the repository
668 728 yield file_count, totalfilesize
669 729 totalbytecount = 0
670 730
671 731 for src, vfs, e in entries:
672 732 entry_streams = e.get_streams(
673 733 repo=repo,
674 734 vfs=vfs,
675 735 volatiles=volatiles,
676 736 max_changeset=max_linkrev,
677 737 preserve_file_count=True,
678 738 )
679 739 for name, stream, size in entry_streams:
680 740 yield src
681 741 yield util.uvarintencode(len(name))
682 742 yield util.uvarintencode(size)
683 743 yield name
684 744 bytecount = 0
685 745 for chunk in stream:
686 746 bytecount += len(chunk)
687 747 totalbytecount += len(chunk)
688 748 progress.update(totalbytecount)
689 749 yield chunk
690 750 if bytecount != size:
691 751 # Would most likely be caused by a race due to `hg
692 752 # strip` or a revlog split
693 753 msg = _(
694 754 b'clone could only read %d bytes from %s, but '
695 755 b'expected %d bytes'
696 756 )
697 757 raise error.Abort(msg % (bytecount, name, size))
698 758
699 759
700 760 def _emit3(repo, entries):
701 761 """actually emit the stream bundle (v3)"""
702 762 vfsmap = _makemap(repo)
703 763 # we keep repo.vfs out of the map on purpose, there are too many dangers
704 764 # there (eg: .hg/hgrc),
705 765 #
706 766 # this assert is duplicated (from _makemap) as authors might think this is
707 767 # fine, while this is really not fine.
708 768 if repo.vfs in vfsmap.values():
709 769 raise error.ProgrammingError(
710 770 b'repo.vfs must not be added to vfsmap for security reasons'
711 771 )
712 772
713 773 # translate the vfs once
714 774 entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
715 775 total_entry_count = len(entries)
716 776
717 777 max_linkrev = len(repo)
718 778 progress = repo.ui.makeprogress(
719 779 _(b'bundle'),
720 780 total=total_entry_count,
721 781 unit=_(b'entry'),
722 782 )
723 783 progress.update(0)
724 784 with VolatileManager() as volatiles, progress:
725 785 # make sure we preserve volatile files
726 786 for k, vfs, e in entries:
727 787 if e.maybe_volatile:
728 788 for f in e.files():
729 789 if f.is_volatile:
730 790 # record the expected size under lock
731 791 f.file_size(vfs)
732 792 volatiles(vfs.join(f.unencoded_path))
733 793 # the first yield releases the lock on the repository
734 794 yield None
735 795
736 796 yield util.uvarintencode(total_entry_count)
737 797
738 798 for src, vfs, e in entries:
739 799 entry_streams = e.get_streams(
740 800 repo=repo,
741 801 vfs=vfs,
742 802 volatiles=volatiles,
743 803 max_changeset=max_linkrev,
744 804 )
745 805 yield util.uvarintencode(len(entry_streams))
746 806 for name, stream, size in entry_streams:
747 807 yield src
748 808 yield util.uvarintencode(len(name))
749 809 yield util.uvarintencode(size)
750 810 yield name
751 811 yield from stream
752 812 progress.increment()
753 813
754 814
755 815 def _test_sync_point_walk_1(repo):
756 816 """a function for synchronisation during tests"""
757 817
758 818
759 819 def _test_sync_point_walk_2(repo):
760 820 """a function for synchronisation during tests"""
761 821
762 822
763 823 def _entries_walk(repo, includes, excludes, includeobsmarkers):
764 824 """emit a seris of files information useful to clone a repo
765 825
766 826 return (vfs-key, entry) iterator
767 827
768 828 Where `entry` is a StoreEntry (used even for cache entries).
769 829 """
770 830 assert repo._currentlock(repo._lockref) is not None
771 831
772 832 matcher = None
773 833 if includes or excludes:
774 834 matcher = narrowspec.match(repo.root, includes, excludes)
775 835
776 836 phase = not repo.publishing()
777 837 # Python is getting crazy at all the small containers we create, disabling
778 838 # the gc while we do so helps performance a lot.
779 839 with util.nogc():
780 840 entries = _walkstreamfiles(
781 841 repo,
782 842 matcher,
783 843 phase=phase,
784 844 obsolescence=includeobsmarkers,
785 845 )
786 846 for entry in entries:
787 847 yield (_srcstore, entry)
788 848
789 849 for name in cacheutil.cachetocopy(repo):
790 850 if repo.cachevfs.exists(name):
791 851 # not really a StoreEntry, but close enough
792 852 entry = store.SimpleStoreEntry(
793 853 entry_path=name,
794 854 is_volatile=True,
795 855 )
796 856 yield (_srccache, entry)
797 857
798 858
799 859 def generatev2(repo, includes, excludes, includeobsmarkers):
800 860 """Emit content for version 2 of a streaming clone.
801 861
802 862 the data stream consists of the following entries:
803 863 1) A char representing the file destination (eg: store or cache)
804 864 2) A varint containing the length of the filename
805 865 3) A varint containing the length of file data
806 866 4) N bytes containing the filename (the internal, store-agnostic form)
807 867 5) N bytes containing the file data
808 868
809 869 Returns a 3-tuple of (file count, file size, data iterator).
810 870 """
811 871
812 872 with repo.lock():
813 873 repo.ui.debug(b'scanning\n')
814 874
815 875 entries = _entries_walk(
816 876 repo,
817 877 includes=includes,
818 878 excludes=excludes,
819 879 includeobsmarkers=includeobsmarkers,
820 880 )
821 881
822 882 chunks = _emit2(repo, entries)
823 883 first = next(chunks)
824 884 file_count, total_file_size = first
825 885 _test_sync_point_walk_1(repo)
826 886 _test_sync_point_walk_2(repo)
827 887
828 888 return file_count, total_file_size, chunks
829 889
830 890
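
As a worked illustration of the v2 entry layout listed in the docstring above (the framing _emit2 produces and consumev2 parses), one store file named 'data/foo.i' with 4 bytes of data would be framed roughly as follows (name and data are illustrative):

from mercurial import util

name = b'data/foo.i'
data = b'\x00\x01\x02\x03'

frame = b''.join([
    b's',                           # 1) destination: 's' for store (svfs)
    util.uvarintencode(len(name)),  # 2) varint: length of the filename
    util.uvarintencode(len(data)),  # 3) varint: length of the file data
    name,                           # 4) filename (store-agnostic form)
    data,                           # 5) raw file data
])
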
831 891 def generatev3(repo, includes, excludes, includeobsmarkers):
832 892 """Emit content for version 3 of a streaming clone.
833 893
834 894 the data stream consists of the following:
835 895 1) A varint E containing the number of entries (can be 0), then E entries follow
836 896 2) For each entry:
837 897 2.1) The number of files in this entry (can be 0, but typically 1 or 2)
838 898 2.2) For each file:
839 899 2.2.1) A char representing the file destination (eg: store or cache)
840 900 2.2.2) A varint N containing the length of the filename
841 901 2.2.3) A varint M containing the length of file data
842 902 2.2.4) N bytes containing the filename (the internal, store-agnostic form)
843 903 2.2.5) M bytes containing the file data
844 904
845 905 Returns the data iterator.
846 906
847 907 XXX This format is experimental and subject to change. Here is a
848 908 XXX non-exhaustive list of things this format could do or change:
849 909
850 910 - making it easier to write files in parallel
851 911 - holding the lock for a shorter time
852 912 - improving progress information
853 913 - ways to adjust the number of expected entries/files ?
854 914 """
855 915
856 916 # Python is getting crazy at all the small containers we create while
857 917 # considering the files to preserve, disabling the gc while we do so helps
858 918 # performance a lot.
859 919 with repo.lock(), util.nogc():
860 920 repo.ui.debug(b'scanning\n')
861 921
862 922 entries = _entries_walk(
863 923 repo,
864 924 includes=includes,
865 925 excludes=excludes,
866 926 includeobsmarkers=includeobsmarkers,
867 927 )
868 928 chunks = _emit3(repo, list(entries))
869 929 first = next(chunks)
870 930 assert first is None
871 931 _test_sync_point_walk_1(repo)
872 932 _test_sync_point_walk_2(repo)
873 933
874 934 return chunks
875 935
876 936
877 937 @contextlib.contextmanager
878 938 def nested(*ctxs):
879 939 this = ctxs[0]
880 940 rest = ctxs[1:]
881 941 with this:
882 942 if rest:
883 943 with nested(*rest):
884 944 yield
885 945 else:
886 946 yield
887 947
888 948
889 949 def consumev2(repo, fp, filecount, filesize):
890 950 """Apply the contents from a version 2 streaming clone.
891 951
892 952 Data is read from an object that only needs to provide a ``read(size)``
893 953 method.
894 954 """
895 955 with repo.lock():
896 956 repo.ui.status(
897 957 _(b'%d files to transfer, %s of data\n')
898 958 % (filecount, util.bytecount(filesize))
899 959 )
900 960
901 961 start = util.timer()
902 962 progress = repo.ui.makeprogress(
903 963 _(b'clone'), total=filesize, unit=_(b'bytes')
904 964 )
905 965 progress.update(0)
906 966
907 967 vfsmap = _makemap(repo)
908 968 # we keep repo.vfs out of the map on purpose, there are too many dangers
909 969 # there (eg: .hg/hgrc),
910 970 #
911 971 # this assert is duplicated (from _makemap) as authors might think this
912 972 # is fine, while this is really not fine.
913 973 if repo.vfs in vfsmap.values():
914 974 raise error.ProgrammingError(
915 975 b'repo.vfs must not be added to vfsmap for security reasons'
916 976 )
917 977
918 978 with repo.transaction(b'clone'):
919 979 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
920 980 with nested(*ctxs):
921 981 for i in range(filecount):
922 982 src = util.readexactly(fp, 1)
923 983 vfs = vfsmap[src]
924 984 namelen = util.uvarintdecodestream(fp)
925 985 datalen = util.uvarintdecodestream(fp)
926 986
927 987 name = util.readexactly(fp, namelen)
928 988
929 989 if repo.ui.debugflag:
930 990 repo.ui.debug(
931 991 b'adding [%s] %s (%s)\n'
932 992 % (src, name, util.bytecount(datalen))
933 993 )
934 994
935 995 with vfs(name, b'w') as ofp:
936 996 for chunk in util.filechunkiter(fp, limit=datalen):
937 997 progress.increment(step=len(chunk))
938 998 ofp.write(chunk)
939 999
940 1000 # force @filecache properties to be reloaded from
941 1001 # streamclone-ed file at next access
942 1002 repo.invalidate(clearfilecache=True)
943 1003
944 1004 elapsed = util.timer() - start
945 1005 if elapsed <= 0:
946 1006 elapsed = 0.001
947 1007 repo.ui.status(
948 1008 _(b'transferred %s in %.1f seconds (%s/sec)\n')
949 1009 % (
950 1010 util.bytecount(progress.pos),
951 1011 elapsed,
952 1012 util.bytecount(progress.pos / elapsed),
953 1013 )
954 1014 )
955 1015 progress.complete()
956 1016
957 1017
958 1018 def consumev3(repo, fp):
959 1019 """Apply the contents from a version 3 streaming clone.
960 1020
961 1021 Data is read from an object that only needs to provide a ``read(size)``
962 1022 method.
963 1023 """
964 1024 with repo.lock():
965 1025 start = util.timer()
966 1026
967 1027 entrycount = util.uvarintdecodestream(fp)
968 1028 repo.ui.status(_(b'%d entries to transfer\n') % (entrycount))
969 1029
970 1030 progress = repo.ui.makeprogress(
971 1031 _(b'clone'),
972 1032 total=entrycount,
973 1033 unit=_(b'entries'),
974 1034 )
975 1035 progress.update(0)
976 1036 bytes_transferred = 0
977 1037
978 1038 vfsmap = _makemap(repo)
979 1039 # we keep repo.vfs out of the map on purpose, there are too many dangers
980 1040 # there (eg: .hg/hgrc),
981 1041 #
982 1042 # this assert is duplicated (from _makemap) as authors might think this
983 1043 # is fine, while this is really not fine.
984 1044 if repo.vfs in vfsmap.values():
985 1045 raise error.ProgrammingError(
986 1046 b'repo.vfs must not be added to vfsmap for security reasons'
987 1047 )
988 1048
989 1049 with repo.transaction(b'clone'):
990 1050 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
991 1051 with nested(*ctxs):
992 1052 for i in range(entrycount):
993 1053 filecount = util.uvarintdecodestream(fp)
994 1054 if filecount == 0:
995 1055 if repo.ui.debugflag:
996 1056 repo.ui.debug(b'entry with no files [%d]\n' % (i))
997 1057 for i in range(filecount):
998 1058 src = util.readexactly(fp, 1)
999 1059 vfs = vfsmap[src]
1000 1060 namelen = util.uvarintdecodestream(fp)
1001 1061 datalen = util.uvarintdecodestream(fp)
1002 1062
1003 1063 name = util.readexactly(fp, namelen)
1004 1064
1005 1065 if repo.ui.debugflag:
1006 1066 msg = b'adding [%s] %s (%s)\n'
1007 1067 msg %= (src, name, util.bytecount(datalen))
1008 1068 repo.ui.debug(msg)
1009 1069 bytes_transferred += datalen
1010 1070
1011 1071 with vfs(name, b'w') as ofp:
1012 1072 for chunk in util.filechunkiter(fp, limit=datalen):
1013 1073 ofp.write(chunk)
1014 1074 progress.increment(step=1)
1015 1075
1016 1076 # force @filecache properties to be reloaded from
1017 1077 # streamclone-ed file at next access
1018 1078 repo.invalidate(clearfilecache=True)
1019 1079
1020 1080 elapsed = util.timer() - start
1021 1081 if elapsed <= 0:
1022 1082 elapsed = 0.001
1023 1083 msg = _(b'transferred %s in %.1f seconds (%s/sec)\n')
1024 1084 byte_count = util.bytecount(bytes_transferred)
1025 1085 bytes_sec = util.bytecount(bytes_transferred / elapsed)
1026 1086 msg %= (byte_count, elapsed, bytes_sec)
1027 1087 repo.ui.status(msg)
1028 1088 progress.complete()
1029 1089
1030 1090
1031 1091 def applybundlev2(repo, fp, filecount, filesize, requirements):
1032 1092 from . import localrepo
1033 1093
1034 1094 missingreqs = [r for r in requirements if r not in repo.supported]
1035 1095 if missingreqs:
1036 1096 raise error.Abort(
1037 1097 _(b'unable to apply stream clone: unsupported format: %s')
1038 1098 % b', '.join(sorted(missingreqs))
1039 1099 )
1040 1100
1041 1101 consumev2(repo, fp, filecount, filesize)
1042 1102
1043 1103 repo.requirements = new_stream_clone_requirements(
1044 1104 repo.requirements,
1045 1105 requirements,
1046 1106 )
1047 1107 repo.svfs.options = localrepo.resolvestorevfsoptions(
1048 1108 repo.ui, repo.requirements, repo.features
1049 1109 )
1050 1110 scmutil.writereporequirements(repo)
1051 1111 nodemap.post_stream_cleanup(repo)
1052 1112
1053 1113
1054 1114 def applybundlev3(repo, fp, requirements):
1055 1115 from . import localrepo
1056 1116
1057 1117 missingreqs = [r for r in requirements if r not in repo.supported]
1058 1118 if missingreqs:
1059 1119 msg = _(b'unable to apply stream clone: unsupported format: %s')
1060 1120 msg %= b', '.join(sorted(missingreqs))
1061 1121 raise error.Abort(msg)
1062 1122
1063 1123 consumev3(repo, fp)
1064 1124
1065 1125 repo.requirements = new_stream_clone_requirements(
1066 1126 repo.requirements,
1067 1127 requirements,
1068 1128 )
1069 1129 repo.svfs.options = localrepo.resolvestorevfsoptions(
1070 1130 repo.ui, repo.requirements, repo.features
1071 1131 )
1072 1132 scmutil.writereporequirements(repo)
1073 1133 nodemap.post_stream_cleanup(repo)
1074 1134
1075 1135
1076 1136 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
1077 1137 hardlink = [True]
1078 1138
1079 1139 def copy_used():
1080 1140 hardlink[0] = False
1081 1141 progress.topic = _(b'copying')
1082 1142
1083 1143 for k, path in entries:
1084 1144 src_vfs = src_vfs_map[k]
1085 1145 dst_vfs = dst_vfs_map[k]
1086 1146 src_path = src_vfs.join(path)
1087 1147 dst_path = dst_vfs.join(path)
1088 1148 # We cannot use dirname and makedirs of dst_vfs here because the store
1089 1149 # encoding confuses them. See issue 6581 for details.
1090 1150 dirname = os.path.dirname(dst_path)
1091 1151 if not os.path.exists(dirname):
1092 1152 util.makedirs(dirname)
1093 1153 dst_vfs.register_file(path)
1094 1154 # XXX we could use the #nb_bytes argument.
1095 1155 util.copyfile(
1096 1156 src_path,
1097 1157 dst_path,
1098 1158 hardlink=hardlink[0],
1099 1159 no_hardlink_cb=copy_used,
1100 1160 check_fs_hardlink=False,
1101 1161 )
1102 1162 progress.increment()
1103 1163 return hardlink[0]
1104 1164
1105 1165
1106 1166 def local_copy(src_repo, dest_repo):
1107 1167 """copy all content from one local repository to another
1108 1168
1109 1169 This is useful for local clone"""
1110 1170 src_store_requirements = {
1111 1171 r
1112 1172 for r in src_repo.requirements
1113 1173 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1114 1174 }
1115 1175 dest_store_requirements = {
1116 1176 r
1117 1177 for r in dest_repo.requirements
1118 1178 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
1119 1179 }
1120 1180 assert src_store_requirements == dest_store_requirements
1121 1181
1122 1182 with dest_repo.lock():
1123 1183 with src_repo.lock():
1124 1184 # bookmarks are not integrated into the streaming as they might use the
1125 1185 # `repo.vfs` and there is too much sensitive data accessible
1126 1186 # through `repo.vfs` to expose it to streaming clone.
1127 1187 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
1128 1188 srcbookmarks = src_book_vfs.join(b'bookmarks')
1129 1189 bm_count = 0
1130 1190 if os.path.exists(srcbookmarks):
1131 1191 bm_count = 1
1132 1192
1133 1193 entries = _entries_walk(
1134 1194 src_repo,
1135 1195 includes=None,
1136 1196 excludes=None,
1137 1197 includeobsmarkers=True,
1138 1198 )
1139 1199 entries = list(entries)
1140 1200 src_vfs_map = _makemap(src_repo)
1141 1201 dest_vfs_map = _makemap(dest_repo)
1142 1202 total_files = sum(len(e[1].files()) for e in entries) + bm_count
1143 1203 progress = src_repo.ui.makeprogress(
1144 1204 topic=_(b'linking'),
1145 1205 total=total_files,
1146 1206 unit=_(b'files'),
1147 1207 )
1148 1208 # copy files
1149 1209 #
1150 1210 # We could copy the full file while the source repository is locked
1151 1211 # and the other one without the lock. However, in the linking case,
1152 1212 # this would also require checks that nobody is appending any data
1153 1213 # to the files while we do the clone, so this is not done yet. We
1154 1214 # could do this blindly when copying files.
1155 1215 files = [
1156 1216 (vfs_key, f.unencoded_path)
1157 1217 for vfs_key, e in entries
1158 1218 for f in e.files()
1159 1219 ]
1160 1220 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
1161 1221
1162 1222 # copy bookmarks over
1163 1223 if bm_count:
1164 1224 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
1165 1225 dstbookmarks = dst_book_vfs.join(b'bookmarks')
1166 1226 util.copyfile(srcbookmarks, dstbookmarks)
1167 1227 progress.complete()
1168 1228 if hardlink:
1169 1229 msg = b'linked %d files\n'
1170 1230 else:
1171 1231 msg = b'copied %d files\n'
1172 1232 src_repo.ui.debug(msg % total_files)
1173 1233
1174 1234 with dest_repo.transaction(b"localclone") as tr:
1175 1235 dest_repo.store.write(tr)
1176 1236
1177 1237 # clean up transaction files as they do not make sense
1178 1238 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)