undo-files: no longer pass the `repo` to `cleanup_undo_files`...
marmoute
r51196:d89eecf9 stable
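This revision changes how the undo-file cleanup helper is invoked: in the hunks below (narrowbundle2.py and repair.py), the two calls to transaction.cleanup_undo_files no longer receive the repo object. A minimal before/after sketch of the call sites, assuming only what the diff itself shows (the helper now takes the ui warning callable and the repository's vfs map; `repo` is the local repository already in scope at each call site):

    # before: the helper was handed the whole repository object
    transaction.cleanup_undo_files(repo)

    # after: only the pieces the cleanup actually needs are passed,
    # the ui's warning callback and the repository's vfs map
    transaction.cleanup_undo_files(repo.ui.warn, repo.vfs_map)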
@@ -1,345 +1,345 b''
1 1 # narrowbundle2.py - bundle2 extensions for narrow repository support
2 2 #
3 3 # Copyright 2017 Google, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import struct
10 10
11 11 from mercurial.i18n import _
12 12 from mercurial import (
13 13 bundle2,
14 14 changegroup,
15 15 error,
16 16 exchange,
17 17 localrepo,
18 18 narrowspec,
19 19 repair,
20 20 requirements,
21 21 scmutil,
22 22 transaction,
23 23 util,
24 24 wireprototypes,
25 25 )
26 26
27 27 _NARROWACL_SECTION = b'narrowacl'
28 28 _CHANGESPECPART = b'narrow:changespec'
29 29 _RESSPECS = b'narrow:responsespec'
30 30 _SPECPART = b'narrow:spec'
31 31 _SPECPART_INCLUDE = b'include'
32 32 _SPECPART_EXCLUDE = b'exclude'
33 33 _KILLNODESIGNAL = b'KILL'
34 34 _DONESIGNAL = b'DONE'
35 35 _ELIDEDCSHEADER = b'>20s20s20sl' # cset id, p1, p2, len(text)
36 36 _ELIDEDMFHEADER = b'>20s20s20s20sl' # manifest id, p1, p2, link id, len(text)
37 37 _CSHEADERSIZE = struct.calcsize(_ELIDEDCSHEADER)
38 38 _MFHEADERSIZE = struct.calcsize(_ELIDEDMFHEADER)
39 39
40 40 # Serve a changegroup for a client with a narrow clone.
41 41 def getbundlechangegrouppart_narrow(
42 42 bundler,
43 43 repo,
44 44 source,
45 45 bundlecaps=None,
46 46 b2caps=None,
47 47 heads=None,
48 48 common=None,
49 49 **kwargs
50 50 ):
51 51 assert repo.ui.configbool(b'experimental', b'narrowservebrokenellipses')
52 52
53 53 cgversions = b2caps.get(b'changegroup')
54 54 cgversions = [
55 55 v
56 56 for v in cgversions
57 57 if v in changegroup.supportedoutgoingversions(repo)
58 58 ]
59 59 if not cgversions:
60 60 raise ValueError(_(b'no common changegroup version'))
61 61 version = max(cgversions)
62 62
63 63 include = sorted(filter(bool, kwargs.get('includepats', [])))
64 64 exclude = sorted(filter(bool, kwargs.get('excludepats', [])))
65 65 generateellipsesbundle2(
66 66 bundler,
67 67 repo,
68 68 include,
69 69 exclude,
70 70 version,
71 71 common,
72 72 heads,
73 73 kwargs.get('depth', None),
74 74 )
75 75
76 76
77 77 def generateellipsesbundle2(
78 78 bundler,
79 79 repo,
80 80 include,
81 81 exclude,
82 82 version,
83 83 common,
84 84 heads,
85 85 depth,
86 86 ):
87 87 match = narrowspec.match(repo.root, include=include, exclude=exclude)
88 88 if depth is not None:
89 89 depth = int(depth)
90 90 if depth < 1:
91 91 raise error.Abort(_(b'depth must be positive, got %d') % depth)
92 92
93 93 heads = set(heads or repo.heads())
94 94 common = set(common or [repo.nullid])
95 95
96 96 visitnodes, relevant_nodes, ellipsisroots = exchange._computeellipsis(
97 97 repo, common, heads, set(), match, depth=depth
98 98 )
99 99
100 100 repo.ui.debug(b'Found %d relevant revs\n' % len(relevant_nodes))
101 101 if visitnodes:
102 102 packer = changegroup.getbundler(
103 103 version,
104 104 repo,
105 105 matcher=match,
106 106 ellipses=True,
107 107 shallow=depth is not None,
108 108 ellipsisroots=ellipsisroots,
109 109 fullnodes=relevant_nodes,
110 110 )
111 111 cgdata = packer.generate(common, visitnodes, False, b'narrow_widen')
112 112
113 113 part = bundler.newpart(b'changegroup', data=cgdata)
114 114 part.addparam(b'version', version)
115 115 if scmutil.istreemanifest(repo):
116 116 part.addparam(b'treemanifest', b'1')
117 117
118 118
119 119 def generate_ellipses_bundle2_for_widening(
120 120 bundler,
121 121 repo,
122 122 oldmatch,
123 123 newmatch,
124 124 version,
125 125 common,
126 126 known,
127 127 ):
128 128 common = set(common or [repo.nullid])
129 129 # Steps:
130 130 # 1. Send kill for "$known & ::common"
131 131 #
132 132 # 2. Send changegroup for ::common
133 133 #
134 134 # 3. Proceed.
135 135 #
136 136 # In the future, we can send kills for only the specific
137 137 # nodes we know should go away or change shape, and then
138 138 # send a data stream that tells the client something like this:
139 139 #
140 140 # a) apply this changegroup
141 141 # b) apply nodes XXX, YYY, ZZZ that you already have
142 142 # c) goto a
143 143 #
144 144 # until they've built up the full new state.
145 145 knownrevs = {repo.changelog.rev(n) for n in known}
146 146 # TODO: we could send only roots() of this set, and the
147 147 # list of nodes in common, and the client could work out
148 148 # what to strip, instead of us explicitly sending every
149 149 # single node.
150 150 deadrevs = knownrevs
151 151
152 152 def genkills():
153 153 for r in deadrevs:
154 154 yield _KILLNODESIGNAL
155 155 yield repo.changelog.node(r)
156 156 yield _DONESIGNAL
157 157
158 158 bundler.newpart(_CHANGESPECPART, data=genkills())
159 159 newvisit, newfull, newellipsis = exchange._computeellipsis(
160 160 repo, set(), common, knownrevs, newmatch
161 161 )
162 162 if newvisit:
163 163 packer = changegroup.getbundler(
164 164 version,
165 165 repo,
166 166 matcher=newmatch,
167 167 ellipses=True,
168 168 shallow=False,
169 169 ellipsisroots=newellipsis,
170 170 fullnodes=newfull,
171 171 )
172 172 cgdata = packer.generate(common, newvisit, False, b'narrow_widen')
173 173
174 174 part = bundler.newpart(b'changegroup', data=cgdata)
175 175 part.addparam(b'version', version)
176 176 if scmutil.istreemanifest(repo):
177 177 part.addparam(b'treemanifest', b'1')
178 178
179 179
180 180 @bundle2.parthandler(_SPECPART, (_SPECPART_INCLUDE, _SPECPART_EXCLUDE))
181 181 def _handlechangespec_2(op, inpart):
182 182 # XXX: This bundle2 handling is buggy and should be removed after hg5.2 is
183 183 # released. New servers will send a mandatory bundle2 part named
184 184 # 'Narrowspec' and will send specs as data instead of params.
185 185 # Refer to issue5952 and 6019
186 186 includepats = set(inpart.params.get(_SPECPART_INCLUDE, b'').splitlines())
187 187 excludepats = set(inpart.params.get(_SPECPART_EXCLUDE, b'').splitlines())
188 188 narrowspec.validatepatterns(includepats)
189 189 narrowspec.validatepatterns(excludepats)
190 190
191 191 if not requirements.NARROW_REQUIREMENT in op.repo.requirements:
192 192 op.repo.requirements.add(requirements.NARROW_REQUIREMENT)
193 193 scmutil.writereporequirements(op.repo)
194 194 op.repo.setnarrowpats(includepats, excludepats)
195 195 narrowspec.copytoworkingcopy(op.repo)
196 196
197 197
198 198 @bundle2.parthandler(_RESSPECS)
199 199 def _handlenarrowspecs(op, inpart):
200 200 data = inpart.read()
201 201 inc, exc = data.split(b'\0')
202 202 includepats = set(inc.splitlines())
203 203 excludepats = set(exc.splitlines())
204 204 narrowspec.validatepatterns(includepats)
205 205 narrowspec.validatepatterns(excludepats)
206 206
207 207 if requirements.NARROW_REQUIREMENT not in op.repo.requirements:
208 208 op.repo.requirements.add(requirements.NARROW_REQUIREMENT)
209 209 scmutil.writereporequirements(op.repo)
210 210 op.repo.setnarrowpats(includepats, excludepats)
211 211 narrowspec.copytoworkingcopy(op.repo)
212 212
213 213
214 214 @bundle2.parthandler(_CHANGESPECPART)
215 215 def _handlechangespec(op, inpart):
216 216 repo = op.repo
217 217 cl = repo.changelog
218 218
219 219 # changesets which need to be stripped entirely. either they're no longer
220 220 # needed in the new narrow spec, or the server is sending a replacement
221 221 # in the changegroup part.
222 222 clkills = set()
223 223
224 224 # A changespec part contains all the updates to ellipsis nodes
225 225 # that will happen as a result of widening or narrowing a
226 226 # repo. All the changes that this block encounters are ellipsis
227 227 # nodes or flags to kill an existing ellipsis.
228 228 chunksignal = changegroup.readexactly(inpart, 4)
229 229 while chunksignal != _DONESIGNAL:
230 230 if chunksignal == _KILLNODESIGNAL:
231 231 # a node used to be an ellipsis but isn't anymore
232 232 ck = changegroup.readexactly(inpart, 20)
233 233 if cl.hasnode(ck):
234 234 clkills.add(ck)
235 235 else:
236 236 raise error.Abort(
237 237 _(b'unexpected changespec node chunk type: %s') % chunksignal
238 238 )
239 239 chunksignal = changegroup.readexactly(inpart, 4)
240 240
241 241 if clkills:
242 242 # preserve bookmarks that repair.strip() would otherwise strip
243 243 op._bookmarksbackup = repo._bookmarks
244 244
245 245 class dummybmstore(dict):
246 246 def applychanges(self, repo, tr, changes):
247 247 pass
248 248
249 249 localrepo.localrepository._bookmarks.set(repo, dummybmstore())
250 250 chgrpfile = repair.strip(
251 251 op.ui, repo, list(clkills), backup=True, topic=b'widen'
252 252 )
253 253 if chgrpfile:
254 254 op._widen_uninterr = repo.ui.uninterruptible()
255 255 op._widen_uninterr.__enter__()
256 256 # presence of _widen_bundle attribute activates widen handler later
257 257 op._widen_bundle = chgrpfile
258 258 # Set the new narrowspec if we're widening. The setnewnarrowpats() method
259 259 # will currently always be there when using the core+narrowhg server, but
260 260 # other servers may include a changespec part even when not widening (e.g.
261 261 # because we're deepening a shallow repo).
262 262 if util.safehasattr(repo, 'setnewnarrowpats'):
263 263 op.gettransaction()
264 264 repo.setnewnarrowpats()
265 265
266 266
267 267 def handlechangegroup_widen(op, inpart):
268 268 """Changegroup exchange handler which restores temporarily-stripped nodes"""
269 269 # We saved a bundle with stripped node data we must now restore.
270 270 # This approach is based on mercurial/repair.py@6ee26a53c111.
271 271 repo = op.repo
272 272 ui = op.ui
273 273
274 274 chgrpfile = op._widen_bundle
275 275 del op._widen_bundle
276 276 vfs = repo.vfs
277 277
278 278 ui.note(_(b"adding branch\n"))
279 279 f = vfs.open(chgrpfile, b"rb")
280 280 try:
281 281 gen = exchange.readbundle(ui, f, chgrpfile, vfs)
282 282 # silence internal shuffling chatter
283 283 maybe_silent = (
284 284 ui.silent() if not ui.verbose else util.nullcontextmanager()
285 285 )
286 286 with maybe_silent:
287 287 if isinstance(gen, bundle2.unbundle20):
288 288 with repo.transaction(b'strip') as tr:
289 289 bundle2.processbundle(repo, gen, lambda: tr)
290 290 else:
291 291 gen.apply(
292 292 repo, b'strip', b'bundle:' + vfs.join(chgrpfile), True
293 293 )
294 294 finally:
295 295 f.close()
296 296
297 - transaction.cleanup_undo_files(repo)
297 + transaction.cleanup_undo_files(repo.ui.warn, repo.vfs_map)
298 298
299 299 # Remove partial backup only if there were no exceptions
300 300 op._widen_uninterr.__exit__(None, None, None)
301 301 vfs.unlink(chgrpfile)
302 302
303 303
304 304 def setup():
305 305 """Enable narrow repo support in bundle2-related extension points."""
306 306 getbundleargs = wireprototypes.GETBUNDLE_ARGUMENTS
307 307
308 308 getbundleargs[b'narrow'] = b'boolean'
309 309 getbundleargs[b'depth'] = b'plain'
310 310 getbundleargs[b'oldincludepats'] = b'csv'
311 311 getbundleargs[b'oldexcludepats'] = b'csv'
312 312 getbundleargs[b'known'] = b'csv'
313 313
314 314 # Extend changegroup serving to handle requests from narrow clients.
315 315 origcgfn = exchange.getbundle2partsmapping[b'changegroup']
316 316
317 317 def wrappedcgfn(*args, **kwargs):
318 318 repo = args[1]
319 319 if repo.ui.has_section(_NARROWACL_SECTION):
320 320 kwargs = exchange.applynarrowacl(repo, kwargs)
321 321
322 322 if kwargs.get('narrow', False) and repo.ui.configbool(
323 323 b'experimental', b'narrowservebrokenellipses'
324 324 ):
325 325 getbundlechangegrouppart_narrow(*args, **kwargs)
326 326 else:
327 327 origcgfn(*args, **kwargs)
328 328
329 329 exchange.getbundle2partsmapping[b'changegroup'] = wrappedcgfn
330 330
331 331 # Extend changegroup receiver so client can fixup after widen requests.
332 332 origcghandler = bundle2.parthandlermapping[b'changegroup']
333 333
334 334 def wrappedcghandler(op, inpart):
335 335 origcghandler(op, inpart)
336 336 if util.safehasattr(op, '_widen_bundle'):
337 337 handlechangegroup_widen(op, inpart)
338 338 if util.safehasattr(op, '_bookmarksbackup'):
339 339 localrepo.localrepository._bookmarks.set(
340 340 op.repo, op._bookmarksbackup
341 341 )
342 342 del op._bookmarksbackup
343 343
344 344 wrappedcghandler.params = origcghandler.params
345 345 bundle2.parthandlermapping[b'changegroup'] = wrappedcghandler
@@ -1,547 +1,547 b''
1 1 # repair.py - functions for repository repair for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Chris Mason <mason@suse.com>
4 4 # Copyright 2007 Olivia Mackall
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9
10 10 from .i18n import _
11 11 from .node import (
12 12 hex,
13 13 short,
14 14 )
15 15 from . import (
16 16 bundle2,
17 17 changegroup,
18 18 discovery,
19 19 error,
20 20 exchange,
21 21 obsolete,
22 22 obsutil,
23 23 pathutil,
24 24 phases,
25 25 requirements,
26 26 scmutil,
27 27 transaction,
28 28 util,
29 29 )
30 30 from .utils import (
31 31 hashutil,
32 32 urlutil,
33 33 )
34 34
35 35
36 36 def backupbundle(
37 37 repo, bases, heads, node, suffix, compress=True, obsolescence=True
38 38 ):
39 39 """create a bundle with the specified revisions as a backup"""
40 40
41 41 backupdir = b"strip-backup"
42 42 vfs = repo.vfs
43 43 if not vfs.isdir(backupdir):
44 44 vfs.mkdir(backupdir)
45 45
46 46 # Include a hash of all the nodes in the filename for uniqueness
47 47 allcommits = repo.set(b'%ln::%ln', bases, heads)
48 48 allhashes = sorted(c.hex() for c in allcommits)
49 49 totalhash = hashutil.sha1(b''.join(allhashes)).digest()
50 50 name = b"%s/%s-%s-%s.hg" % (
51 51 backupdir,
52 52 short(node),
53 53 hex(totalhash[:4]),
54 54 suffix,
55 55 )
56 56
57 57 cgversion = changegroup.localversion(repo)
58 58 comp = None
59 59 if cgversion != b'01':
60 60 bundletype = b"HG20"
61 61 if compress:
62 62 comp = b'BZ'
63 63 elif compress:
64 64 bundletype = b"HG10BZ"
65 65 else:
66 66 bundletype = b"HG10UN"
67 67
68 68 outgoing = discovery.outgoing(repo, missingroots=bases, ancestorsof=heads)
69 69 contentopts = {
70 70 b'cg.version': cgversion,
71 71 b'obsolescence': obsolescence,
72 72 b'phases': True,
73 73 }
74 74 return bundle2.writenewbundle(
75 75 repo.ui,
76 76 repo,
77 77 b'strip',
78 78 name,
79 79 bundletype,
80 80 outgoing,
81 81 contentopts,
82 82 vfs,
83 83 compression=comp,
84 84 )
85 85
86 86
87 87 def _collectfiles(repo, striprev):
88 88 """find out the filelogs affected by the strip"""
89 89 files = set()
90 90
91 91 for x in range(striprev, len(repo)):
92 92 files.update(repo[x].files())
93 93
94 94 return sorted(files)
95 95
96 96
97 97 def _collectrevlog(revlog, striprev):
98 98 _, brokenset = revlog.getstrippoint(striprev)
99 99 return [revlog.linkrev(r) for r in brokenset]
100 100
101 101
102 102 def _collectbrokencsets(repo, files, striprev):
103 103 """return the changesets which will be broken by the truncation"""
104 104 s = set()
105 105
106 106 for revlog in manifestrevlogs(repo):
107 107 s.update(_collectrevlog(revlog, striprev))
108 108 for fname in files:
109 109 s.update(_collectrevlog(repo.file(fname), striprev))
110 110
111 111 return s
112 112
113 113
114 114 def strip(ui, repo, nodelist, backup=True, topic=b'backup'):
115 115 # This function requires the caller to lock the repo, but it operates
116 116 # within a transaction of its own, and thus requires there to be no current
117 117 # transaction when it is called.
118 118 if repo.currenttransaction() is not None:
119 119 raise error.ProgrammingError(b'cannot strip from inside a transaction')
120 120
121 121 # Simple way to maintain backwards compatibility for this
122 122 # argument.
123 123 if backup in [b'none', b'strip']:
124 124 backup = False
125 125
126 126 repo = repo.unfiltered()
127 127 repo.destroying()
128 128 vfs = repo.vfs
129 129 # load bookmarks before changelog to avoid side effects from outdated
130 130 # changelog (see repo._refreshchangelog)
131 131 repo._bookmarks
132 132 cl = repo.changelog
133 133
134 134 # TODO handle undo of merge sets
135 135 if isinstance(nodelist, bytes):
136 136 nodelist = [nodelist]
137 137 striplist = [cl.rev(node) for node in nodelist]
138 138 striprev = min(striplist)
139 139
140 140 files = _collectfiles(repo, striprev)
141 141 saverevs = _collectbrokencsets(repo, files, striprev)
142 142
143 143 # Some revisions with rev > striprev may not be descendants of striprev.
144 144 # We have to find these revisions and put them in a bundle, so that
145 145 # we can restore them after the truncations.
146 146 # To create the bundle we use repo.changegroupsubset which requires
147 147 # the list of heads and bases of the set of interesting revisions.
148 148 # (head = revision in the set that has no descendant in the set;
149 149 # base = revision in the set that has no ancestor in the set)
150 150 tostrip = set(striplist)
151 151 saveheads = set(saverevs)
152 152 for r in cl.revs(start=striprev + 1):
153 153 if any(p in tostrip for p in cl.parentrevs(r)):
154 154 tostrip.add(r)
155 155
156 156 if r not in tostrip:
157 157 saverevs.add(r)
158 158 saveheads.difference_update(cl.parentrevs(r))
159 159 saveheads.add(r)
160 160 saveheads = [cl.node(r) for r in saveheads]
161 161
162 162 # compute base nodes
163 163 if saverevs:
164 164 descendants = set(cl.descendants(saverevs))
165 165 saverevs.difference_update(descendants)
166 166 savebases = [cl.node(r) for r in saverevs]
167 167 stripbases = [cl.node(r) for r in tostrip]
168 168
169 169 stripobsidx = obsmarkers = ()
170 170 if repo.ui.configbool(b'devel', b'strip-obsmarkers'):
171 171 obsmarkers = obsutil.exclusivemarkers(repo, stripbases)
172 172 if obsmarkers:
173 173 stripobsidx = [
174 174 i for i, m in enumerate(repo.obsstore) if m in obsmarkers
175 175 ]
176 176
177 177 newbmtarget, updatebm = _bookmarkmovements(repo, tostrip)
178 178
179 179 backupfile = None
180 180 node = nodelist[-1]
181 181 if backup:
182 182 backupfile = _createstripbackup(repo, stripbases, node, topic)
183 183 # create a changegroup for all the branches we need to keep
184 184 tmpbundlefile = None
185 185 if saveheads:
186 186 # do not compress temporary bundle if we remove it from disk later
187 187 #
188 188 # We do not include obsolescence, it might re-introduce prune markers
189 189 # we are trying to strip. This is harmless since the stripped markers
190 190 # are already backed up and we did not touch the markers for the
191 191 # saved changesets.
192 192 tmpbundlefile = backupbundle(
193 193 repo,
194 194 savebases,
195 195 saveheads,
196 196 node,
197 197 b'temp',
198 198 compress=False,
199 199 obsolescence=False,
200 200 )
201 201
202 202 with ui.uninterruptible():
203 203 try:
204 204 with repo.transaction(b"strip") as tr:
205 205 # TODO this code violates the interface abstraction of the
206 206 # transaction and makes assumptions that file storage is
207 207 # using append-only files. We'll need some kind of storage
208 208 # API to handle stripping for us.
209 209 oldfiles = set(tr._offsetmap.keys())
210 210 oldfiles.update(tr._newfiles)
211 211
212 212 tr.startgroup()
213 213 cl.strip(striprev, tr)
214 214 stripmanifest(repo, striprev, tr, files)
215 215
216 216 for fn in files:
217 217 repo.file(fn).strip(striprev, tr)
218 218 tr.endgroup()
219 219
220 220 entries = tr.readjournal()
221 221
222 222 for file, troffset in entries:
223 223 if file in oldfiles:
224 224 continue
225 225 with repo.svfs(file, b'a', checkambig=True) as fp:
226 226 fp.truncate(troffset)
227 227 if troffset == 0:
228 228 repo.store.markremoved(file)
229 229
230 230 deleteobsmarkers(repo.obsstore, stripobsidx)
231 231 del repo.obsstore
232 232 repo.invalidatevolatilesets()
233 233 repo._phasecache.filterunknown(repo)
234 234
235 235 if tmpbundlefile:
236 236 ui.note(_(b"adding branch\n"))
237 237 f = vfs.open(tmpbundlefile, b"rb")
238 238 gen = exchange.readbundle(ui, f, tmpbundlefile, vfs)
239 239 # silence internal shuffling chatter
240 240 maybe_silent = (
241 241 repo.ui.silent()
242 242 if not repo.ui.verbose
243 243 else util.nullcontextmanager()
244 244 )
245 245 with maybe_silent:
246 246 tmpbundleurl = b'bundle:' + vfs.join(tmpbundlefile)
247 247 txnname = b'strip'
248 248 if not isinstance(gen, bundle2.unbundle20):
249 249 txnname = b"strip\n%s" % urlutil.hidepassword(
250 250 tmpbundleurl
251 251 )
252 252 with repo.transaction(txnname) as tr:
253 253 bundle2.applybundle(
254 254 repo, gen, tr, source=b'strip', url=tmpbundleurl
255 255 )
256 256 f.close()
257 257
258 258 with repo.transaction(b'repair') as tr:
259 259 bmchanges = [(m, repo[newbmtarget].node()) for m in updatebm]
260 260 repo._bookmarks.applychanges(repo, tr, bmchanges)
261 261
262 - transaction.cleanup_undo_files(repo)
262 + transaction.cleanup_undo_files(repo.ui.warn, repo.vfs_map)
263 263
264 264 except: # re-raises
265 265 if backupfile:
266 266 ui.warn(
267 267 _(b"strip failed, backup bundle stored in '%s'\n")
268 268 % vfs.join(backupfile)
269 269 )
270 270 if tmpbundlefile:
271 271 ui.warn(
272 272 _(b"strip failed, unrecovered changes stored in '%s'\n")
273 273 % vfs.join(tmpbundlefile)
274 274 )
275 275 ui.warn(
276 276 _(
277 277 b"(fix the problem, then recover the changesets with "
278 278 b"\"hg unbundle '%s'\")\n"
279 279 )
280 280 % vfs.join(tmpbundlefile)
281 281 )
282 282 raise
283 283 else:
284 284 if tmpbundlefile:
285 285 # Remove temporary bundle only if there were no exceptions
286 286 vfs.unlink(tmpbundlefile)
287 287
288 288 repo.destroyed()
289 289 # return the backup file path (or None if 'backup' was False) so
290 290 # extensions can use it
291 291 return backupfile
292 292
293 293
294 294 def softstrip(ui, repo, nodelist, backup=True, topic=b'backup'):
295 295 """perform a "soft" strip using the archived phase"""
296 296 tostrip = [c.node() for c in repo.set(b'sort(%ln::)', nodelist)]
297 297 if not tostrip:
298 298 return None
299 299
300 300 backupfile = None
301 301 if backup:
302 302 node = tostrip[0]
303 303 backupfile = _createstripbackup(repo, tostrip, node, topic)
304 304
305 305 newbmtarget, updatebm = _bookmarkmovements(repo, tostrip)
306 306 with repo.transaction(b'strip') as tr:
307 307 phases.retractboundary(repo, tr, phases.archived, tostrip)
308 308 bmchanges = [(m, repo[newbmtarget].node()) for m in updatebm]
309 309 repo._bookmarks.applychanges(repo, tr, bmchanges)
310 310 return backupfile
311 311
312 312
313 313 def _bookmarkmovements(repo, tostrip):
314 314 # compute necessary bookmark movement
315 315 bm = repo._bookmarks
316 316 updatebm = []
317 317 for m in bm:
318 318 rev = repo[bm[m]].rev()
319 319 if rev in tostrip:
320 320 updatebm.append(m)
321 321 newbmtarget = None
322 322 # If we need to move bookmarks, compute bookmark
323 323 # targets. Otherwise we can skip doing this logic.
324 324 if updatebm:
325 325 # For a set s, max(parents(s) - s) is the same as max(heads(::s - s)),
326 326 # but is much faster
327 327 newbmtarget = repo.revs(b'max(parents(%ld) - (%ld))', tostrip, tostrip)
328 328 if newbmtarget:
329 329 newbmtarget = repo[newbmtarget.first()].node()
330 330 else:
331 331 newbmtarget = b'.'
332 332 return newbmtarget, updatebm
333 333
334 334
335 335 def _createstripbackup(repo, stripbases, node, topic):
336 336 # backup the changeset we are about to strip
337 337 vfs = repo.vfs
338 338 cl = repo.changelog
339 339 backupfile = backupbundle(repo, stripbases, cl.heads(), node, topic)
340 340 repo.ui.status(_(b"saved backup bundle to %s\n") % vfs.join(backupfile))
341 341 repo.ui.log(
342 342 b"backupbundle", b"saved backup bundle to %s\n", vfs.join(backupfile)
343 343 )
344 344 return backupfile
345 345
346 346
347 347 def safestriproots(ui, repo, nodes):
348 348 """return list of roots of nodes where descendants are covered by nodes"""
349 349 torev = repo.unfiltered().changelog.rev
350 350 revs = {torev(n) for n in nodes}
351 351 # tostrip = wanted - unsafe = wanted - ancestors(orphaned)
352 352 # orphaned = affected - wanted
353 353 # affected = descendants(roots(wanted))
354 354 # wanted = revs
355 355 revset = b'%ld - ( ::( (roots(%ld):: and not _phase(%s)) -%ld) )'
356 356 tostrip = set(repo.revs(revset, revs, revs, phases.internal, revs))
357 357 notstrip = revs - tostrip
358 358 if notstrip:
359 359 nodestr = b', '.join(sorted(short(repo[n].node()) for n in notstrip))
360 360 ui.warn(
361 361 _(b'warning: orphaned descendants detected, not stripping %s\n')
362 362 % nodestr
363 363 )
364 364 return [c.node() for c in repo.set(b'roots(%ld)', tostrip)]
365 365
366 366
367 367 class stripcallback:
368 368 """used as a transaction postclose callback"""
369 369
370 370 def __init__(self, ui, repo, backup, topic):
371 371 self.ui = ui
372 372 self.repo = repo
373 373 self.backup = backup
374 374 self.topic = topic or b'backup'
375 375 self.nodelist = []
376 376
377 377 def addnodes(self, nodes):
378 378 self.nodelist.extend(nodes)
379 379
380 380 def __call__(self, tr):
381 381 roots = safestriproots(self.ui, self.repo, self.nodelist)
382 382 if roots:
383 383 strip(self.ui, self.repo, roots, self.backup, self.topic)
384 384
385 385
386 386 def delayedstrip(ui, repo, nodelist, topic=None, backup=True):
387 387 """like strip, but works inside transaction and won't strip irreverent revs
388 388
389 389 nodelist must explicitly contain all descendants. Otherwise a warning will
390 390 be printed that some nodes are not stripped.
391 391
392 392 Will do a backup if `backup` is True. The last non-None "topic" will be
393 393 used as the backup topic name. The default backup topic name is "backup".
394 394 """
395 395 tr = repo.currenttransaction()
396 396 if not tr:
397 397 nodes = safestriproots(ui, repo, nodelist)
398 398 return strip(ui, repo, nodes, backup=backup, topic=topic)
399 399 # transaction postclose callbacks are called in alphabet order.
400 400 # use '\xff' as prefix so we are likely to be called last.
401 401 callback = tr.getpostclose(b'\xffstrip')
402 402 if callback is None:
403 403 callback = stripcallback(ui, repo, backup=backup, topic=topic)
404 404 tr.addpostclose(b'\xffstrip', callback)
405 405 if topic:
406 406 callback.topic = topic
407 407 callback.addnodes(nodelist)
408 408
409 409
410 410 def stripmanifest(repo, striprev, tr, files):
411 411 for revlog in manifestrevlogs(repo):
412 412 revlog.strip(striprev, tr)
413 413
414 414
415 415 def manifestrevlogs(repo):
416 416 yield repo.manifestlog.getstorage(b'')
417 417 if scmutil.istreemanifest(repo):
418 418 # This logic is safe if treemanifest isn't enabled, but also
419 419 # pointless, so we skip it if treemanifest isn't enabled.
420 420 for t, unencoded, size in repo.store.datafiles():
421 421 if unencoded.startswith(b'meta/') and unencoded.endswith(
422 422 b'00manifest.i'
423 423 ):
424 424 dir = unencoded[5:-12]
425 425 yield repo.manifestlog.getstorage(dir)
426 426
427 427
428 428 def rebuildfncache(ui, repo, only_data=False):
429 429 """Rebuilds the fncache file from repo history.
430 430
431 431 Missing entries will be added. Extra entries will be removed.
432 432 """
433 433 repo = repo.unfiltered()
434 434
435 435 if requirements.FNCACHE_REQUIREMENT not in repo.requirements:
436 436 ui.warn(
437 437 _(
438 438 b'(not rebuilding fncache because repository does not '
439 439 b'support fncache)\n'
440 440 )
441 441 )
442 442 return
443 443
444 444 with repo.lock():
445 445 fnc = repo.store.fncache
446 446 fnc.ensureloaded(warn=ui.warn)
447 447
448 448 oldentries = set(fnc.entries)
449 449 newentries = set()
450 450 seenfiles = set()
451 451
452 452 if only_data:
453 453 # Trust the listing of .i from the fncache, but not the .d. This is
454 454 # much faster, because we only need to stat every possible .d file,
455 455 # instead of reading the full changelog
456 456 for f in fnc:
457 457 if f[:5] == b'data/' and f[-2:] == b'.i':
458 458 seenfiles.add(f[5:-2])
459 459 newentries.add(f)
460 460 dataf = f[:-2] + b'.d'
461 461 if repo.store._exists(dataf):
462 462 newentries.add(dataf)
463 463 else:
464 464 progress = ui.makeprogress(
465 465 _(b'rebuilding'), unit=_(b'changesets'), total=len(repo)
466 466 )
467 467 for rev in repo:
468 468 progress.update(rev)
469 469
470 470 ctx = repo[rev]
471 471 for f in ctx.files():
472 472 # This is to minimize I/O.
473 473 if f in seenfiles:
474 474 continue
475 475 seenfiles.add(f)
476 476
477 477 i = b'data/%s.i' % f
478 478 d = b'data/%s.d' % f
479 479
480 480 if repo.store._exists(i):
481 481 newentries.add(i)
482 482 if repo.store._exists(d):
483 483 newentries.add(d)
484 484
485 485 progress.complete()
486 486
487 487 if requirements.TREEMANIFEST_REQUIREMENT in repo.requirements:
488 488 # This logic is safe if treemanifest isn't enabled, but also
489 489 # pointless, so we skip it if treemanifest isn't enabled.
490 490 for dir in pathutil.dirs(seenfiles):
491 491 i = b'meta/%s/00manifest.i' % dir
492 492 d = b'meta/%s/00manifest.d' % dir
493 493
494 494 if repo.store._exists(i):
495 495 newentries.add(i)
496 496 if repo.store._exists(d):
497 497 newentries.add(d)
498 498
499 499 addcount = len(newentries - oldentries)
500 500 removecount = len(oldentries - newentries)
501 501 for p in sorted(oldentries - newentries):
502 502 ui.write(_(b'removing %s\n') % p)
503 503 for p in sorted(newentries - oldentries):
504 504 ui.write(_(b'adding %s\n') % p)
505 505
506 506 if addcount or removecount:
507 507 ui.write(
508 508 _(b'%d items added, %d removed from fncache\n')
509 509 % (addcount, removecount)
510 510 )
511 511 fnc.entries = newentries
512 512 fnc._dirty = True
513 513
514 514 with repo.transaction(b'fncache') as tr:
515 515 fnc.write(tr)
516 516 else:
517 517 ui.write(_(b'fncache already up to date\n'))
518 518
519 519
520 520 def deleteobsmarkers(obsstore, indices):
521 521 """Delete some obsmarkers from obsstore and return how many were deleted
522 522
523 523 'indices' is a list of ints which are the indices
524 524 of the markers to be deleted.
525 525
526 526 Every invocation of this function completely rewrites the obsstore file,
527 527 skipping the markers we want removed. A new temporary file is
528 528 created, remaining markers are written there and on .close() this file
529 529 gets atomically renamed to obsstore, thus guaranteeing consistency."""
530 530 if not indices:
531 531 # we don't want to rewrite the obsstore with the same content
532 532 return
533 533
534 534 left = []
535 535 current = obsstore._all
536 536 n = 0
537 537 for i, m in enumerate(current):
538 538 if i in indices:
539 539 n += 1
540 540 continue
541 541 left.append(m)
542 542
543 543 newobsstorefile = obsstore.svfs(b'obsstore', b'w', atomictemp=True)
544 544 for bytes in obsolete.encodemarkers(left, True, obsstore._version):
545 545 newobsstorefile.write(bytes)
546 546 newobsstorefile.close()
547 547 return n
@@ -1,935 +1,935 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combine the "default" requirements that a new repository would
38 38 use with the constaint we get from the stream clone content. We keep local
39 39 configuration choice when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
46 46
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 72 bundle2supported = False
73 73 if pullop.canusebundle2:
74 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 75 bundle2supported = True
76 76 # else
77 77 # Server doesn't support bundle2 stream clone or doesn't support
78 78 # the versions we support. Fall back and possibly allow legacy.
79 79
80 80 # Ensures legacy code path uses available bundle2.
81 81 if bundle2supported and not bundle2:
82 82 return False, None
83 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 84 elif bundle2 and not bundle2supported:
85 85 return False, None
86 86
87 87 # Streaming clone only works on empty repositories.
88 88 if len(repo):
89 89 return False, None
90 90
91 91 # Streaming clone only works if all data is being requested.
92 92 if pullop.heads:
93 93 return False, None
94 94
95 95 streamrequested = pullop.streamclonerequested
96 96
97 97 # If we don't have a preference, let the server decide for us. This
98 98 # likely only comes into play in LANs.
99 99 if streamrequested is None:
100 100 # The server can advertise whether to prefer streaming clone.
101 101 streamrequested = remote.capable(b'stream-preferred')
102 102
103 103 if not streamrequested:
104 104 return False, None
105 105
106 106 # In order for stream clone to work, the client has to support all the
107 107 # requirements advertised by the server.
108 108 #
109 109 # The server advertises its requirements via the "stream" and "streamreqs"
110 110 # capability. "stream" (a value-less capability) is advertised if and only
111 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 112 # is advertised and contains a comma-delimited list of requirements.
113 113 requirements = set()
114 114 if remote.capable(b'stream'):
115 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 116 else:
117 117 streamreqs = remote.capable(b'streamreqs')
118 118 # This is weird and shouldn't happen with modern servers.
119 119 if not streamreqs:
120 120 pullop.repo.ui.warn(
121 121 _(
122 122 b'warning: stream clone requested but server has them '
123 123 b'disabled\n'
124 124 )
125 125 )
126 126 return False, None
127 127
128 128 streamreqs = set(streamreqs.split(b','))
129 129 # Server requires something we don't support. Bail.
130 130 missingreqs = streamreqs - repo.supported
131 131 if missingreqs:
132 132 pullop.repo.ui.warn(
133 133 _(
134 134 b'warning: stream clone requested but client is missing '
135 135 b'requirements: %s\n'
136 136 )
137 137 % b', '.join(sorted(missingreqs))
138 138 )
139 139 pullop.repo.ui.warn(
140 140 _(
141 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 142 b'for more information)\n'
143 143 )
144 144 )
145 145 return False, None
146 146 requirements = streamreqs
147 147
148 148 return True, requirements
149 149
150 150
151 151 def maybeperformlegacystreamclone(pullop):
152 152 """Possibly perform a legacy stream clone operation.
153 153
154 154 Legacy stream clones are performed as part of pull but before all other
155 155 operations.
156 156
157 157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 158 supported.
159 159 """
160 160 from . import localrepo
161 161
162 162 supported, requirements = canperformstreamclone(pullop)
163 163
164 164 if not supported:
165 165 return
166 166
167 167 repo = pullop.repo
168 168 remote = pullop.remote
169 169
170 170 # Save remote branchmap. We will use it later to speed up branchcache
171 171 # creation.
172 172 rbranchmap = None
173 173 if remote.capable(b'branchmap'):
174 174 with remote.commandexecutor() as e:
175 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 176
177 177 repo.ui.status(_(b'streaming all changes\n'))
178 178
179 179 with remote.commandexecutor() as e:
180 180 fp = e.callcommand(b'stream_out', {}).result()
181 181
182 182 # TODO strictly speaking, this code should all be inside the context
183 183 # manager because the context manager is supposed to ensure all wire state
184 184 # is flushed when exiting. But the legacy peers don't do this, so it
185 185 # doesn't matter.
186 186 l = fp.readline()
187 187 try:
188 188 resp = int(l)
189 189 except ValueError:
190 190 raise error.ResponseError(
191 191 _(b'unexpected response from remote server:'), l
192 192 )
193 193 if resp == 1:
194 194 raise error.Abort(_(b'operation forbidden by server'))
195 195 elif resp == 2:
196 196 raise error.Abort(_(b'locking the remote repository failed'))
197 197 elif resp != 0:
198 198 raise error.Abort(_(b'the server sent an unknown error code'))
199 199
200 200 l = fp.readline()
201 201 try:
202 202 filecount, bytecount = map(int, l.split(b' ', 1))
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _(b'unexpected response from remote server:'), l
206 206 )
207 207
208 208 with repo.lock():
209 209 consumev1(repo, fp, filecount, bytecount)
210 210 repo.requirements = new_stream_clone_requirements(
211 211 repo.requirements,
212 212 requirements,
213 213 )
214 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 215 repo.ui, repo.requirements, repo.features
216 216 )
217 217 scmutil.writereporequirements(repo)
218 218 nodemap.post_stream_cleanup(repo)
219 219
220 220 if rbranchmap:
221 221 repo._branchcaches.replace(repo, rbranchmap)
222 222
223 223 repo.invalidate()
224 224
225 225
226 226 def allowservergeneration(repo):
227 227 """Whether streaming clones are allowed from the server."""
228 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 229 return False
230 230
231 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 232 return False
233 233
234 234 # The way stream clone works makes it impossible to hide secret changesets.
235 235 # So don't allow this by default.
236 236 secret = phases.hassecret(repo)
237 237 if secret:
238 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 239
240 240 return True
241 241
242 242
243 243 # This is its own function so extensions can override it.
244 244 def _walkstreamfiles(repo, matcher=None):
245 245 return repo.store.walk(matcher)
246 246
247 247
248 248 def generatev1(repo):
249 249 """Emit content for version 1 of a streaming clone.
250 250
251 251 This returns a 3-tuple of (file count, byte size, data iterator).
252 252
253 253 The data iterator consists of N entries for each file being transferred.
254 254 Each file entry starts as a line with the file name and integer size
255 255 delimited by a null byte.
256 256
257 257 The raw file data follows. Following the raw file data is the next file
258 258 entry, or EOF.
259 259
260 260 When used on the wire protocol, an additional line indicating protocol
261 261 success will be prepended to the stream. This function is not responsible
262 262 for adding it.
263 263
264 264 This function will obtain a repository lock to ensure a consistent view of
265 265 the store is captured. It therefore may raise LockError.
266 266 """
267 267 entries = []
268 268 total_bytes = 0
269 269 # Get consistent snapshot of repo, lock during scan.
270 270 with repo.lock():
271 271 repo.ui.debug(b'scanning\n')
272 272 for file_type, name, size in _walkstreamfiles(repo):
273 273 if size:
274 274 entries.append((name, size))
275 275 total_bytes += size
276 276 _test_sync_point_walk_1(repo)
277 277 _test_sync_point_walk_2(repo)
278 278
279 279 repo.ui.debug(
280 280 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
281 281 )
282 282
283 283 svfs = repo.svfs
284 284 debugflag = repo.ui.debugflag
285 285
286 286 def emitrevlogdata():
287 287 for name, size in entries:
288 288 if debugflag:
289 289 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
290 290 # partially encode name over the wire for backwards compat
291 291 yield b'%s\0%d\n' % (store.encodedir(name), size)
292 292 # auditing at this stage is both pointless (paths are already
293 293 # trusted by the local repo) and expensive
294 294 with svfs(name, b'rb', auditpath=False) as fp:
295 295 if size <= 65536:
296 296 yield fp.read(size)
297 297 else:
298 298 for chunk in util.filechunkiter(fp, limit=size):
299 299 yield chunk
300 300
301 301 return len(entries), total_bytes, emitrevlogdata()
302 302
303 303
304 304 def generatev1wireproto(repo):
305 305 """Emit content for version 1 of streaming clone suitable for the wire.
306 306
307 307 This is the data output from ``generatev1()`` with 2 header lines. The
308 308 first line indicates overall success. The 2nd contains the file count and
309 309 byte size of payload.
310 310
311 311 The success line contains "0" for success, "1" for stream generation not
312 312 allowed, and "2" for error locking the repository (possibly indicating
313 313 a permissions error for the server process).
314 314 """
315 315 if not allowservergeneration(repo):
316 316 yield b'1\n'
317 317 return
318 318
319 319 try:
320 320 filecount, bytecount, it = generatev1(repo)
321 321 except error.LockError:
322 322 yield b'2\n'
323 323 return
324 324
325 325 # Indicates successful response.
326 326 yield b'0\n'
327 327 yield b'%d %d\n' % (filecount, bytecount)
328 328 for chunk in it:
329 329 yield chunk
330 330
331 331
332 332 def generatebundlev1(repo, compression=b'UN'):
333 333 """Emit content for version 1 of a stream clone bundle.
334 334
335 335 The first 4 bytes of the output ("HGS1") denote this as stream clone
336 336 bundle version 1.
337 337
338 338 The next 2 bytes indicate the compression type. Only "UN" is currently
339 339 supported.
340 340
341 341 The next 16 bytes are two 64-bit big endian unsigned integers indicating
342 342 file count and byte count, respectively.
343 343
344 344 The next 2 bytes is a 16-bit big endian unsigned short declaring the length
345 345 of the requirements string, including a trailing \0. The following N bytes
346 346 are the requirements string, which is ASCII containing a comma-delimited
347 347 list of repo requirements that are needed to support the data.
348 348
349 349 The remaining content is the output of ``generatev1()`` (which may be
350 350 compressed in the future).
351 351
352 352 Returns a tuple of (requirements, data generator).
353 353 """
354 354 if compression != b'UN':
355 355 raise ValueError(b'we do not support the compression argument yet')
356 356
357 357 requirements = streamed_requirements(repo)
358 358 requires = b','.join(sorted(requirements))
359 359
360 360 def gen():
361 361 yield b'HGS1'
362 362 yield compression
363 363
364 364 filecount, bytecount, it = generatev1(repo)
365 365 repo.ui.status(
366 366 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
367 367 )
368 368
369 369 yield struct.pack(b'>QQ', filecount, bytecount)
370 370 yield struct.pack(b'>H', len(requires) + 1)
371 371 yield requires + b'\0'
372 372
373 373 # This is where we'll add compression in the future.
374 374 assert compression == b'UN'
375 375
376 376 progress = repo.ui.makeprogress(
377 377 _(b'bundle'), total=bytecount, unit=_(b'bytes')
378 378 )
379 379 progress.update(0)
380 380
381 381 for chunk in it:
382 382 progress.increment(step=len(chunk))
383 383 yield chunk
384 384
385 385 progress.complete()
386 386
387 387 return requirements, gen()
388 388
389 389
390 390 def consumev1(repo, fp, filecount, bytecount):
391 391 """Apply the contents from version 1 of a streaming clone file handle.
392 392
393 393 This takes the output from "stream_out" and applies it to the specified
394 394 repository.
395 395
396 396 Like "stream_out," the status line added by the wire protocol is not
397 397 handled by this function.
398 398 """
399 399 with repo.lock():
400 400 repo.ui.status(
401 401 _(b'%d files to transfer, %s of data\n')
402 402 % (filecount, util.bytecount(bytecount))
403 403 )
404 404 progress = repo.ui.makeprogress(
405 405 _(b'clone'), total=bytecount, unit=_(b'bytes')
406 406 )
407 407 progress.update(0)
408 408 start = util.timer()
409 409
410 410 # TODO: get rid of (potential) inconsistency
411 411 #
412 412 # If transaction is started and any @filecache property is
413 413 # changed at this point, it causes inconsistency between
414 414 # in-memory cached property and streamclone-ed file on the
415 415 # disk. Nested transaction prevents transaction scope "clone"
416 416 # below from writing in-memory changes out at the end of it,
417 417 # even though in-memory changes are discarded at the end of it
418 418 # regardless of transaction nesting.
419 419 #
420 420 # But transaction nesting can't be simply prohibited, because
421 421 # nesting occurs also in ordinary case (e.g. enabling
422 422 # clonebundles).
423 423
424 424 with repo.transaction(b'clone'):
425 425 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
426 426 for i in range(filecount):
427 427 # XXX doesn't support '\n' or '\r' in filenames
428 428 l = fp.readline()
429 429 try:
430 430 name, size = l.split(b'\0', 1)
431 431 size = int(size)
432 432 except (ValueError, TypeError):
433 433 raise error.ResponseError(
434 434 _(b'unexpected response from remote server:'), l
435 435 )
436 436 if repo.ui.debugflag:
437 437 repo.ui.debug(
438 438 b'adding %s (%s)\n' % (name, util.bytecount(size))
439 439 )
440 440 # for backwards compat, name was partially encoded
441 441 path = store.decodedir(name)
442 442 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
443 443 for chunk in util.filechunkiter(fp, limit=size):
444 444 progress.increment(step=len(chunk))
445 445 ofp.write(chunk)
446 446
447 447 # force @filecache properties to be reloaded from
448 448 # streamclone-ed file at next access
449 449 repo.invalidate(clearfilecache=True)
450 450
451 451 elapsed = util.timer() - start
452 452 if elapsed <= 0:
453 453 elapsed = 0.001
454 454 progress.complete()
455 455 repo.ui.status(
456 456 _(b'transferred %s in %.1f seconds (%s/sec)\n')
457 457 % (
458 458 util.bytecount(bytecount),
459 459 elapsed,
460 460 util.bytecount(bytecount / elapsed),
461 461 )
462 462 )
463 463
464 464
465 465 def readbundle1header(fp):
466 466 compression = fp.read(2)
467 467 if compression != b'UN':
468 468 raise error.Abort(
469 469 _(
470 470 b'only uncompressed stream clone bundles are '
471 471 b'supported; got %s'
472 472 )
473 473 % compression
474 474 )
475 475
476 476 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
477 477 requireslen = struct.unpack(b'>H', fp.read(2))[0]
478 478 requires = fp.read(requireslen)
479 479
480 480 if not requires.endswith(b'\0'):
481 481 raise error.Abort(
482 482 _(
483 483 b'malformed stream clone bundle: '
484 484 b'requirements not properly encoded'
485 485 )
486 486 )
487 487
488 488 requirements = set(requires.rstrip(b'\0').split(b','))
489 489
490 490 return filecount, bytecount, requirements
491 491
492 492
493 493 def applybundlev1(repo, fp):
494 494 """Apply the content from a stream clone bundle version 1.
495 495
496 496 We assume the 4 byte header has been read and validated and the file handle
497 497 is at the 2 byte compression identifier.
498 498 """
499 499 if len(repo):
500 500 raise error.Abort(
501 501 _(b'cannot apply stream clone bundle on non-empty repo')
502 502 )
503 503
504 504 filecount, bytecount, requirements = readbundle1header(fp)
505 505 missingreqs = requirements - repo.supported
506 506 if missingreqs:
507 507 raise error.Abort(
508 508 _(b'unable to apply stream clone: unsupported format: %s')
509 509 % b', '.join(sorted(missingreqs))
510 510 )
511 511
512 512 consumev1(repo, fp, filecount, bytecount)
513 513 nodemap.post_stream_cleanup(repo)
514 514
515 515
516 516 class streamcloneapplier:
517 517 """Class to manage applying streaming clone bundles.
518 518
519 519 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
520 520 readers to perform bundle type-specific functionality.
521 521 """
522 522
523 523 def __init__(self, fh):
524 524 self._fh = fh
525 525
526 526 def apply(self, repo):
527 527 return applybundlev1(repo, self._fh)
528 528
529 529
530 530 # type of file to stream
531 531 _fileappend = 0 # append only file
532 532 _filefull = 1 # full snapshot file
533 533
534 534 # Source of the file
535 535 _srcstore = b's' # store (svfs)
536 536 _srccache = b'c' # cache (cache)
537 537
538 538 # This is its own function so extensions can override it.
539 539 def _walkstreamfullstorefiles(repo):
540 540 """list snapshot file from the store"""
541 541 fnames = []
542 542 if not repo.publishing():
543 543 fnames.append(b'phaseroots')
544 544 return fnames
545 545
546 546
547 547 def _filterfull(entry, copy, vfsmap):
548 548 """actually copy the snapshot files"""
549 549 src, name, ftype, data = entry
550 550 if ftype != _filefull:
551 551 return entry
552 552 return (src, name, ftype, copy(vfsmap[src].join(name)))
553 553
554 554
555 555 @contextlib.contextmanager
556 556 def maketempcopies():
557 557 """return a function to temporary copy file"""
558 558
559 559 files = []
560 560 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
561 561 try:
562 562
563 563 def copy(src):
564 564 fd, dst = pycompat.mkstemp(
565 565 prefix=os.path.basename(src), dir=dst_dir
566 566 )
567 567 os.close(fd)
568 568 files.append(dst)
569 569 util.copyfiles(src, dst, hardlink=True)
570 570 return dst
571 571
572 572 yield copy
573 573 finally:
574 574 for tmp in files:
575 575 util.tryunlink(tmp)
576 576 util.tryrmdir(dst_dir)
577 577
578 578
579 579 def _makemap(repo):
580 580 """make a (src -> vfs) map for the repo"""
581 581 vfsmap = {
582 582 _srcstore: repo.svfs,
583 583 _srccache: repo.cachevfs,
584 584 }
585 585 # we keep repo.vfs out of the map on purpose; there are too many dangers there
586 586 # (eg: .hg/hgrc)
587 587 assert repo.vfs not in vfsmap.values()
588 588
589 589 return vfsmap
590 590
591 591
592 592 def _emit2(repo, entries, totalfilesize):
593 593 """actually emit the stream bundle"""
594 594 vfsmap = _makemap(repo)
595 595 # we keep repo.vfs out of the map on purpose; there are too many dangers there
596 596 # (eg: .hg/hgrc),
597 597 #
598 598 # this assert is duplicated (from _makemap) because an author might think this is
599 599 # fine, while it is really not.
600 600 if repo.vfs in vfsmap.values():
601 601 raise error.ProgrammingError(
602 602 b'repo.vfs must not be added to vfsmap for security reasons'
603 603 )
604 604
605 605 progress = repo.ui.makeprogress(
606 606 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
607 607 )
608 608 progress.update(0)
609 609 with maketempcopies() as copy, progress:
610 610 # copy is delayed until we are in the try
611 611 entries = [_filterfull(e, copy, vfsmap) for e in entries]
612 612 yield None # this release the lock on the repository
613 613 totalbytecount = 0
614 614
615 615 for src, name, ftype, data in entries:
616 616 vfs = vfsmap[src]
617 617 yield src
618 618 yield util.uvarintencode(len(name))
619 619 if ftype == _fileappend:
620 620 fp = vfs(name)
621 621 size = data
622 622 elif ftype == _filefull:
623 623 fp = open(data, b'rb')
624 624 size = util.fstat(fp).st_size
625 625 bytecount = 0
626 626 try:
627 627 yield util.uvarintencode(size)
628 628 yield name
629 629 if size <= 65536:
630 630 chunks = (fp.read(size),)
631 631 else:
632 632 chunks = util.filechunkiter(fp, limit=size)
633 633 for chunk in chunks:
634 634 bytecount += len(chunk)
635 635 totalbytecount += len(chunk)
636 636 progress.update(totalbytecount)
637 637 yield chunk
638 638 if bytecount != size:
639 639 # Would most likely be caused by a race due to `hg strip` or
640 640 # a revlog split
641 641 raise error.Abort(
642 642 _(
643 643 b'clone could only read %d bytes from %s, but '
644 644 b'expected %d bytes'
645 645 )
646 646 % (bytecount, name, size)
647 647 )
648 648 finally:
649 649 fp.close()
650 650
651 651
652 652 def _test_sync_point_walk_1(repo):
653 653 """a function for synchronisation during tests"""
654 654
655 655
656 656 def _test_sync_point_walk_2(repo):
657 657 """a function for synchronisation during tests"""
658 658
659 659
660 660 def _v2_walk(repo, includes, excludes, includeobsmarkers):
661 661 """emit a seris of files information useful to clone a repo
662 662
663 663 return (entries, totalfilesize)
664 664
665 665 entries is a list of tuple (vfs-key, file-path, file-type, size)
666 666
667 667 - `vfs-key`: a key to the right vfs to write the file (see _makemap)
668 668 - `name`: file path of the file to copy (to be fed to the vfs)
669 669 - `file-type`: does this file need to be copied with the source lock?
670 670 - `size`: the size of the file (or None)
671 671 """
672 672 assert repo._currentlock(repo._lockref) is not None
673 673 entries = []
674 674 totalfilesize = 0
675 675
676 676 matcher = None
677 677 if includes or excludes:
678 678 matcher = narrowspec.match(repo.root, includes, excludes)
679 679
680 680 for rl_type, name, size in _walkstreamfiles(repo, matcher):
681 681 if size:
682 682 ft = _fileappend
683 683 if rl_type & store.FILEFLAGS_VOLATILE:
684 684 ft = _filefull
685 685 entries.append((_srcstore, name, ft, size))
686 686 totalfilesize += size
687 687 for name in _walkstreamfullstorefiles(repo):
688 688 if repo.svfs.exists(name):
689 689 totalfilesize += repo.svfs.lstat(name).st_size
690 690 entries.append((_srcstore, name, _filefull, None))
691 691 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
692 692 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
693 693 entries.append((_srcstore, b'obsstore', _filefull, None))
694 694 for name in cacheutil.cachetocopy(repo):
695 695 if repo.cachevfs.exists(name):
696 696 totalfilesize += repo.cachevfs.lstat(name).st_size
697 697 entries.append((_srccache, name, _filefull, None))
698 698 return entries, totalfilesize
699 699
700 700
701 701 def generatev2(repo, includes, excludes, includeobsmarkers):
702 702 """Emit content for version 2 of a streaming clone.
703 703
704 704 the data stream consists of the following entries:
705 705 1) A char representing the file destination (eg: store or cache)
706 706 2) A varint containing the length of the filename
707 707 3) A varint containing the length of file data
708 708 4) N bytes containing the filename (the internal, store-agnostic form)
709 709 5) N bytes containing the file data
710 710
711 711 Returns a 3-tuple of (file count, file size, data iterator).
712 712 """
713 713
714 714 with repo.lock():
715 715
716 716 repo.ui.debug(b'scanning\n')
717 717
718 718 entries, totalfilesize = _v2_walk(
719 719 repo,
720 720 includes=includes,
721 721 excludes=excludes,
722 722 includeobsmarkers=includeobsmarkers,
723 723 )
724 724
725 725 chunks = _emit2(repo, entries, totalfilesize)
726 726 first = next(chunks)
727 727 assert first is None
728 728 _test_sync_point_walk_1(repo)
729 729 _test_sync_point_walk_2(repo)
730 730
731 731 return len(entries), totalfilesize, chunks
732 732
733 733
734 734 @contextlib.contextmanager
735 735 def nested(*ctxs):
736 736 this = ctxs[0]
737 737 rest = ctxs[1:]
738 738 with this:
739 739 if rest:
740 740 with nested(*rest):
741 741 yield
742 742 else:
743 743 yield
744 744
745 745
746 746 def consumev2(repo, fp, filecount, filesize):
747 747 """Apply the contents from a version 2 streaming clone.
748 748
749 749 Data is read from an object that only needs to provide a ``read(size)``
750 750 method.
751 751 """
752 752 with repo.lock():
753 753 repo.ui.status(
754 754 _(b'%d files to transfer, %s of data\n')
755 755 % (filecount, util.bytecount(filesize))
756 756 )
757 757
758 758 start = util.timer()
759 759 progress = repo.ui.makeprogress(
760 760 _(b'clone'), total=filesize, unit=_(b'bytes')
761 761 )
762 762 progress.update(0)
763 763
764 764 vfsmap = _makemap(repo)
765 765 # we keep repo.vfs out of the map on purpose; there are too many dangers
766 766 # there (eg: .hg/hgrc),
767 767 #
768 768 # this assert is duplicated (from _makemap) because an author might think this
769 769 # is fine, while it is really not.
770 770 if repo.vfs in vfsmap.values():
771 771 raise error.ProgrammingError(
772 772 b'repo.vfs must not be added to vfsmap for security reasons'
773 773 )
774 774
775 775 with repo.transaction(b'clone'):
776 776 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
777 777 with nested(*ctxs):
778 778 for i in range(filecount):
779 779 src = util.readexactly(fp, 1)
780 780 vfs = vfsmap[src]
781 781 namelen = util.uvarintdecodestream(fp)
782 782 datalen = util.uvarintdecodestream(fp)
783 783
784 784 name = util.readexactly(fp, namelen)
785 785
786 786 if repo.ui.debugflag:
787 787 repo.ui.debug(
788 788 b'adding [%s] %s (%s)\n'
789 789 % (src, name, util.bytecount(datalen))
790 790 )
791 791
792 792 with vfs(name, b'w') as ofp:
793 793 for chunk in util.filechunkiter(fp, limit=datalen):
794 794 progress.increment(step=len(chunk))
795 795 ofp.write(chunk)
796 796
797 797 # force @filecache properties to be reloaded from
798 798 # streamclone-ed file at next access
799 799 repo.invalidate(clearfilecache=True)
800 800
801 801 elapsed = util.timer() - start
802 802 if elapsed <= 0:
803 803 elapsed = 0.001
804 804 repo.ui.status(
805 805 _(b'transferred %s in %.1f seconds (%s/sec)\n')
806 806 % (
807 807 util.bytecount(progress.pos),
808 808 elapsed,
809 809 util.bytecount(progress.pos / elapsed),
810 810 )
811 811 )
812 812 progress.complete()
813 813
814 814
815 815 def applybundlev2(repo, fp, filecount, filesize, requirements):
816 816 from . import localrepo
817 817
818 818 missingreqs = [r for r in requirements if r not in repo.supported]
819 819 if missingreqs:
820 820 raise error.Abort(
821 821 _(b'unable to apply stream clone: unsupported format: %s')
822 822 % b', '.join(sorted(missingreqs))
823 823 )
824 824
825 825 consumev2(repo, fp, filecount, filesize)
826 826
827 827 repo.requirements = new_stream_clone_requirements(
828 828 repo.requirements,
829 829 requirements,
830 830 )
831 831 repo.svfs.options = localrepo.resolvestorevfsoptions(
832 832 repo.ui, repo.requirements, repo.features
833 833 )
834 834 scmutil.writereporequirements(repo)
835 835 nodemap.post_stream_cleanup(repo)
836 836
837 837
838 838 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
839 839 hardlink = [True]
840 840
841 841 def copy_used():
842 842 hardlink[0] = False
843 843 progress.topic = _(b'copying')
844 844
845 845 for k, path, size in entries:
846 846 src_vfs = src_vfs_map[k]
847 847 dst_vfs = dst_vfs_map[k]
848 848 src_path = src_vfs.join(path)
849 849 dst_path = dst_vfs.join(path)
850 850 # We cannot use dirname and makedirs of dst_vfs here because the store
851 851 # encoding confuses them. See issue 6581 for details.
852 852 dirname = os.path.dirname(dst_path)
853 853 if not os.path.exists(dirname):
854 854 util.makedirs(dirname)
855 855 dst_vfs.register_file(path)
856 856 # XXX we could use the #nb_bytes argument.
857 857 util.copyfile(
858 858 src_path,
859 859 dst_path,
860 860 hardlink=hardlink[0],
861 861 no_hardlink_cb=copy_used,
862 862 check_fs_hardlink=False,
863 863 )
864 864 progress.increment()
865 865 return hardlink[0]
866 866
867 867
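The `no_hardlink_cb` callback above is how `_copy_files` downgrades its progress topic (and its final 'linked'/'copied' message) the first time a hardlink cannot be used. A standalone sketch of the same fallback pattern using only the standard library; this is not Mercurial's `util.copyfile`, just an illustration of the idea:

import os
import shutil

def copy_or_link(src, dst, want_hardlink, on_copy_used):
    # try to hardlink first; fall back to a real copy and notify the caller
    if want_hardlink:
        try:
            os.link(src, dst)
            return
        except OSError:
            on_copy_used()
    shutil.copy2(src, dst)

hardlink = [True]  # mutable flag shared with the callback, as in _copy_files

def copy_used():
    hardlink[0] = False

# copy_or_link('a.bin', 'b.bin', hardlink[0], copy_used)
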
868 868 def local_copy(src_repo, dest_repo):
869 869 """copy all content from one local repository to another
870 870
871 871 This is useful for local clone"""
872 872 src_store_requirements = {
873 873 r
874 874 for r in src_repo.requirements
875 875 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
876 876 }
877 877 dest_store_requirements = {
878 878 r
879 879 for r in dest_repo.requirements
880 880 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
881 881 }
882 882 assert src_store_requirements == dest_store_requirements
883 883
884 884 with dest_repo.lock():
885 885 with src_repo.lock():
886 886
887 887 # bookmarks are not integrated into the streaming as they might use the
888 888 # `repo.vfs`, and there is too much sensitive data accessible
889 889 # through `repo.vfs` to expose it to a streaming clone.
890 890 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
891 891 srcbookmarks = src_book_vfs.join(b'bookmarks')
892 892 bm_count = 0
893 893 if os.path.exists(srcbookmarks):
894 894 bm_count = 1
895 895
896 896 entries, totalfilesize = _v2_walk(
897 897 src_repo,
898 898 includes=None,
899 899 excludes=None,
900 900 includeobsmarkers=True,
901 901 )
902 902 src_vfs_map = _makemap(src_repo)
903 903 dest_vfs_map = _makemap(dest_repo)
904 904 progress = src_repo.ui.makeprogress(
905 905 topic=_(b'linking'),
906 906 total=len(entries) + bm_count,
907 907 unit=_(b'files'),
908 908 )
909 909 # copy files
910 910 #
911 911 # We could copy the full file while the source repository is locked
912 912 # and the other one without the lock. However, in the linking case,
913 913 # this would also require checks that nobody is appending any data
914 914 # to the files while we do the clone, so this is not done yet. We
915 915 # could do this blindly when copying files.
916 916 files = ((k, path, size) for k, path, ftype, size in entries)
917 917 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
918 918
919 919 # copy bookmarks over
920 920 if bm_count:
921 921 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
922 922 dstbookmarks = dst_book_vfs.join(b'bookmarks')
923 923 util.copyfile(srcbookmarks, dstbookmarks)
924 924 progress.complete()
925 925 if hardlink:
926 926 msg = b'linked %d files\n'
927 927 else:
928 928 msg = b'copied %d files\n'
929 929 src_repo.ui.debug(msg % (len(entries) + bm_count))
930 930
931 931 with dest_repo.transaction(b"localclone") as tr:
932 932 dest_repo.store.write(tr)
933 933
934 934 # clean up transaction files as they do not make sense
935 transaction.cleanup_undo_files(dest_repo)
935 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)
@@ -1,884 +1,884 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Olivia Mackall <olivia@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 import errno
15 15 import os
16 16
17 17 from .i18n import _
18 18 from . import (
19 19 error,
20 20 pycompat,
21 21 util,
22 22 )
23 23 from .utils import stringutil
24 24
25 25 version = 2
26 26
27 27 GEN_GROUP_ALL = b'all'
28 28 GEN_GROUP_PRE_FINALIZE = b'prefinalize'
29 29 GEN_GROUP_POST_FINALIZE = b'postfinalize'
30 30
31 31
32 32 def active(func):
33 33 def _active(self, *args, **kwds):
34 34 if self._count == 0:
35 35 raise error.ProgrammingError(
36 36 b'cannot use transaction when it is already committed/aborted'
37 37 )
38 38 return func(self, *args, **kwds)
39 39
40 40 return _active
41 41
42 42
43 43 UNDO_BACKUP = b'undo.backupfiles'
44 44
45 45 UNDO_FILES_MAY_NEED_CLEANUP = [
46 46 (b'plain', b'undo.desc'),
47 47 # Always delete undo last to make sure we detect that a clean up is needed if
48 48 # the process is interrupted.
49 49 (b'store', b'undo'),
50 50 ]
51 51
52 52
53 def cleanup_undo_files(repo):
53 def cleanup_undo_files(report, vfsmap):
54 54 """remove "undo" files used by the rollback logic
55 55
56 56 This is useful to prevent rollback running in situations where it does not
57 57 make sense. For example after a strip.
58 58 """
59 59 backup_entries = []
60 60 undo_files = []
61 vfsmap = repo.vfs_map
61 svfs = vfsmap[b'store']
62 62 try:
63 with repo.svfs(UNDO_BACKUP) as f:
64 backup_entries = read_backup_files(repo.ui.warn, f)
63 with svfs(UNDO_BACKUP) as f:
64 backup_entries = read_backup_files(report, f)
65 65 except OSError as e:
66 66 if e.errno != errno.ENOENT:
67 67 msg = _(b'could not read %s: %s\n')
68 msg %= (repo.svfs.join(UNDO_BACKUP), stringutil.forcebytestr(e))
69 repo.ui.warn(msg)
68 msg %= (svfs.join(UNDO_BACKUP), stringutil.forcebytestr(e))
69 report(msg)
70 70
71 71 for location, f, backup_path, c in backup_entries:
72 72 if location in vfsmap and backup_path:
73 73 undo_files.append((vfsmap[location], backup_path))
74 74
75 undo_files.append((repo.svfs, UNDO_BACKUP))
75 undo_files.append((svfs, UNDO_BACKUP))
76 76 for location, undo_path in UNDO_FILES_MAY_NEED_CLEANUP:
77 77 undo_files.append((vfsmap[location], undo_path))
78 78 for undovfs, undofile in undo_files:
79 79 try:
80 80 undovfs.unlink(undofile)
81 81 except OSError as e:
82 82 if e.errno != errno.ENOENT:
83 83 msg = _(b'error removing %s: %s\n')
84 84 msg %= (undovfs.join(undofile), stringutil.forcebytestr(e))
85 repo.ui.warn(msg)
85 report(msg)
86 86
87 87
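With this change `cleanup_undo_files` only needs a report callable and a {location -> vfs} map, so callers that used to hand in the whole repository now pass those two pieces explicitly, exactly as the streamclone hunk above does. A sketch of the call as seen from a calling module (`repo` stands for whatever repository object the caller already holds):

# before: transaction.cleanup_undo_files(repo)
# after:  pass the warning callback and the vfs map directly
transaction.cleanup_undo_files(repo.ui.warn, repo.vfs_map)
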
88 88 def _playback(
89 89 journal,
90 90 report,
91 91 opener,
92 92 vfsmap,
93 93 entries,
94 94 backupentries,
95 95 unlink=True,
96 96 checkambigfiles=None,
97 97 ):
98 98 for f, o in sorted(dict(entries).items()):
99 99 if o or not unlink:
100 100 checkambig = checkambigfiles and (f, b'') in checkambigfiles
101 101 try:
102 102 fp = opener(f, b'a', checkambig=checkambig)
103 103 if fp.tell() < o:
104 104 raise error.Abort(
105 105 _(
106 106 b"attempted to truncate %s to %d bytes, but it was "
107 107 b"already %d bytes\n"
108 108 )
109 109 % (f, o, fp.tell())
110 110 )
111 111 fp.truncate(o)
112 112 fp.close()
113 113 except IOError:
114 114 report(_(b"failed to truncate %s\n") % f)
115 115 raise
116 116 else:
117 117 try:
118 118 opener.unlink(f)
119 119 except FileNotFoundError:
120 120 pass
121 121
122 122 backupfiles = []
123 123 for l, f, b, c in backupentries:
124 124 if l not in vfsmap and c:
125 125 report(b"couldn't handle %s: unknown cache location %s\n" % (b, l))
126 126 vfs = vfsmap[l]
127 127 try:
128 128 if f and b:
129 129 filepath = vfs.join(f)
130 130 backuppath = vfs.join(b)
131 131 checkambig = checkambigfiles and (f, l) in checkambigfiles
132 132 try:
133 133 util.copyfile(backuppath, filepath, checkambig=checkambig)
134 134 backupfiles.append(b)
135 135 except IOError as exc:
136 136 e_msg = stringutil.forcebytestr(exc)
137 137 report(_(b"failed to recover %s (%s)\n") % (f, e_msg))
138 138 else:
139 139 target = f or b
140 140 try:
141 141 vfs.unlink(target)
142 142 except FileNotFoundError:
143 143 pass
144 144 except (IOError, OSError, error.Abort):
145 145 if not c:
146 146 raise
147 147
148 148 backuppath = b"%s.backupfiles" % journal
149 149 if opener.exists(backuppath):
150 150 opener.unlink(backuppath)
151 151 opener.unlink(journal)
152 152 try:
153 153 for f in backupfiles:
154 154 if opener.exists(f):
155 155 opener.unlink(f)
156 156 except (IOError, OSError, error.Abort):
157 157 # only pure backup files remain, it is safe to ignore any error
158 158 pass
159 159
160 160
161 161 class transaction(util.transactional):
162 162 def __init__(
163 163 self,
164 164 report,
165 165 opener,
166 166 vfsmap,
167 167 journalname,
168 168 undoname=None,
169 169 after=None,
170 170 createmode=None,
171 171 validator=None,
172 172 releasefn=None,
173 173 checkambigfiles=None,
174 174 name='<unnamed>',
175 175 ):
176 176 """Begin a new transaction
177 177
178 178 Begins a new transaction that allows rolling back writes in the event of
179 179 an exception.
180 180
181 181 * `after`: called after the transaction has been committed
182 182 * `createmode`: the mode of the journal file that will be created
183 183 * `releasefn`: called after releasing (with transaction and result)
184 184
185 185 `checkambigfiles` is a set of (path, vfs-location) tuples,
186 186 which determine whether file stat ambiguity should be avoided
187 187 for the corresponding files.
188 188 """
189 189 self._count = 1
190 190 self._usages = 1
191 191 self._report = report
192 192 # a vfs to the store content
193 193 self._opener = opener
194 194 # a map to access file in various {location -> vfs}
195 195 vfsmap = vfsmap.copy()
196 196 vfsmap[b''] = opener # set default value
197 197 self._vfsmap = vfsmap
198 198 self._after = after
199 199 self._offsetmap = {}
200 200 self._newfiles = set()
201 201 self._journal = journalname
202 202 self._journal_files = []
203 203 self._undoname = undoname
204 204 self._queue = []
205 205 # A callback to do something just after releasing transaction.
206 206 if releasefn is None:
207 207 releasefn = lambda tr, success: None
208 208 self._releasefn = releasefn
209 209
210 210 self._checkambigfiles = set()
211 211 if checkambigfiles:
212 212 self._checkambigfiles.update(checkambigfiles)
213 213
214 214 self._names = [name]
215 215
216 216 # A dict dedicated to precisely tracking the changes introduced in the
217 217 # transaction.
218 218 self.changes = {}
219 219
220 220 # a dict of arguments to be passed to hooks
221 221 self.hookargs = {}
222 222 self._file = opener.open(self._journal, b"w+")
223 223
224 224 # a list of ('location', 'path', 'backuppath', cache) entries.
225 225 # - if 'backuppath' is empty, no file existed at backup time
226 226 # - if 'path' is empty, this is a temporary transaction file
227 227 # - if 'location' is not empty, the path is outside main opener reach.
228 228 # use 'location' value as a key in a vfsmap to find the right 'vfs'
229 229 # (cache is currently unused)
230 230 self._backupentries = []
231 231 self._backupmap = {}
232 232 self._backupjournal = b"%s.backupfiles" % self._journal
233 233 self._backupsfile = opener.open(self._backupjournal, b'w')
234 234 self._backupsfile.write(b'%d\n' % version)
235 235
236 236 if createmode is not None:
237 237 opener.chmod(self._journal, createmode & 0o666)
238 238 opener.chmod(self._backupjournal, createmode & 0o666)
239 239
240 240 # hold file generations to be performed on commit
241 241 self._filegenerators = {}
242 242 # hold callback to write pending data for hooks
243 243 self._pendingcallback = {}
244 244 # True if any pending data has ever been written
245 245 self._anypending = False
246 246 # holds callback to call when writing the transaction
247 247 self._finalizecallback = {}
248 248 # holds callback to call when validating the transaction
249 249 # should raise exception if anything is wrong
250 250 self._validatecallback = {}
251 251 if validator is not None:
252 252 self._validatecallback[b'001-userhooks'] = validator
253 253 # hold callback for post transaction close
254 254 self._postclosecallback = {}
255 255 # holds callbacks to call during abort
256 256 self._abortcallback = {}
257 257
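The constructor arguments documented above are enough to open a transaction by hand, although real callers normally go through higher-level repository helpers. A minimal hedged sketch; `ui`, `svfs` and `plainvfs` are assumed to come from an existing repository and are not defined in this file:

def open_store_transaction(ui, svfs, plainvfs):
    # hypothetical helper showing the constructor arguments in use
    vfsmap = {b'plain': plainvfs, b'store': svfs}
    return transaction(
        ui.warn,           # report: where rollback/error messages go
        svfs,              # opener: default vfs, also hosts the journal file
        vfsmap,            # location -> vfs map; b'' is added automatically
        b'journal',        # journalname
        undoname=b'undo',  # keep backups around for a later rollback
    )
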
258 258 def __repr__(self):
259 259 name = '/'.join(self._names)
260 260 return '<transaction name=%s, count=%d, usages=%d>' % (
261 261 name,
262 262 self._count,
263 263 self._usages,
264 264 )
265 265
266 266 def __del__(self):
267 267 if self._journal:
268 268 self._abort()
269 269
270 270 @property
271 271 def finalized(self):
272 272 return self._finalizecallback is None
273 273
274 274 @active
275 275 def startgroup(self):
276 276 """delay registration of file entry
277 277
278 278 This is used by strip to delay visibility of the strip offset. The transaction
279 279 sees either none or all of the strip actions to be done."""
280 280 self._queue.append([])
281 281
282 282 @active
283 283 def endgroup(self):
284 284 """apply delayed registration of file entry.
285 285
286 286 This is used by strip to delay visibility of the strip offset. The transaction
287 287 sees either none or all of the strip actions to be done."""
288 288 q = self._queue.pop()
289 289 for f, o in q:
290 290 self._addentry(f, o)
291 291
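As the docstrings note, `startgroup`/`endgroup` queue a batch of `add()` calls and publish them to the journal in one go, which is what strip relies on. A small hedged sketch of the calling pattern:

def record_strip_offsets(tr, offsets):
    # offsets: iterable of (store file name, truncation offset) pairs;
    # none of them becomes visible in the journal until endgroup()
    tr.startgroup()
    for name, offset in offsets:
        tr.add(name, offset)
    tr.endgroup()
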
292 292 @active
293 293 def add(self, file, offset):
294 294 """record the state of an append-only file before update"""
295 295 if (
296 296 file in self._newfiles
297 297 or file in self._offsetmap
298 298 or file in self._backupmap
299 299 ):
300 300 return
301 301 if self._queue:
302 302 self._queue[-1].append((file, offset))
303 303 return
304 304
305 305 self._addentry(file, offset)
306 306
307 307 def _addentry(self, file, offset):
308 308 """add a append-only entry to memory and on-disk state"""
309 309 if (
310 310 file in self._newfiles
311 311 or file in self._offsetmap
312 312 or file in self._backupmap
313 313 ):
314 314 return
315 315 if offset:
316 316 self._offsetmap[file] = offset
317 317 else:
318 318 self._newfiles.add(file)
319 319 # add enough data to the journal to do the truncate
320 320 self._file.write(b"%s\0%d\n" % (file, offset))
321 321 self._file.flush()
322 322
323 323 @active
324 324 def addbackup(self, file, hardlink=True, location=b''):
325 325 """Adds a backup of the file to the transaction
326 326
327 327 Calling addbackup() creates a hardlink backup of the specified file
328 328 that is used to recover the file in the event of the transaction
329 329 aborting.
330 330
331 331 * `file`: the file path, relative to .hg/store
332 332 * `hardlink`: use a hardlink to quickly create the backup
333 333 """
334 334 if self._queue:
335 335 msg = b'cannot use transaction.addbackup inside "group"'
336 336 raise error.ProgrammingError(msg)
337 337
338 338 if (
339 339 file in self._newfiles
340 340 or file in self._offsetmap
341 341 or file in self._backupmap
342 342 ):
343 343 return
344 344 vfs = self._vfsmap[location]
345 345 dirname, filename = vfs.split(file)
346 346 backupfilename = b"%s.backup.%s" % (self._journal, filename)
347 347 backupfile = vfs.reljoin(dirname, backupfilename)
348 348 if vfs.exists(file):
349 349 filepath = vfs.join(file)
350 350 backuppath = vfs.join(backupfile)
351 351 util.copyfile(filepath, backuppath, hardlink=hardlink)
352 352 else:
353 353 backupfile = b''
354 354
355 355 self._addbackupentry((location, file, backupfile, False))
356 356
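`addbackup` is the entry point for files that are rewritten in place rather than appended to. A one-call hedged sketch (the file name is only an example of a store file):

def backup_phases_file(tr):
    # snapshot .hg/store/phaseroots before rewriting it; the hardlink backup
    # is restored automatically if the transaction aborts
    tr.addbackup(b'phaseroots')
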
357 357 def _addbackupentry(self, entry):
358 358 """register a new backup entry and write it to disk"""
359 359 self._backupentries.append(entry)
360 360 self._backupmap[entry[1]] = len(self._backupentries) - 1
361 361 self._backupsfile.write(b"%s\0%s\0%s\0%d\n" % entry)
362 362 self._backupsfile.flush()
363 363
364 364 @active
365 365 def registertmp(self, tmpfile, location=b''):
366 366 """register a temporary transaction file
367 367
368 368 Such files will be deleted when the transaction exits (on both
369 369 failure and success).
370 370 """
371 371 self._addbackupentry((location, b'', tmpfile, False))
372 372
373 373 @active
374 374 def addfilegenerator(
375 375 self,
376 376 genid,
377 377 filenames,
378 378 genfunc,
379 379 order=0,
380 380 location=b'',
381 381 post_finalize=False,
382 382 ):
383 383 """add a function to generates some files at transaction commit
384 384
385 385 The `genfunc` argument is a function capable of generating proper
386 386 content of each entry in the `filename` tuple.
387 387
388 388 At transaction close time, `genfunc` will be called with one file
389 389 object argument per entries in `filenames`.
390 390
391 391 The transaction itself is responsible for the backup, creation and
392 392 final write of such file.
393 393
394 394 The `genid` argument is used to ensure the same set of file is only
395 395 generated once. Call to `addfilegenerator` for a `genid` already
396 396 present will overwrite the old entry.
397 397
398 398 The `order` argument may be used to control the order in which multiple
399 399 generator will be executed.
400 400
401 401 The `location` arguments may be used to indicate the files are located
402 402 outside of the the standard directory for transaction. It should match
403 403 one of the key of the `transaction.vfsmap` dictionary.
404 404
405 405 The `post_finalize` argument can be set to `True` for file generation
406 406 that must be run after the transaction has been finalized.
407 407 """
408 408 # For now, we are unable to do proper backup and restore of custom vfs
409 409 # except for bookmarks, which are handled outside this mechanism.
410 410 entry = (order, filenames, genfunc, location, post_finalize)
411 411 self._filegenerators[genid] = entry
412 412
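File generators are how state files get (re)written as part of the commit rather than immediately. A hedged sketch of registering one; the genid, file name and content are made up for the example:

def register_counter_generator(tr, counter_value):
    # hypothetical generator writing one small file at transaction close
    def write_counter(fp):
        fp.write(b'%d\n' % counter_value)

    tr.addfilegenerator(
        b'example-counter',     # genid: re-registering replaces this entry
        (b'example-counter',),  # one open file object per name goes to genfunc
        write_counter,
        location=b'plain',      # written through the b'plain' vfs (.hg/)
    )
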
413 413 @active
414 414 def removefilegenerator(self, genid):
415 415 """reverse of addfilegenerator, remove a file generator function"""
416 416 if genid in self._filegenerators:
417 417 del self._filegenerators[genid]
418 418
419 419 def _generatefiles(self, suffix=b'', group=GEN_GROUP_ALL):
420 420 # write files registered for generation
421 421 any = False
422 422
423 423 if group == GEN_GROUP_ALL:
424 424 skip_post = skip_pre = False
425 425 else:
426 426 skip_pre = group == GEN_GROUP_POST_FINALIZE
427 427 skip_post = group == GEN_GROUP_PRE_FINALIZE
428 428
429 429 for id, entry in sorted(self._filegenerators.items()):
430 430 any = True
431 431 order, filenames, genfunc, location, post_finalize = entry
432 432
433 433 # for generation at closing, check if it's before or after finalize
434 434 if skip_post and post_finalize:
435 435 continue
436 436 elif skip_pre and not post_finalize:
437 437 continue
438 438
439 439 vfs = self._vfsmap[location]
440 440 files = []
441 441 try:
442 442 for name in filenames:
443 443 name += suffix
444 444 if suffix:
445 445 self.registertmp(name, location=location)
446 446 checkambig = False
447 447 else:
448 448 self.addbackup(name, location=location)
449 449 checkambig = (name, location) in self._checkambigfiles
450 450 files.append(
451 451 vfs(name, b'w', atomictemp=True, checkambig=checkambig)
452 452 )
453 453 genfunc(*files)
454 454 for f in files:
455 455 f.close()
456 456 # skip discard() loop since we're sure no open file remains
457 457 del files[:]
458 458 finally:
459 459 for f in files:
460 460 f.discard()
461 461 return any
462 462
463 463 @active
464 464 def findoffset(self, file):
465 465 if file in self._newfiles:
466 466 return 0
467 467 return self._offsetmap.get(file)
468 468
469 469 @active
470 470 def readjournal(self):
471 471 self._file.seek(0)
472 472 entries = []
473 473 for l in self._file.readlines():
474 474 file, troffset = l.split(b'\0')
475 475 entries.append((file, int(troffset)))
476 476 return entries
477 477
478 478 @active
479 479 def replace(self, file, offset):
480 480 """
481 481 replace can only replace already committed entries
482 482 that are not pending in the queue
483 483 """
484 484 if file in self._newfiles:
485 485 if not offset:
486 486 return
487 487 self._newfiles.remove(file)
488 488 self._offsetmap[file] = offset
489 489 elif file in self._offsetmap:
490 490 if not offset:
491 491 del self._offsetmap[file]
492 492 self._newfiles.add(file)
493 493 else:
494 494 self._offsetmap[file] = offset
495 495 else:
496 496 raise KeyError(file)
497 497 self._file.write(b"%s\0%d\n" % (file, offset))
498 498 self._file.flush()
499 499
500 500 @active
501 501 def nest(self, name='<unnamed>'):
502 502 self._count += 1
503 503 self._usages += 1
504 504 self._names.append(name)
505 505 return self
506 506
507 507 def release(self):
508 508 if self._count > 0:
509 509 self._usages -= 1
510 510 if self._names:
511 511 self._names.pop()
512 512 # if the transaction scopes are left without being closed, fail
513 513 if self._count > 0 and self._usages == 0:
514 514 self._abort()
515 515
516 516 def running(self):
517 517 return self._count > 0
518 518
519 519 def addpending(self, category, callback):
520 520 """add a callback to be called when the transaction is pending
521 521
522 522 The transaction will be given as callback's first argument.
523 523
524 524 Category is a unique identifier to allow overwriting an old callback
525 525 with a newer callback.
526 526 """
527 527 self._pendingcallback[category] = callback
528 528
529 529 @active
530 530 def writepending(self):
531 531 """write pending file to temporary version
532 532
533 533 This is used to allow hooks to view a transaction before commit"""
534 534 categories = sorted(self._pendingcallback)
535 535 for cat in categories:
536 536 # remove callback since the data will have been flushed
537 537 any = self._pendingcallback.pop(cat)(self)
538 538 self._anypending = self._anypending or any
539 539 self._anypending |= self._generatefiles(suffix=b'.pending')
540 540 return self._anypending
541 541
542 542 @active
543 543 def hasfinalize(self, category):
544 544 """check is a callback already exist for a category"""
545 545 return category in self._finalizecallback
546 546
547 547 @active
548 548 def addfinalize(self, category, callback):
549 549 """add a callback to be called when the transaction is closed
550 550
551 551 The transaction will be given as callback's first argument.
552 552
553 553 Category is a unique identifier to allow overwriting old callbacks with
554 554 newer callbacks.
555 555 """
556 556 self._finalizecallback[category] = callback
557 557
558 558 @active
559 559 def addpostclose(self, category, callback):
560 560 """add or replace a callback to be called after the transaction closed
561 561
562 562 The transaction will be given as callback's first argument.
563 563
564 564 Category is a unique identifier to allow overwriting an old callback
565 565 with a newer callback.
566 566 """
567 567 self._postclosecallback[category] = callback
568 568
569 569 @active
570 570 def getpostclose(self, category):
571 571 """return a postclose callback added before, or None"""
572 572 return self._postclosecallback.get(category, None)
573 573
574 574 @active
575 575 def addabort(self, category, callback):
576 576 """add a callback to be called when the transaction is aborted.
577 577
578 578 The transaction will be given as the first argument to the callback.
579 579
580 580 Category is a unique identifier to allow overwriting an old callback
581 581 with a newer callback.
582 582 """
583 583 self._abortcallback[category] = callback
584 584
585 585 @active
586 586 def addvalidator(self, category, callback):
587 587 """adds a callback to be called when validating the transaction.
588 588
589 589 The transaction will be given as the first argument to the callback.
590 590
591 591 the callback should raise an exception to abort the transaction"""
592 592 self._validatecallback[category] = callback
593 593
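All of the `add*` registration methods above follow the same pattern: one callback per category, later registrations overwriting earlier ones, and the transaction passed back as the only argument. A hedged sketch wiring a validator, a post-close and an abort callback together (the category names and checks are illustrative):

def install_transaction_hooks(tr, ui):
    def validate(tr):
        # validators run at close time and abort the transaction by raising
        if b'forbid-writes' in tr.hookargs:
            raise error.Abort(b'writes are currently forbidden')

    def on_close(tr):
        ui.status(b'transaction committed\n')

    def on_abort(tr):
        ui.warn(b'transaction rolled back\n')

    tr.addvalidator(b'900-example', validate)
    tr.addpostclose(b'900-example', on_close)
    tr.addabort(b'900-example', on_abort)
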
594 594 @active
595 595 def close(self):
596 596 '''commit the transaction'''
597 597 if self._count == 1:
598 598 for category in sorted(self._validatecallback):
599 599 self._validatecallback[category](self)
600 600 self._validatecallback = None # Help prevent cycles.
601 601 self._generatefiles(group=GEN_GROUP_PRE_FINALIZE)
602 602 while self._finalizecallback:
603 603 callbacks = self._finalizecallback
604 604 self._finalizecallback = {}
605 605 categories = sorted(callbacks)
606 606 for cat in categories:
607 607 callbacks[cat](self)
608 608 # Prevent double usage and help clear cycles.
609 609 self._finalizecallback = None
610 610 self._generatefiles(group=GEN_GROUP_POST_FINALIZE)
611 611
612 612 self._count -= 1
613 613 if self._count != 0:
614 614 return
615 615 self._file.close()
616 616 self._backupsfile.close()
617 617 # cleanup temporary files
618 618 for l, f, b, c in self._backupentries:
619 619 if l not in self._vfsmap and c:
620 620 self._report(
621 621 b"couldn't remove %s: unknown cache location %s\n" % (b, l)
622 622 )
623 623 continue
624 624 vfs = self._vfsmap[l]
625 625 if not f and b and vfs.exists(b):
626 626 try:
627 627 vfs.unlink(b)
628 628 except (IOError, OSError, error.Abort) as inst:
629 629 if not c:
630 630 raise
631 631 # Abort may be raised by a read-only opener
632 632 self._report(
633 633 b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
634 634 )
635 635 self._offsetmap = {}
636 636 self._newfiles = set()
637 637 self._writeundo()
638 638 if self._after:
639 639 self._after()
640 640 self._after = None # Help prevent cycles.
641 641 if self._opener.isfile(self._backupjournal):
642 642 self._opener.unlink(self._backupjournal)
643 643 if self._opener.isfile(self._journal):
644 644 self._opener.unlink(self._journal)
645 645 for l, _f, b, c in self._backupentries:
646 646 if l not in self._vfsmap and c:
647 647 self._report(
648 648 b"couldn't remove %s: unknown cache location"
649 649 b"%s\n" % (b, l)
650 650 )
651 651 continue
652 652 vfs = self._vfsmap[l]
653 653 if b and vfs.exists(b):
654 654 try:
655 655 vfs.unlink(b)
656 656 except (IOError, OSError, error.Abort) as inst:
657 657 if not c:
658 658 raise
659 659 # Abort may be raised by a read-only opener
660 660 self._report(
661 661 b"couldn't remove %s: %s\n" % (vfs.join(b), inst)
662 662 )
663 663 self._backupentries = []
664 664 self._journal = None
665 665
666 666 self._releasefn(self, True) # notify success of closing transaction
667 667 self._releasefn = None # Help prevent cycles.
668 668
669 669 # run post close action
670 670 categories = sorted(self._postclosecallback)
671 671 for cat in categories:
672 672 self._postclosecallback[cat](self)
673 673 # Prevent double usage and help clear cycles.
674 674 self._postclosecallback = None
675 675
676 676 @active
677 677 def abort(self):
678 678 """abort the transaction (generally called on error, or when the
679 679 transaction is not explicitly committed before going out of
680 680 scope)"""
681 681 self._abort()
682 682
683 683 @active
684 684 def add_journal(self, vfs_id, path):
685 685 self._journal_files.append((vfs_id, path))
686 686
687 687 def _writeundo(self):
688 688 """write transaction data for possible future undo call"""
689 689 if self._undoname is None:
690 690 return
691 691
692 692 def undoname(fn: bytes) -> bytes:
693 693 base, name = os.path.split(fn)
694 694 assert name.startswith(self._journal)
695 695 new_name = name.replace(self._journal, self._undoname, 1)
696 696 return os.path.join(base, new_name)
697 697
698 698 undo_backup_path = b"%s.backupfiles" % self._undoname
699 699 undobackupfile = self._opener.open(undo_backup_path, b'w')
700 700 undobackupfile.write(b'%d\n' % version)
701 701 for l, f, b, c in self._backupentries:
702 702 if not f: # temporary file
703 703 continue
704 704 if not b:
705 705 u = b''
706 706 else:
707 707 if l not in self._vfsmap and c:
708 708 self._report(
709 709 b"couldn't remove %s: unknown cache location"
710 710 b"%s\n" % (b, l)
711 711 )
712 712 continue
713 713 vfs = self._vfsmap[l]
714 714 u = undoname(b)
715 715 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
716 716 undobackupfile.write(b"%s\0%s\0%s\0%d\n" % (l, f, u, c))
717 717 undobackupfile.close()
718 718 for vfs, src in self._journal_files:
719 719 dest = undoname(src)
720 720 # if src and dest refer to a same file, vfs.rename is a no-op,
721 721 # leaving both src and dest on disk. delete dest to make sure
722 722 # the rename couldn't be such a no-op.
723 723 vfs.tryunlink(dest)
724 724 try:
725 725 vfs.rename(src, dest)
726 726 except FileNotFoundError: # journal file does not yet exist
727 727 pass
728 728
729 729 def _abort(self):
730 730 entries = self.readjournal()
731 731 self._count = 0
732 732 self._usages = 0
733 733 self._file.close()
734 734 self._backupsfile.close()
735 735
736 736 quick = self._can_quick_abort(entries)
737 737 try:
738 738 if not quick:
739 739 self._report(_(b"transaction abort!\n"))
740 740 for cat in sorted(self._abortcallback):
741 741 self._abortcallback[cat](self)
742 742 # Prevent double usage and help clear cycles.
743 743 self._abortcallback = None
744 744 if quick:
745 745 self._do_quick_abort(entries)
746 746 else:
747 747 self._do_full_abort(entries)
748 748 finally:
749 749 self._journal = None
750 750 self._releasefn(self, False) # notify failure of transaction
751 751 self._releasefn = None # Help prevent cycles.
752 752
753 753 def _can_quick_abort(self, entries):
754 754 """False if any semantic content have been written on disk
755 755
756 756 True if nothing, except temporary files has been writen on disk."""
757 757 if entries:
758 758 return False
759 759 for e in self._backupentries:
760 760 if e[1]:
761 761 return False
762 762 return True
763 763
764 764 def _do_quick_abort(self, entries):
765 765 """(Silently) do a quick cleanup (see _can_quick_abort)"""
766 766 assert self._can_quick_abort(entries)
767 767 tmp_files = [e for e in self._backupentries if not e[1]]
768 768 for vfs_id, old_path, tmp_path, xxx in tmp_files:
769 769 vfs = self._vfsmap[vfs_id]
770 770 try:
771 771 vfs.unlink(tmp_path)
772 772 except FileNotFoundError:
773 773 pass
774 774 if self._backupjournal:
775 775 self._opener.unlink(self._backupjournal)
776 776 if self._journal:
777 777 self._opener.unlink(self._journal)
778 778
779 779 def _do_full_abort(self, entries):
780 780 """(Noisily) rollback all the change introduced by the transaction"""
781 781 try:
782 782 _playback(
783 783 self._journal,
784 784 self._report,
785 785 self._opener,
786 786 self._vfsmap,
787 787 entries,
788 788 self._backupentries,
789 789 False,
790 790 checkambigfiles=self._checkambigfiles,
791 791 )
792 792 self._report(_(b"rollback completed\n"))
793 793 except BaseException as exc:
794 794 self._report(_(b"rollback failed - please run hg recover\n"))
795 795 self._report(
796 796 _(b"(failure reason: %s)\n") % stringutil.forcebytestr(exc)
797 797 )
798 798
799 799
800 800 BAD_VERSION_MSG = _(
801 801 b"journal was created by a different version of Mercurial\n"
802 802 )
803 803
804 804
805 805 def read_backup_files(report, fp):
806 806 """parse an (already open) backup file an return contained backup entries
807 807
808 808 entries are in the form: (location, file, backupfile, cache)
809 809
810 810 :location: the vfs identifier (vfsmap's key)
811 811 :file: original file path (in the vfs)
812 812 :backupfile: path of the backup (in the vfs)
813 813 :cache: a boolean currently always set to False
814 814 """
815 815 lines = fp.readlines()
816 816 backupentries = []
817 817 if lines:
818 818 ver = lines[0][:-1]
819 819 if ver != (b'%d' % version):
820 820 report(BAD_VERSION_MSG)
821 821 else:
822 822 for line in lines[1:]:
823 823 if line:
824 824 # Shave off the trailing newline
825 825 line = line[:-1]
826 826 l, f, b, c = line.split(b'\0')
827 827 backupentries.append((l, f, b, bool(c)))
828 828 return backupentries
829 829
830 830
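The `*.backupfiles` format parsed above is simply a version line followed by one NUL-separated 4-tuple per line. A round-trip sketch against `read_backup_files`, using an in-memory buffer instead of a real vfs (file names are illustrative):

import io

def make_backup_journal(entries):
    # entries: iterable of (location, file, backupfile, cache) tuples
    lines = [b'%d\n' % version]
    for l, f, b, c in entries:
        lines.append(b'%s\0%s\0%s\0%d\n' % (l, f, b, c))
    return b''.join(lines)

raw = make_backup_journal(
    [(b'store', b'phaseroots', b'journal.backup.phaseroots', 0)]
)
parsed = read_backup_files(print, io.BytesIO(raw))
# parsed -> [(b'store', b'phaseroots', b'journal.backup.phaseroots', True)]
# (the cache column is read back through bool(b'0'), hence True; the comments
# above note the value is currently unused)
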
831 831 def rollback(
832 832 opener,
833 833 vfsmap,
834 834 file,
835 835 report,
836 836 checkambigfiles=None,
837 837 skip_journal_pattern=None,
838 838 ):
839 839 """Rolls back the transaction contained in the given file
840 840
841 841 Reads the entries in the specified file, and the corresponding
842 842 '*.backupfiles' file, to recover from an incomplete transaction.
843 843
844 844 * `file`: a file containing a list of entries, specifying where
845 845 to truncate each file. The file should contain a list of
846 846 file\0offset pairs, delimited by newlines. The corresponding
847 847 '*.backupfiles' file should contain a list of file\0backupfile
848 848 pairs, delimited by \0.
849 849
850 850 `checkambigfiles` is a set of (path, vfs-location) tuples,
851 851 which determine whether file stat ambiguity should be avoided when
852 852 restoring the corresponding files.
853 853 """
854 854 entries = []
855 855 backupentries = []
856 856
857 857 with opener.open(file) as fp:
858 858 lines = fp.readlines()
859 859 for l in lines:
860 860 try:
861 861 f, o = l.split(b'\0')
862 862 entries.append((f, int(o)))
863 863 except ValueError:
864 864 report(
865 865 _(b"couldn't read journal entry %r!\n") % pycompat.bytestr(l)
866 866 )
867 867
868 868 backupjournal = b"%s.backupfiles" % file
869 869 if opener.exists(backupjournal):
870 870 with opener.open(backupjournal) as fp:
871 871 backupentries = read_backup_files(report, fp)
872 872 if skip_journal_pattern is not None:
873 873 keep = lambda x: not skip_journal_pattern.match(x[1])
874 874 backupentries = [x for x in backupentries if keep(x)]
875 875
876 876 _playback(
877 877 file,
878 878 report,
879 879 opener,
880 880 vfsmap,
881 881 entries,
882 882 backupentries,
883 883 checkambigfiles=checkambigfiles,
884 884 )
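Recovering from an interrupted transaction is then just a matter of pointing `rollback` at the leftover journal with the same vfs map the transaction was created with. A hedged sketch of a call site; `repo` is assumed to be a local repository exposing the attributes used elsewhere in this change:

def recover_interrupted_transaction(repo):
    # replay the truncations and backups recorded by an interrupted transaction
    rollback(
        repo.svfs,      # opener: the vfs the journal lives in
        repo.vfs_map,   # same {location -> vfs} map the transaction used
        b'journal',     # journal file holding "name\0offset" entries
        repo.ui.warn,   # report: where recovery messages go
    )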