##// END OF EJS Templates
changelog-delay: move the delay/divert logic inside the (inner) revlog...
marmoute -
r51999:d83d7885 default
parent child Browse files
Show More
@@ -27,7 +27,6 b' from .utils import ('
27 27 from .revlogutils import (
28 28 constants as revlog_constants,
29 29 flagutil,
30 randomaccessfile,
31 30 )
32 31
33 32 _defaultextra = {b'branch': b'default'}
@@ -92,38 +91,6 b' def stripdesc(desc):'
92 91 return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n')
93 92
94 93
95 class _divertopener:
96 def __init__(self, opener, target):
97 self._opener = opener
98 self._target = target
99
100 def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
101 if name != self._target:
102 return self._opener(name, mode, **kwargs)
103 return self._opener(name + b".a", mode, **kwargs)
104
105 def __getattr__(self, attr):
106 return getattr(self._opener, attr)
107
108
109 class _delayopener:
110 """build an opener that stores chunks in 'buf' instead of 'target'"""
111
112 def __init__(self, opener, target, buf):
113 self._opener = opener
114 self._target = target
115 self._buf = buf
116
117 def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
118 if name != self._target:
119 return self._opener(name, mode, **kwargs)
120 assert not kwargs
121 return randomaccessfile.appender(self._opener, name, mode, self._buf)
122
123 def __getattr__(self, attr):
124 return getattr(self._opener, attr)
125
126
127 94 @attr.s
128 95 class _changelogrevision:
129 96 # Extensions might modify _defaultextra, so let the constructor below pass
@@ -354,10 +321,7 b' class changelog(revlog.revlog):'
354 321 # chains.
355 322 self._storedeltachains = False
356 323
357 self._realopener = opener
358 self._delayed = False
359 self._delaybuf = None
360 self._divert = False
324 self._v2_delayed = False
361 325 self._filteredrevs = frozenset()
362 326 self._filteredrevs_hashcache = {}
363 327 self._copiesstorage = opener.options.get(b'copies-storage')
@@ -374,90 +338,47 b' class changelog(revlog.revlog):'
374 338 self._filteredrevs_hashcache = {}
375 339
376 340 def _write_docket(self, tr):
377 if not self.is_delaying:
341 if not self._v2_delayed:
378 342 super(changelog, self)._write_docket(tr)
379 343
380 @property
381 def is_delaying(self):
382 return self._delayed
383
384 344 def delayupdate(self, tr):
385 345 """delay visibility of index updates to other readers"""
386 346 assert not self._inner.is_open
387 if self._docket is None and not self.is_delaying:
388 if len(self) == 0:
389 self._divert = True
390 if self._realopener.exists(self._indexfile + b'.a'):
391 self._realopener.unlink(self._indexfile + b'.a')
392 self.opener = _divertopener(self._realopener, self._indexfile)
393 else:
394 self._delaybuf = []
395 self.opener = _delayopener(
396 self._realopener, self._indexfile, self._delaybuf
397 )
398 self._inner.opener = self.opener
399 self._inner._segmentfile.opener = self.opener
400 self._inner._segmentfile_sidedata.opener = self.opener
401 self._delayed = True
347 if self._docket is not None:
348 self._v2_delayed = True
349 else:
350 new_index = self._inner.delay()
351 if new_index is not None:
352 self._indexfile = new_index
353 tr.registertmp(new_index)
402 354 tr.addpending(b'cl-%i' % id(self), self._writepending)
403 355 tr.addfinalize(b'cl-%i' % id(self), self._finalize)
404 356
405 357 def _finalize(self, tr):
406 358 """finalize index updates"""
407 359 assert not self._inner.is_open
408 self._delayed = False
409 self.opener = self._realopener
410 self._inner.opener = self.opener
411 self._inner._segmentfile.opener = self.opener
412 self._inner._segmentfile_sidedata.opener = self.opener
413 # move redirected index data back into place
414 360 if self._docket is not None:
415 self._write_docket(tr)
416 elif self._divert:
417 assert not self._delaybuf
418 tmpname = self._indexfile + b".a"
419 nfile = self.opener.open(tmpname)
420 nfile.close()
421 self.opener.rename(tmpname, self._indexfile, checkambig=True)
422 elif self._delaybuf:
423 fp = self.opener(self._indexfile, b'a', checkambig=True)
424 fp.write(b"".join(self._delaybuf))
425 fp.close()
426 self._delaybuf = None
427 self._divert = False
428 # split when we're done
429 self._enforceinlinesize(tr, side_write=False)
361 self._docket.write(tr)
362 self._v2_delayed = False
363 else:
364 new_index_file = self._inner.finalize_pending()
365 self._indexfile = new_index_file
366 # split when we're done
367 self._enforceinlinesize(tr, side_write=False)
430 368
431 369 def _writepending(self, tr):
432 370 """create a file containing the unfinalized state for
433 371 pretxnchangegroup"""
434 372 assert not self._inner.is_open
435 373 if self._docket:
436 return self._docket.write(tr, pending=True)
437 if self._delaybuf:
438 # make a temporary copy of the index
439 fp1 = self._realopener(self._indexfile)
440 pendingfilename = self._indexfile + b".a"
441 # register as a temp file to ensure cleanup on failure
442 tr.registertmp(pendingfilename)
443 # write existing data
444 fp2 = self._realopener(pendingfilename, b"w")
445 fp2.write(fp1.read())
446 # add pending data
447 fp2.write(b"".join(self._delaybuf))
448 fp2.close()
449 # switch modes so finalize can simply rename
450 self._delaybuf = None
451 self._divert = True
452 self.opener = _divertopener(self._realopener, self._indexfile)
453 self._inner.opener = self.opener
454 self._inner._segmentfile.opener = self.opener
455 self._inner._segmentfile_sidedata.opener = self.opener
456
457 if self._divert:
458 return True
459
460 return False
374 any_pending = self._docket.write(tr, pending=True)
375 self._v2_delayed = False
376 else:
377 new_index, any_pending = self._inner.write_pending()
378 if new_index is not None:
379 self._indexfile = new_index
380 tr.registertmp(new_index)
381 return any_pending
461 382
462 383 def _enforceinlinesize(self, tr, side_write=True):
463 384 if not self.is_delaying:
@@ -129,7 +129,7 b' def copycache(srcrepo, destrepo):'
129 129 srcfilecache = srcrepo._filecache
130 130 if b'changelog' in srcfilecache:
131 131 destfilecache[b'changelog'] = ce = srcfilecache[b'changelog']
132 ce.obj.opener = ce.obj._realopener = destrepo.svfs
132 ce.obj.opener = ce.obj._inner.opener = destrepo.svfs
133 133 if b'obsstore' in srcfilecache:
134 134 destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore']
135 135 ce.obj.svfs = destrepo.svfs
@@ -369,6 +369,9 b' class _InnerRevlog:'
369 369 self.delta_config = delta_config
370 370 self.feature_config = feature_config
371 371
372 # used during diverted write.
373 self._orig_index_file = None
374
372 375 self._default_compression_header = default_compression_header
373 376
374 377 # index
@@ -393,6 +396,8 b' class _InnerRevlog:'
393 396 # 3-tuple of (node, rev, text) for a raw revision.
394 397 self._revisioncache = None
395 398
399 self._delay_buffer = None
400
396 401 @property
397 402 def index_file(self):
398 403 return self.__index_file
@@ -407,14 +412,27 b' class _InnerRevlog:'
407 412 return len(self.index)
408 413
409 414 def clear_cache(self):
415 assert not self.is_delaying
410 416 self._revisioncache = None
411 417 self._segmentfile.clear_cache()
412 418 self._segmentfile_sidedata.clear_cache()
413 419
414 420 @property
415 421 def canonical_index_file(self):
422 if self._orig_index_file is not None:
423 return self._orig_index_file
416 424 return self.index_file
417 425
426 @property
427 def is_delaying(self):
428 """is the revlog is currently delaying the visibility of written data?
429
430 The delaying mechanism can be either in-memory or written on disk in a
431 side-file."""
432 return (self._delay_buffer is not None) or (
433 self._orig_index_file is not None
434 )
435
418 436 # Derived from index values.
419 437
420 438 def start(self, rev):
@@ -700,22 +718,36 b' class _InnerRevlog:'
700 718 You should not use this directly and use `_writing` instead
701 719 """
702 720 try:
703 f = self.opener(
704 self.index_file,
705 mode=b"r+",
706 checkambig=self.data_config.check_ambig,
707 )
721 if self._delay_buffer is None:
722 f = self.opener(
723 self.index_file,
724 mode=b"r+",
725 checkambig=self.data_config.check_ambig,
726 )
727 else:
728 # check_ambig affect we way we open file for writing, however
729 # here, we do not actually open a file for writting as write
730 # will appened to a delay_buffer. So check_ambig is not
731 # meaningful and unneeded here.
732 f = randomaccessfile.appender(
733 self.opener, self.index_file, b"r+", self._delay_buffer
734 )
708 735 if index_end is None:
709 736 f.seek(0, os.SEEK_END)
710 737 else:
711 738 f.seek(index_end, os.SEEK_SET)
712 739 return f
713 740 except FileNotFoundError:
714 return self.opener(
715 self.index_file,
716 mode=b"w+",
717 checkambig=self.data_config.check_ambig,
718 )
741 if self._delay_buffer is None:
742 return self.opener(
743 self.index_file,
744 mode=b"w+",
745 checkambig=self.data_config.check_ambig,
746 )
747 else:
748 return randomaccessfile.appender(
749 self.opener, self.index_file, b"w+", self._delay_buffer
750 )
719 751
720 752 def __index_new_fp(self):
721 753 """internal method to create a new index file for writing
@@ -1044,20 +1076,101 b' class _InnerRevlog:'
1044 1076 dfh.write(data[1])
1045 1077 if sidedata:
1046 1078 sdfh.write(sidedata)
1047 ifh.write(entry)
1079 if self._delay_buffer is None:
1080 ifh.write(entry)
1081 else:
1082 self._delay_buffer.append(entry)
1048 1083 else:
1049 1084 offset += curr * self.index.entry_size
1050 1085 transaction.add(self.canonical_index_file, offset)
1051 ifh.write(entry)
1052 ifh.write(data[0])
1053 ifh.write(data[1])
1054 1086 assert not sidedata
1087 if self._delay_buffer is None:
1088 ifh.write(entry)
1089 ifh.write(data[0])
1090 ifh.write(data[1])
1091 else:
1092 self._delay_buffer.append(entry)
1093 self._delay_buffer.append(data[0])
1094 self._delay_buffer.append(data[1])
1055 1095 return (
1056 1096 ifh.tell(),
1057 1097 dfh.tell() if dfh else None,
1058 1098 sdfh.tell() if sdfh else None,
1059 1099 )
1060 1100
1101 def _divert_index(self):
1102 return self.index_file + b'.a'
1103
1104 def delay(self):
1105 assert not self.is_open
1106 if self._delay_buffer is not None or self._orig_index_file is not None:
1107 # delay or divert already in place
1108 return None
1109 elif len(self.index) == 0:
1110 self._orig_index_file = self.index_file
1111 self.index_file = self._divert_index()
1112 self._segmentfile.filename = self.index_file
1113 assert self._orig_index_file is not None
1114 assert self.index_file is not None
1115 if self.opener.exists(self.index_file):
1116 self.opener.unlink(self.index_file)
1117 return self.index_file
1118 else:
1119 self._segmentfile._delay_buffer = self._delay_buffer = []
1120 return None
1121
1122 def write_pending(self):
1123 assert not self.is_open
1124 if self._orig_index_file is not None:
1125 return None, True
1126 any_pending = False
1127 pending_index_file = self._divert_index()
1128 if self.opener.exists(pending_index_file):
1129 self.opener.unlink(pending_index_file)
1130 util.copyfile(
1131 self.opener.join(self.index_file),
1132 self.opener.join(pending_index_file),
1133 )
1134 if self._delay_buffer:
1135 with self.opener(pending_index_file, b'r+') as ifh:
1136 ifh.seek(0, os.SEEK_END)
1137 ifh.write(b"".join(self._delay_buffer))
1138 any_pending = True
1139 self._segmentfile._delay_buffer = self._delay_buffer = None
1140 self._orig_index_file = self.index_file
1141 self.index_file = pending_index_file
1142 self._segmentfile.filename = self.index_file
1143 return self.index_file, any_pending
1144
1145 def finalize_pending(self):
1146 assert not self.is_open
1147
1148 delay = self._delay_buffer is not None
1149 divert = self._orig_index_file is not None
1150
1151 if delay and divert:
1152 assert False, "unreachable"
1153 elif delay:
1154 if self._delay_buffer:
1155 with self.opener(self.index_file, b'r+') as ifh:
1156 ifh.seek(0, os.SEEK_END)
1157 ifh.write(b"".join(self._delay_buffer))
1158 self._segmentfile._delay_buffer = self._delay_buffer = None
1159 elif divert:
1160 if self.opener.exists(self.index_file):
1161 self.opener.rename(
1162 self.index_file,
1163 self._orig_index_file,
1164 checkambig=True,
1165 )
1166 self.index_file = self._orig_index_file
1167 self._orig_index_file = None
1168 self._segmentfile.filename = self.index_file
1169 else:
1170 msg = b"not delay or divert found on this revlog"
1171 raise error.ProgrammingError(msg)
1172 return self.canonical_index_file
1173
1061 1174
1062 1175 class revlog:
1063 1176 """
@@ -2925,6 +3038,10 b' class revlog:'
2925 3038 if self._docket is not None:
2926 3039 self._write_docket(transaction)
2927 3040
3041 @property
3042 def is_delaying(self):
3043 return self._inner.is_delaying
3044
2928 3045 def _write_docket(self, transaction):
2929 3046 """write the current docket on disk
2930 3047
@@ -116,6 +116,8 b' class randomaccessfile:'
116 116 if initial_cache:
117 117 self._cached_chunk_position, self._cached_chunk = initial_cache
118 118
119 self._delay_buffer = None
120
119 121 def clear_cache(self):
120 122 self._cached_chunk = b''
121 123 self._cached_chunk_position = 0
@@ -131,7 +133,12 b' class randomaccessfile:'
131 133
132 134 def _open(self, mode=b'r'):
133 135 """Return a file object"""
134 return self.opener(self.filename, mode=mode)
136 if self._delay_buffer is None:
137 return self.opener(self.filename, mode=mode)
138 else:
139 return appender(
140 self.opener, self.filename, mode, self._delay_buffer
141 )
135 142
136 143 @contextlib.contextmanager
137 144 def _read_handle(self):
@@ -1042,8 +1042,6 b' Verify the global server.bundle1 option '
1042 1042 adding changesets
1043 1043 remote: abort: incompatible Mercurial client; bundle2 required
1044 1044 remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient)
1045 transaction abort!
1046 rollback completed
1047 1045 abort: stream ended unexpectedly (got 0 bytes, expected 4)
1048 1046 [255]
1049 1047
@@ -725,8 +725,6 b' Server stops sending after bundle2 part '
725 725 $ hg clone http://localhost:$HGPORT/ clone
726 726 requesting all changes
727 727 adding changesets
728 transaction abort!
729 rollback completed
730 728 abort: HTTP request error (incomplete response)
731 729 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
732 730 [255]
@@ -759,8 +757,6 b' Server stops after bundle2 part payload '
759 757 $ hg clone http://localhost:$HGPORT/ clone
760 758 requesting all changes
761 759 adding changesets
762 transaction abort!
763 rollback completed
764 760 abort: HTTP request error (incomplete response*) (glob)
765 761 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
766 762 [255]
@@ -795,8 +791,6 b' Server stops sending in middle of bundle'
795 791 $ hg clone http://localhost:$HGPORT/ clone
796 792 requesting all changes
797 793 adding changesets
798 transaction abort!
799 rollback completed
800 794 abort: HTTP request error (incomplete response)
801 795 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
802 796 [255]
General Comments 0
You need to be logged in to leave comments. Login now