##// END OF EJS Templates
changelog-delay: move the delay/divert logic inside the (inner) revlog...
marmoute -
r51999:d83d7885 default
parent child Browse files
Show More
@@ -27,7 +27,6 b' from .utils import ('
27 from .revlogutils import (
27 from .revlogutils import (
28 constants as revlog_constants,
28 constants as revlog_constants,
29 flagutil,
29 flagutil,
30 randomaccessfile,
31 )
30 )
32
31
33 _defaultextra = {b'branch': b'default'}
32 _defaultextra = {b'branch': b'default'}
@@ -92,38 +91,6 b' def stripdesc(desc):'
92 return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n')
91 return b'\n'.join([l.rstrip() for l in desc.splitlines()]).strip(b'\n')
93
92
94
93
95 class _divertopener:
96 def __init__(self, opener, target):
97 self._opener = opener
98 self._target = target
99
100 def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
101 if name != self._target:
102 return self._opener(name, mode, **kwargs)
103 return self._opener(name + b".a", mode, **kwargs)
104
105 def __getattr__(self, attr):
106 return getattr(self._opener, attr)
107
108
109 class _delayopener:
110 """build an opener that stores chunks in 'buf' instead of 'target'"""
111
112 def __init__(self, opener, target, buf):
113 self._opener = opener
114 self._target = target
115 self._buf = buf
116
117 def __call__(self, name, mode=b'r', checkambig=False, **kwargs):
118 if name != self._target:
119 return self._opener(name, mode, **kwargs)
120 assert not kwargs
121 return randomaccessfile.appender(self._opener, name, mode, self._buf)
122
123 def __getattr__(self, attr):
124 return getattr(self._opener, attr)
125
126
127 @attr.s
94 @attr.s
128 class _changelogrevision:
95 class _changelogrevision:
129 # Extensions might modify _defaultextra, so let the constructor below pass
96 # Extensions might modify _defaultextra, so let the constructor below pass
@@ -354,10 +321,7 b' class changelog(revlog.revlog):'
354 # chains.
321 # chains.
355 self._storedeltachains = False
322 self._storedeltachains = False
356
323
357 self._realopener = opener
324 self._v2_delayed = False
358 self._delayed = False
359 self._delaybuf = None
360 self._divert = False
361 self._filteredrevs = frozenset()
325 self._filteredrevs = frozenset()
362 self._filteredrevs_hashcache = {}
326 self._filteredrevs_hashcache = {}
363 self._copiesstorage = opener.options.get(b'copies-storage')
327 self._copiesstorage = opener.options.get(b'copies-storage')
@@ -374,90 +338,47 b' class changelog(revlog.revlog):'
374 self._filteredrevs_hashcache = {}
338 self._filteredrevs_hashcache = {}
375
339
376 def _write_docket(self, tr):
340 def _write_docket(self, tr):
377 if not self.is_delaying:
341 if not self._v2_delayed:
378 super(changelog, self)._write_docket(tr)
342 super(changelog, self)._write_docket(tr)
379
343
380 @property
381 def is_delaying(self):
382 return self._delayed
383
384 def delayupdate(self, tr):
344 def delayupdate(self, tr):
385 """delay visibility of index updates to other readers"""
345 """delay visibility of index updates to other readers"""
386 assert not self._inner.is_open
346 assert not self._inner.is_open
387 if self._docket is None and not self.is_delaying:
347 if self._docket is not None:
388 if len(self) == 0:
348 self._v2_delayed = True
389 self._divert = True
349 else:
390 if self._realopener.exists(self._indexfile + b'.a'):
350 new_index = self._inner.delay()
391 self._realopener.unlink(self._indexfile + b'.a')
351 if new_index is not None:
392 self.opener = _divertopener(self._realopener, self._indexfile)
352 self._indexfile = new_index
393 else:
353 tr.registertmp(new_index)
394 self._delaybuf = []
395 self.opener = _delayopener(
396 self._realopener, self._indexfile, self._delaybuf
397 )
398 self._inner.opener = self.opener
399 self._inner._segmentfile.opener = self.opener
400 self._inner._segmentfile_sidedata.opener = self.opener
401 self._delayed = True
402 tr.addpending(b'cl-%i' % id(self), self._writepending)
354 tr.addpending(b'cl-%i' % id(self), self._writepending)
403 tr.addfinalize(b'cl-%i' % id(self), self._finalize)
355 tr.addfinalize(b'cl-%i' % id(self), self._finalize)
404
356
405 def _finalize(self, tr):
357 def _finalize(self, tr):
406 """finalize index updates"""
358 """finalize index updates"""
407 assert not self._inner.is_open
359 assert not self._inner.is_open
408 self._delayed = False
409 self.opener = self._realopener
410 self._inner.opener = self.opener
411 self._inner._segmentfile.opener = self.opener
412 self._inner._segmentfile_sidedata.opener = self.opener
413 # move redirected index data back into place
414 if self._docket is not None:
360 if self._docket is not None:
415 self._write_docket(tr)
361 self._docket.write(tr)
416 elif self._divert:
362 self._v2_delayed = False
417 assert not self._delaybuf
363 else:
418 tmpname = self._indexfile + b".a"
364 new_index_file = self._inner.finalize_pending()
419 nfile = self.opener.open(tmpname)
365 self._indexfile = new_index_file
420 nfile.close()
366 # split when we're done
421 self.opener.rename(tmpname, self._indexfile, checkambig=True)
367 self._enforceinlinesize(tr, side_write=False)
422 elif self._delaybuf:
423 fp = self.opener(self._indexfile, b'a', checkambig=True)
424 fp.write(b"".join(self._delaybuf))
425 fp.close()
426 self._delaybuf = None
427 self._divert = False
428 # split when we're done
429 self._enforceinlinesize(tr, side_write=False)
430
368
431 def _writepending(self, tr):
369 def _writepending(self, tr):
432 """create a file containing the unfinalized state for
370 """create a file containing the unfinalized state for
433 pretxnchangegroup"""
371 pretxnchangegroup"""
434 assert not self._inner.is_open
372 assert not self._inner.is_open
435 if self._docket:
373 if self._docket:
436 return self._docket.write(tr, pending=True)
374 any_pending = self._docket.write(tr, pending=True)
437 if self._delaybuf:
375 self._v2_delayed = False
438 # make a temporary copy of the index
376 else:
439 fp1 = self._realopener(self._indexfile)
377 new_index, any_pending = self._inner.write_pending()
440 pendingfilename = self._indexfile + b".a"
378 if new_index is not None:
441 # register as a temp file to ensure cleanup on failure
379 self._indexfile = new_index
442 tr.registertmp(pendingfilename)
380 tr.registertmp(new_index)
443 # write existing data
381 return any_pending
444 fp2 = self._realopener(pendingfilename, b"w")
445 fp2.write(fp1.read())
446 # add pending data
447 fp2.write(b"".join(self._delaybuf))
448 fp2.close()
449 # switch modes so finalize can simply rename
450 self._delaybuf = None
451 self._divert = True
452 self.opener = _divertopener(self._realopener, self._indexfile)
453 self._inner.opener = self.opener
454 self._inner._segmentfile.opener = self.opener
455 self._inner._segmentfile_sidedata.opener = self.opener
456
457 if self._divert:
458 return True
459
460 return False
461
382
462 def _enforceinlinesize(self, tr, side_write=True):
383 def _enforceinlinesize(self, tr, side_write=True):
463 if not self.is_delaying:
384 if not self.is_delaying:
@@ -129,7 +129,7 b' def copycache(srcrepo, destrepo):'
129 srcfilecache = srcrepo._filecache
129 srcfilecache = srcrepo._filecache
130 if b'changelog' in srcfilecache:
130 if b'changelog' in srcfilecache:
131 destfilecache[b'changelog'] = ce = srcfilecache[b'changelog']
131 destfilecache[b'changelog'] = ce = srcfilecache[b'changelog']
132 ce.obj.opener = ce.obj._realopener = destrepo.svfs
132 ce.obj.opener = ce.obj._inner.opener = destrepo.svfs
133 if b'obsstore' in srcfilecache:
133 if b'obsstore' in srcfilecache:
134 destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore']
134 destfilecache[b'obsstore'] = ce = srcfilecache[b'obsstore']
135 ce.obj.svfs = destrepo.svfs
135 ce.obj.svfs = destrepo.svfs
@@ -369,6 +369,9 b' class _InnerRevlog:'
369 self.delta_config = delta_config
369 self.delta_config = delta_config
370 self.feature_config = feature_config
370 self.feature_config = feature_config
371
371
372 # used during diverted write.
373 self._orig_index_file = None
374
372 self._default_compression_header = default_compression_header
375 self._default_compression_header = default_compression_header
373
376
374 # index
377 # index
@@ -393,6 +396,8 b' class _InnerRevlog:'
393 # 3-tuple of (node, rev, text) for a raw revision.
396 # 3-tuple of (node, rev, text) for a raw revision.
394 self._revisioncache = None
397 self._revisioncache = None
395
398
399 self._delay_buffer = None
400
396 @property
401 @property
397 def index_file(self):
402 def index_file(self):
398 return self.__index_file
403 return self.__index_file
@@ -407,14 +412,27 b' class _InnerRevlog:'
407 return len(self.index)
412 return len(self.index)
408
413
409 def clear_cache(self):
414 def clear_cache(self):
415 assert not self.is_delaying
410 self._revisioncache = None
416 self._revisioncache = None
411 self._segmentfile.clear_cache()
417 self._segmentfile.clear_cache()
412 self._segmentfile_sidedata.clear_cache()
418 self._segmentfile_sidedata.clear_cache()
413
419
414 @property
420 @property
415 def canonical_index_file(self):
421 def canonical_index_file(self):
422 if self._orig_index_file is not None:
423 return self._orig_index_file
416 return self.index_file
424 return self.index_file
417
425
426 @property
427 def is_delaying(self):
428 """is the revlog is currently delaying the visibility of written data?
429
430 The delaying mechanism can be either in-memory or written on disk in a
431 side-file."""
432 return (self._delay_buffer is not None) or (
433 self._orig_index_file is not None
434 )
435
418 # Derived from index values.
436 # Derived from index values.
419
437
420 def start(self, rev):
438 def start(self, rev):
@@ -700,22 +718,36 b' class _InnerRevlog:'
700 You should not use this directly and use `_writing` instead
718 You should not use this directly and use `_writing` instead
701 """
719 """
702 try:
720 try:
703 f = self.opener(
721 if self._delay_buffer is None:
704 self.index_file,
722 f = self.opener(
705 mode=b"r+",
723 self.index_file,
706 checkambig=self.data_config.check_ambig,
724 mode=b"r+",
707 )
725 checkambig=self.data_config.check_ambig,
726 )
727 else:
728 # check_ambig affect we way we open file for writing, however
729 # here, we do not actually open a file for writting as write
730 # will appened to a delay_buffer. So check_ambig is not
731 # meaningful and unneeded here.
732 f = randomaccessfile.appender(
733 self.opener, self.index_file, b"r+", self._delay_buffer
734 )
708 if index_end is None:
735 if index_end is None:
709 f.seek(0, os.SEEK_END)
736 f.seek(0, os.SEEK_END)
710 else:
737 else:
711 f.seek(index_end, os.SEEK_SET)
738 f.seek(index_end, os.SEEK_SET)
712 return f
739 return f
713 except FileNotFoundError:
740 except FileNotFoundError:
714 return self.opener(
741 if self._delay_buffer is None:
715 self.index_file,
742 return self.opener(
716 mode=b"w+",
743 self.index_file,
717 checkambig=self.data_config.check_ambig,
744 mode=b"w+",
718 )
745 checkambig=self.data_config.check_ambig,
746 )
747 else:
748 return randomaccessfile.appender(
749 self.opener, self.index_file, b"w+", self._delay_buffer
750 )
719
751
720 def __index_new_fp(self):
752 def __index_new_fp(self):
721 """internal method to create a new index file for writing
753 """internal method to create a new index file for writing
@@ -1044,20 +1076,101 b' class _InnerRevlog:'
1044 dfh.write(data[1])
1076 dfh.write(data[1])
1045 if sidedata:
1077 if sidedata:
1046 sdfh.write(sidedata)
1078 sdfh.write(sidedata)
1047 ifh.write(entry)
1079 if self._delay_buffer is None:
1080 ifh.write(entry)
1081 else:
1082 self._delay_buffer.append(entry)
1048 else:
1083 else:
1049 offset += curr * self.index.entry_size
1084 offset += curr * self.index.entry_size
1050 transaction.add(self.canonical_index_file, offset)
1085 transaction.add(self.canonical_index_file, offset)
1051 ifh.write(entry)
1052 ifh.write(data[0])
1053 ifh.write(data[1])
1054 assert not sidedata
1086 assert not sidedata
1087 if self._delay_buffer is None:
1088 ifh.write(entry)
1089 ifh.write(data[0])
1090 ifh.write(data[1])
1091 else:
1092 self._delay_buffer.append(entry)
1093 self._delay_buffer.append(data[0])
1094 self._delay_buffer.append(data[1])
1055 return (
1095 return (
1056 ifh.tell(),
1096 ifh.tell(),
1057 dfh.tell() if dfh else None,
1097 dfh.tell() if dfh else None,
1058 sdfh.tell() if sdfh else None,
1098 sdfh.tell() if sdfh else None,
1059 )
1099 )
1060
1100
1101 def _divert_index(self):
1102 return self.index_file + b'.a'
1103
1104 def delay(self):
1105 assert not self.is_open
1106 if self._delay_buffer is not None or self._orig_index_file is not None:
1107 # delay or divert already in place
1108 return None
1109 elif len(self.index) == 0:
1110 self._orig_index_file = self.index_file
1111 self.index_file = self._divert_index()
1112 self._segmentfile.filename = self.index_file
1113 assert self._orig_index_file is not None
1114 assert self.index_file is not None
1115 if self.opener.exists(self.index_file):
1116 self.opener.unlink(self.index_file)
1117 return self.index_file
1118 else:
1119 self._segmentfile._delay_buffer = self._delay_buffer = []
1120 return None
1121
1122 def write_pending(self):
1123 assert not self.is_open
1124 if self._orig_index_file is not None:
1125 return None, True
1126 any_pending = False
1127 pending_index_file = self._divert_index()
1128 if self.opener.exists(pending_index_file):
1129 self.opener.unlink(pending_index_file)
1130 util.copyfile(
1131 self.opener.join(self.index_file),
1132 self.opener.join(pending_index_file),
1133 )
1134 if self._delay_buffer:
1135 with self.opener(pending_index_file, b'r+') as ifh:
1136 ifh.seek(0, os.SEEK_END)
1137 ifh.write(b"".join(self._delay_buffer))
1138 any_pending = True
1139 self._segmentfile._delay_buffer = self._delay_buffer = None
1140 self._orig_index_file = self.index_file
1141 self.index_file = pending_index_file
1142 self._segmentfile.filename = self.index_file
1143 return self.index_file, any_pending
1144
1145 def finalize_pending(self):
1146 assert not self.is_open
1147
1148 delay = self._delay_buffer is not None
1149 divert = self._orig_index_file is not None
1150
1151 if delay and divert:
1152 assert False, "unreachable"
1153 elif delay:
1154 if self._delay_buffer:
1155 with self.opener(self.index_file, b'r+') as ifh:
1156 ifh.seek(0, os.SEEK_END)
1157 ifh.write(b"".join(self._delay_buffer))
1158 self._segmentfile._delay_buffer = self._delay_buffer = None
1159 elif divert:
1160 if self.opener.exists(self.index_file):
1161 self.opener.rename(
1162 self.index_file,
1163 self._orig_index_file,
1164 checkambig=True,
1165 )
1166 self.index_file = self._orig_index_file
1167 self._orig_index_file = None
1168 self._segmentfile.filename = self.index_file
1169 else:
1170 msg = b"not delay or divert found on this revlog"
1171 raise error.ProgrammingError(msg)
1172 return self.canonical_index_file
1173
1061
1174
1062 class revlog:
1175 class revlog:
1063 """
1176 """
@@ -2925,6 +3038,10 b' class revlog:'
2925 if self._docket is not None:
3038 if self._docket is not None:
2926 self._write_docket(transaction)
3039 self._write_docket(transaction)
2927
3040
3041 @property
3042 def is_delaying(self):
3043 return self._inner.is_delaying
3044
2928 def _write_docket(self, transaction):
3045 def _write_docket(self, transaction):
2929 """write the current docket on disk
3046 """write the current docket on disk
2930
3047
@@ -116,6 +116,8 b' class randomaccessfile:'
116 if initial_cache:
116 if initial_cache:
117 self._cached_chunk_position, self._cached_chunk = initial_cache
117 self._cached_chunk_position, self._cached_chunk = initial_cache
118
118
119 self._delay_buffer = None
120
119 def clear_cache(self):
121 def clear_cache(self):
120 self._cached_chunk = b''
122 self._cached_chunk = b''
121 self._cached_chunk_position = 0
123 self._cached_chunk_position = 0
@@ -131,7 +133,12 b' class randomaccessfile:'
131
133
132 def _open(self, mode=b'r'):
134 def _open(self, mode=b'r'):
133 """Return a file object"""
135 """Return a file object"""
134 return self.opener(self.filename, mode=mode)
136 if self._delay_buffer is None:
137 return self.opener(self.filename, mode=mode)
138 else:
139 return appender(
140 self.opener, self.filename, mode, self._delay_buffer
141 )
135
142
136 @contextlib.contextmanager
143 @contextlib.contextmanager
137 def _read_handle(self):
144 def _read_handle(self):
@@ -1042,8 +1042,6 b' Verify the global server.bundle1 option '
1042 adding changesets
1042 adding changesets
1043 remote: abort: incompatible Mercurial client; bundle2 required
1043 remote: abort: incompatible Mercurial client; bundle2 required
1044 remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient)
1044 remote: (see https://www.mercurial-scm.org/wiki/IncompatibleClient)
1045 transaction abort!
1046 rollback completed
1047 abort: stream ended unexpectedly (got 0 bytes, expected 4)
1045 abort: stream ended unexpectedly (got 0 bytes, expected 4)
1048 [255]
1046 [255]
1049
1047
@@ -725,8 +725,6 b' Server stops sending after bundle2 part '
725 $ hg clone http://localhost:$HGPORT/ clone
725 $ hg clone http://localhost:$HGPORT/ clone
726 requesting all changes
726 requesting all changes
727 adding changesets
727 adding changesets
728 transaction abort!
729 rollback completed
730 abort: HTTP request error (incomplete response)
728 abort: HTTP request error (incomplete response)
731 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
729 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
732 [255]
730 [255]
@@ -759,8 +757,6 b' Server stops after bundle2 part payload '
759 $ hg clone http://localhost:$HGPORT/ clone
757 $ hg clone http://localhost:$HGPORT/ clone
760 requesting all changes
758 requesting all changes
761 adding changesets
759 adding changesets
762 transaction abort!
763 rollback completed
764 abort: HTTP request error (incomplete response*) (glob)
760 abort: HTTP request error (incomplete response*) (glob)
765 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
761 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
766 [255]
762 [255]
@@ -795,8 +791,6 b' Server stops sending in middle of bundle'
795 $ hg clone http://localhost:$HGPORT/ clone
791 $ hg clone http://localhost:$HGPORT/ clone
796 requesting all changes
792 requesting all changes
797 adding changesets
793 adding changesets
798 transaction abort!
799 rollback completed
800 abort: HTTP request error (incomplete response)
794 abort: HTTP request error (incomplete response)
801 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
795 (this may be an intermittent network failure; if the error persists, consider contacting the network or server operator)
802 [255]
796 [255]
General Comments 0
You need to be logged in to leave comments. Login now