##// END OF EJS Templates
transaction: support cache file in backupentries...
Pierre-Yves David -
r23312:006e9ef0 default
parent child Browse files
Show More
@@ -1,468 +1,500 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from i18n import _
15 15 import errno
16 16 import error, util
17 17
18 18 version = 0
19 19
20 20 def active(func):
21 21 def _active(self, *args, **kwds):
22 22 if self.count == 0:
23 23 raise error.Abort(_(
24 24 'cannot use transaction when it is already committed/aborted'))
25 25 return func(self, *args, **kwds)
26 26 return _active
27 27
28 28 def _playback(journal, report, opener, vfsmap, entries, backupentries,
29 29 unlink=True):
30 30 for f, o, _ignore in entries:
31 31 if o or not unlink:
32 32 try:
33 33 fp = opener(f, 'a')
34 34 fp.truncate(o)
35 35 fp.close()
36 36 except IOError:
37 37 report(_("failed to truncate %s\n") % f)
38 38 raise
39 39 else:
40 40 try:
41 41 opener.unlink(f)
42 42 except (IOError, OSError), inst:
43 43 if inst.errno != errno.ENOENT:
44 44 raise
45 45
46 46 backupfiles = []
47 47 for l, f, b, c in backupentries:
48 if l not in vfsmap and c:
49 report("couldn't handle %s: unknown cache location %s\n"
50 % (b, l))
48 51 vfs = vfsmap[l]
52 try:
49 53 if f and b:
50 54 filepath = vfs.join(f)
51 55 backuppath = vfs.join(b)
52 56 try:
53 57 util.copyfile(backuppath, filepath)
54 58 backupfiles.append(b)
55 59 except IOError:
56 60 report(_("failed to recover %s\n") % f)
57 raise
58 61 else:
59 62 target = f or b
60 63 try:
61 64 vfs.unlink(target)
62 65 except (IOError, OSError), inst:
63 66 if inst.errno != errno.ENOENT:
64 67 raise
68 except (IOError, OSError, util.Abort), inst:
69 if not c:
70 raise
65 71
66 72 opener.unlink(journal)
67 73 backuppath = "%s.backupfiles" % journal
68 74 if opener.exists(backuppath):
69 75 opener.unlink(backuppath)
76 try:
70 77 for f in backupfiles:
71 78 if opener.exists(f):
72 79 opener.unlink(f)
80 except (IOError, OSError, util.Abort), inst:
81 # only pure backup file remains, it is sage to ignore any error
82 pass
73 83
74 84 class transaction(object):
75 85 def __init__(self, report, opener, vfsmap, journal, after=None,
76 86 createmode=None, onclose=None, onabort=None):
77 87 """Begin a new transaction
78 88
79 89 Begins a new transaction that allows rolling back writes in the event of
80 90 an exception.
81 91
82 92 * `after`: called after the transaction has been committed
83 93 * `createmode`: the mode of the journal file that will be created
84 94 * `onclose`: called as the transaction is closing, but before it is
85 95 closed
86 96 * `onabort`: called as the transaction is aborting, but before any files
87 97 have been truncated
88 98 """
89 99 self.count = 1
90 100 self.usages = 1
91 101 self.report = report
92 102 # a vfs to the store content
93 103 self.opener = opener
94 104 # a map to access file in various {location -> vfs}
95 105 vfsmap = vfsmap.copy()
96 106 vfsmap[''] = opener # set default value
97 107 self._vfsmap = vfsmap
98 108 self.after = after
99 109 self.onclose = onclose
100 110 self.onabort = onabort
101 111 self.entries = []
102 112 self.map = {}
103 113 self.journal = journal
104 114 self._queue = []
105 115 # a dict of arguments to be passed to hooks
106 116 self.hookargs = {}
107 117 self.file = opener.open(self.journal, "w")
108 118
109 119 # a list of ('location', 'path', 'backuppath', cache) entries.
110 120 # - if 'backuppath' is empty, no file existed at backup time
111 121 # - if 'path' is empty, this is a temporary transaction file
112 122 # - if 'location' is not empty, the path is outside main opener reach.
113 123 # use 'location' value as a key in a vfsmap to find the right 'vfs'
114 124 # (cache is currently unused)
115 125 self._backupentries = []
116 126 self._backupmap = {}
117 127 self._backupjournal = "%s.backupfiles" % journal
118 128 self._backupsfile = opener.open(self._backupjournal, 'w')
119 129 self._backupsfile.write('%d\n' % version)
120 130
121 131 if createmode is not None:
122 132 opener.chmod(self.journal, createmode & 0666)
123 133 opener.chmod(self._backupjournal, createmode & 0666)
124 134
125 135 # hold file generations to be performed on commit
126 136 self._filegenerators = {}
127 137 # hold callbalk to write pending data for hooks
128 138 self._pendingcallback = {}
129 139 # True is any pending data have been written ever
130 140 self._anypending = False
131 141 # holds callback to call when writing the transaction
132 142 self._finalizecallback = {}
133 143 # hold callbalk for post transaction close
134 144 self._postclosecallback = {}
135 145
136 146 def __del__(self):
137 147 if self.journal:
138 148 self._abort()
139 149
140 150 @active
141 151 def startgroup(self):
142 152 """delay registration of file entry
143 153
144 154 This is used by strip to delay vision of strip offset. The transaction
145 155 sees either none or all of the strip actions to be done."""
146 156 self._queue.append([])
147 157
148 158 @active
149 159 def endgroup(self):
150 160 """apply delayed registration of file entry.
151 161
152 162 This is used by strip to delay vision of strip offset. The transaction
153 163 sees either none or all of the strip actions to be done."""
154 164 q = self._queue.pop()
155 165 for f, o, data in q:
156 166 self._addentry(f, o, data)
157 167
158 168 @active
159 169 def add(self, file, offset, data=None):
160 170 """record the state of an append-only file before update"""
161 171 if file in self.map or file in self._backupmap:
162 172 return
163 173 if self._queue:
164 174 self._queue[-1].append((file, offset, data))
165 175 return
166 176
167 177 self._addentry(file, offset, data)
168 178
169 179 def _addentry(self, file, offset, data):
170 180 """add a append-only entry to memory and on-disk state"""
171 181 if file in self.map or file in self._backupmap:
172 182 return
173 183 self.entries.append((file, offset, data))
174 184 self.map[file] = len(self.entries) - 1
175 185 # add enough data to the journal to do the truncate
176 186 self.file.write("%s\0%d\n" % (file, offset))
177 187 self.file.flush()
178 188
179 189 @active
180 190 def addbackup(self, file, hardlink=True, vfs=None):
181 191 """Adds a backup of the file to the transaction
182 192
183 193 Calling addbackup() creates a hardlink backup of the specified file
184 194 that is used to recover the file in the event of the transaction
185 195 aborting.
186 196
187 197 * `file`: the file path, relative to .hg/store
188 198 * `hardlink`: use a hardlink to quickly create the backup
189 199 """
190 200 if self._queue:
191 201 msg = 'cannot use transaction.addbackup inside "group"'
192 202 raise RuntimeError(msg)
193 203
194 204 if file in self.map or file in self._backupmap:
195 205 return
196 206 backupfile = "%s.backup.%s" % (self.journal, file)
197 207 if vfs is None:
198 208 vfs = self.opener
199 209 if vfs.exists(file):
200 210 filepath = vfs.join(file)
201 211 backuppath = self.opener.join(backupfile)
202 212 util.copyfiles(filepath, backuppath, hardlink=hardlink)
203 213 else:
204 214 backupfile = ''
205 215
206 216 self._addbackupentry(('', file, backupfile, False))
207 217
208 218 def _addbackupentry(self, entry):
209 219 """register a new backup entry and write it to disk"""
210 220 self._backupentries.append(entry)
211 221 self._backupmap[file] = len(self._backupentries) - 1
212 222 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
213 223 self._backupsfile.flush()
214 224
215 225 @active
216 226 def registertmp(self, tmpfile):
217 227 """register a temporary transaction file
218 228
219 229 Such file will be delete when the transaction exit (on both failure and
220 230 success).
221 231 """
222 232 self._addbackupentry(('', '', tmpfile, False))
223 233
224 234 @active
225 235 def addfilegenerator(self, genid, filenames, genfunc, order=0, vfs=None):
226 236 """add a function to generates some files at transaction commit
227 237
228 238 The `genfunc` argument is a function capable of generating proper
229 239 content of each entry in the `filename` tuple.
230 240
231 241 At transaction close time, `genfunc` will be called with one file
232 242 object argument per entries in `filenames`.
233 243
234 244 The transaction itself is responsible for the backup, creation and
235 245 final write of such file.
236 246
237 247 The `genid` argument is used to ensure the same set of file is only
238 248 generated once. Call to `addfilegenerator` for a `genid` already
239 249 present will overwrite the old entry.
240 250
241 251 The `order` argument may be used to control the order in which multiple
242 252 generator will be executed.
243 253 """
244 254 # For now, we are unable to do proper backup and restore of custom vfs
245 255 # but for bookmarks that are handled outside this mechanism.
246 256 assert vfs is None or filenames == ('bookmarks',)
247 257 self._filegenerators[genid] = (order, filenames, genfunc, vfs)
248 258
249 259 def _generatefiles(self):
250 260 # write files registered for generation
251 261 for entry in sorted(self._filegenerators.values()):
252 262 order, filenames, genfunc, vfs = entry
253 263 if vfs is None:
254 264 vfs = self.opener
255 265 files = []
256 266 try:
257 267 for name in filenames:
258 268 # Some files are already backed up when creating the
259 269 # localrepo. Until this is properly fixed we disable the
260 270 # backup for them.
261 271 if name not in ('phaseroots', 'bookmarks'):
262 272 self.addbackup(name)
263 273 files.append(vfs(name, 'w', atomictemp=True))
264 274 genfunc(*files)
265 275 finally:
266 276 for f in files:
267 277 f.close()
268 278
269 279 @active
270 280 def find(self, file):
271 281 if file in self.map:
272 282 return self.entries[self.map[file]]
273 283 if file in self._backupmap:
274 284 return self._backupentries[self._backupmap[file]]
275 285 return None
276 286
277 287 @active
278 288 def replace(self, file, offset, data=None):
279 289 '''
280 290 replace can only replace already committed entries
281 291 that are not pending in the queue
282 292 '''
283 293
284 294 if file not in self.map:
285 295 raise KeyError(file)
286 296 index = self.map[file]
287 297 self.entries[index] = (file, offset, data)
288 298 self.file.write("%s\0%d\n" % (file, offset))
289 299 self.file.flush()
290 300
291 301 @active
292 302 def nest(self):
293 303 self.count += 1
294 304 self.usages += 1
295 305 return self
296 306
297 307 def release(self):
298 308 if self.count > 0:
299 309 self.usages -= 1
300 310 # if the transaction scopes are left without being closed, fail
301 311 if self.count > 0 and self.usages == 0:
302 312 self._abort()
303 313
304 314 def running(self):
305 315 return self.count > 0
306 316
307 317 def addpending(self, category, callback):
308 318 """add a callback to be called when the transaction is pending
309 319
310 320 The transaction will be given as callback's first argument.
311 321
312 322 Category is a unique identifier to allow overwriting an old callback
313 323 with a newer callback.
314 324 """
315 325 self._pendingcallback[category] = callback
316 326
317 327 @active
318 328 def writepending(self):
319 329 '''write pending file to temporary version
320 330
321 331 This is used to allow hooks to view a transaction before commit'''
322 332 categories = sorted(self._pendingcallback)
323 333 for cat in categories:
324 334 # remove callback since the data will have been flushed
325 335 any = self._pendingcallback.pop(cat)(self)
326 336 self._anypending = self._anypending or any
327 337 return self._anypending
328 338
329 339 @active
330 340 def addfinalize(self, category, callback):
331 341 """add a callback to be called when the transaction is closed
332 342
333 343 The transaction will be given as callback's first argument.
334 344
335 345 Category is a unique identifier to allow overwriting old callbacks with
336 346 newer callbacks.
337 347 """
338 348 self._finalizecallback[category] = callback
339 349
340 350 @active
341 351 def addpostclose(self, category, callback):
342 352 """add a callback to be called after the transaction is closed
343 353
344 354 The transaction will be given as callback's first argument.
345 355
346 356 Category is a unique identifier to allow overwriting an old callback
347 357 with a newer callback.
348 358 """
349 359 self._postclosecallback[category] = callback
350 360
351 361 @active
352 362 def close(self):
353 363 '''commit the transaction'''
354 364 if self.count == 1:
355 365 self._generatefiles()
356 366 categories = sorted(self._finalizecallback)
357 367 for cat in categories:
358 368 self._finalizecallback[cat](self)
359 369 if self.onclose is not None:
360 370 self.onclose()
361 371
362 372 self.count -= 1
363 373 if self.count != 0:
364 374 return
365 375 self.file.close()
366 376 self._backupsfile.close()
367 377 # cleanup temporary files
368 for l, f, b, _c in self._backupentries:
378 for l, f, b, c in self._backupentries:
379 if l not in self._vfsmap and c:
380 self.report("couldn't remote %s: unknown cache location %s\n"
381 % (b, l))
382 continue
369 383 vfs = self._vfsmap[l]
370 384 if not f and b and vfs.exists(b):
385 try:
371 386 vfs.unlink(b)
387 except (IOError, OSError, util.Abort), inst:
388 if not c:
389 raise
390 # Abort may be raise by read only opener
391 self.report("couldn't remote %s: %s\n"
392 % (vfs.join(b), inst))
372 393 self.entries = []
373 394 if self.after:
374 395 self.after()
375 396 if self.opener.isfile(self.journal):
376 397 self.opener.unlink(self.journal)
377 398 if self.opener.isfile(self._backupjournal):
378 399 self.opener.unlink(self._backupjournal)
379 for _l, _f, b, _c in self._backupentries:
400 for _l, _f, b, c in self._backupentries:
401 if l not in self._vfsmap and c:
402 self.report("couldn't remote %s: unknown cache location"
403 "%s\n" % (b, l))
404 continue
380 405 vfs = self._vfsmap[l]
381 406 if b and vfs.exists(b):
407 try:
382 408 vfs.unlink(b)
409 except (IOError, OSError, util.Abort), inst:
410 if not c:
411 raise
412 # Abort may be raise by read only opener
413 self.report("couldn't remote %s: %s\n"
414 % (vfs.join(b), inst))
383 415 self._backupentries = []
384 416 self.journal = None
385 417 # run post close action
386 418 categories = sorted(self._postclosecallback)
387 419 for cat in categories:
388 420 self._postclosecallback[cat](self)
389 421
390 422 @active
391 423 def abort(self):
392 424 '''abort the transaction (generally called on error, or when the
393 425 transaction is not explicitly committed before going out of
394 426 scope)'''
395 427 self._abort()
396 428
397 429 def _abort(self):
398 430 self.count = 0
399 431 self.usages = 0
400 432 self.file.close()
401 433 self._backupsfile.close()
402 434
403 435 if self.onabort is not None:
404 436 self.onabort()
405 437
406 438 try:
407 439 if not self.entries and not self._backupentries:
408 440 if self.journal:
409 441 self.opener.unlink(self.journal)
410 442 if self._backupjournal:
411 443 self.opener.unlink(self._backupjournal)
412 444 return
413 445
414 446 self.report(_("transaction abort!\n"))
415 447
416 448 try:
417 449 _playback(self.journal, self.report, self.opener, self._vfsmap,
418 450 self.entries, self._backupentries, False)
419 451 self.report(_("rollback completed\n"))
420 452 except Exception:
421 453 self.report(_("rollback failed - please run hg recover\n"))
422 454 finally:
423 455 self.journal = None
424 456
425 457
426 458 def rollback(opener, vfsmap, file, report):
427 459 """Rolls back the transaction contained in the given file
428 460
429 461 Reads the entries in the specified file, and the corresponding
430 462 '*.backupfiles' file, to recover from an incomplete transaction.
431 463
432 464 * `file`: a file containing a list of entries, specifying where
433 465 to truncate each file. The file should contain a list of
434 466 file\0offset pairs, delimited by newlines. The corresponding
435 467 '*.backupfiles' file should contain a list of file\0backupfile
436 468 pairs, delimited by \0.
437 469 """
438 470 entries = []
439 471 backupentries = []
440 472
441 473 fp = opener.open(file)
442 474 lines = fp.readlines()
443 475 fp.close()
444 476 for l in lines:
445 477 try:
446 478 f, o = l.split('\0')
447 479 entries.append((f, int(o), None))
448 480 except ValueError:
449 481 report(_("couldn't read journal entry %r!\n") % l)
450 482
451 483 backupjournal = "%s.backupfiles" % file
452 484 if opener.exists(backupjournal):
453 485 fp = opener.open(backupjournal)
454 486 lines = fp.readlines()
455 487 if lines:
456 488 ver = lines[0][:-1]
457 489 if ver == str(version):
458 490 for line in lines[1:]:
459 491 if line:
460 492 # Shave off the trailing newline
461 493 line = line[:-1]
462 494 l, f, b, c = line.split('\0')
463 495 backupentries.append((l, f, b, bool(c)))
464 496 else:
465 497 report(_("journal was created by a different version of "
466 498 "Mercurial"))
467 499
468 500 _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now