transaction: clarify the name of 'journal' argument for transaction...
Pierre-Yves David
r23901:13268fde default
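In short, this changeset renames the fourth constructor parameter of the transaction class from 'journal' to 'journalname'; the stored attribute is still self.journal. A minimal restatement of the new signature follows (a sketch with the body elided; only the lines visible in the diff below come from the changeset):

    class transaction(object):
        def __init__(self, report, opener, vfsmap, journalname, after=None,
                     createmode=None):
            # only the parameter is renamed; the attribute keeps its old name
            self.journal = journalname

Positional callers are unaffected; a caller passing the argument by keyword as journal= would need updating, though whether any such caller exists is not shown in this changeset.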
@@ -1,512 +1,512 @@
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from i18n import _
15 15 import errno
16 16 import error, util
17 17
18 18 version = 2
19 19
20 20 def active(func):
21 21 def _active(self, *args, **kwds):
22 22 if self.count == 0:
23 23 raise error.Abort(_(
24 24 'cannot use transaction when it is already committed/aborted'))
25 25 return func(self, *args, **kwds)
26 26 return _active
27 27
28 28 def _playback(journal, report, opener, vfsmap, entries, backupentries,
29 29 unlink=True):
30 30 for f, o, _ignore in entries:
31 31 if o or not unlink:
32 32 try:
33 33 fp = opener(f, 'a')
34 34 fp.truncate(o)
35 35 fp.close()
36 36 except IOError:
37 37 report(_("failed to truncate %s\n") % f)
38 38 raise
39 39 else:
40 40 try:
41 41 opener.unlink(f)
42 42 except (IOError, OSError), inst:
43 43 if inst.errno != errno.ENOENT:
44 44 raise
45 45
46 46 backupfiles = []
47 47 for l, f, b, c in backupentries:
48 48 if l not in vfsmap and c:
49 49 report("couldn't handle %s: unknown cache location %s\n"
50 50 % (b, l))
51 51 vfs = vfsmap[l]
52 52 try:
53 53 if f and b:
54 54 filepath = vfs.join(f)
55 55 backuppath = vfs.join(b)
56 56 try:
57 57 util.copyfile(backuppath, filepath)
58 58 backupfiles.append(b)
59 59 except IOError:
60 60 report(_("failed to recover %s\n") % f)
61 61 else:
62 62 target = f or b
63 63 try:
64 64 vfs.unlink(target)
65 65 except (IOError, OSError), inst:
66 66 if inst.errno != errno.ENOENT:
67 67 raise
68 68 except (IOError, OSError, util.Abort), inst:
69 69 if not c:
70 70 raise
71 71
72 72 opener.unlink(journal)
73 73 backuppath = "%s.backupfiles" % journal
74 74 if opener.exists(backuppath):
75 75 opener.unlink(backuppath)
76 76 try:
77 77 for f in backupfiles:
78 78 if opener.exists(f):
79 79 opener.unlink(f)
80 80 except (IOError, OSError, util.Abort), inst:
81 81 # only pure backup files remain; it is safe to ignore any error
82 82 pass
83 83
84 84 class transaction(object):
85 def __init__(self, report, opener, vfsmap, journal, after=None,
85 def __init__(self, report, opener, vfsmap, journalname, after=None,
86 86 createmode=None):
87 87 """Begin a new transaction
88 88
89 89 Begins a new transaction that allows rolling back writes in the event of
90 90 an exception.
91 91
92 92 * `after`: called after the transaction has been committed
93 93 * `createmode`: the mode of the journal file that will be created
94 94 """
95 95 self.count = 1
96 96 self.usages = 1
97 97 self.report = report
98 98 # a vfs to the store content
99 99 self.opener = opener
100 100 # a map to access files in various locations {location -> vfs}
101 101 vfsmap = vfsmap.copy()
102 102 vfsmap[''] = opener # set default value
103 103 self._vfsmap = vfsmap
104 104 self.after = after
105 105 self.entries = []
106 106 self.map = {}
107 self.journal = journal
107 self.journal = journalname
108 108 self._queue = []
109 109 # a dict of arguments to be passed to hooks
110 110 self.hookargs = {}
111 111 self.file = opener.open(self.journal, "w")
112 112
113 113 # a list of ('location', 'path', 'backuppath', cache) entries.
114 114 # - if 'backuppath' is empty, no file existed at backup time
115 115 # - if 'path' is empty, this is a temporary transaction file
116 116 # - if 'location' is not empty, the path is outside main opener reach.
117 117 # use 'location' value as a key in a vfsmap to find the right 'vfs'
118 118 # (cache is currently unused)
119 119 self._backupentries = []
120 120 self._backupmap = {}
121 self._backupjournal = "%s.backupfiles" % journal
121 self._backupjournal = "%s.backupfiles" % self.journal
122 122 self._backupsfile = opener.open(self._backupjournal, 'w')
123 123 self._backupsfile.write('%d\n' % version)
124 124
125 125 if createmode is not None:
126 126 opener.chmod(self.journal, createmode & 0666)
127 127 opener.chmod(self._backupjournal, createmode & 0666)
128 128
129 129 # holds file generators to be run on commit
130 130 self._filegenerators = {}
131 131 # holds callbacks to write pending data for hooks
132 132 self._pendingcallback = {}
133 133 # True if any pending data has ever been written
134 134 self._anypending = False
135 135 # holds callbacks to call when writing the transaction
136 136 self._finalizecallback = {}
137 137 # holds callbacks for post transaction close
138 138 self._postclosecallback = {}
139 139 # holds callbacks to call during abort
140 140 self._abortcallback = {}
141 141
142 142 def __del__(self):
143 143 if self.journal:
144 144 self._abort()
145 145
146 146 @active
147 147 def startgroup(self):
148 148 """delay registration of file entry
149 149
150 150 This is used by strip to delay visibility of the strip offset. The transaction
151 151 sees either none or all of the strip actions to be done."""
152 152 self._queue.append([])
153 153
154 154 @active
155 155 def endgroup(self):
156 156 """apply delayed registration of file entry.
157 157
158 158 This is used by strip to delay visibility of the strip offset. The transaction
159 159 sees either none or all of the strip actions to be done."""
160 160 q = self._queue.pop()
161 161 for f, o, data in q:
162 162 self._addentry(f, o, data)
163 163
164 164 @active
165 165 def add(self, file, offset, data=None):
166 166 """record the state of an append-only file before update"""
167 167 if file in self.map or file in self._backupmap:
168 168 return
169 169 if self._queue:
170 170 self._queue[-1].append((file, offset, data))
171 171 return
172 172
173 173 self._addentry(file, offset, data)
174 174
175 175 def _addentry(self, file, offset, data):
176 176 """add a append-only entry to memory and on-disk state"""
177 177 if file in self.map or file in self._backupmap:
178 178 return
179 179 self.entries.append((file, offset, data))
180 180 self.map[file] = len(self.entries) - 1
181 181 # add enough data to the journal to do the truncate
182 182 self.file.write("%s\0%d\n" % (file, offset))
183 183 self.file.flush()
184 184
185 185 @active
186 186 def addbackup(self, file, hardlink=True, location=''):
187 187 """Adds a backup of the file to the transaction
188 188
189 189 Calling addbackup() creates a hardlink backup of the specified file
190 190 that is used to recover the file in the event of the transaction
191 191 aborting.
192 192
193 193 * `file`: the file path, relative to .hg/store
194 194 * `hardlink`: use a hardlink to quickly create the backup
195 195 """
196 196 if self._queue:
197 197 msg = 'cannot use transaction.addbackup inside "group"'
198 198 raise RuntimeError(msg)
199 199
200 200 if file in self.map or file in self._backupmap:
201 201 return
202 202 vfs = self._vfsmap[location]
203 203 dirname, filename = vfs.split(file)
204 204 backupfilename = "%s.backup.%s" % (self.journal, filename)
205 205 backupfile = vfs.reljoin(dirname, backupfilename)
206 206 if vfs.exists(file):
207 207 filepath = vfs.join(file)
208 208 backuppath = vfs.join(backupfile)
209 209 util.copyfile(filepath, backuppath, hardlink=hardlink)
210 210 else:
211 211 backupfile = ''
212 212
213 213 self._addbackupentry((location, file, backupfile, False))
214 214
215 215 def _addbackupentry(self, entry):
216 216 """register a new backup entry and write it to disk"""
217 217 self._backupentries.append(entry)
218 218 self._backupmap[entry[1]] = len(self._backupentries) - 1
219 219 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
220 220 self._backupsfile.flush()
221 221
222 222 @active
223 223 def registertmp(self, tmpfile, location=''):
224 224 """register a temporary transaction file
225 225
226 226 Such files will be deleted when the transaction exits (on both
227 227 failure and success).
228 228 """
229 229 self._addbackupentry((location, '', tmpfile, False))
230 230
231 231 @active
232 232 def addfilegenerator(self, genid, filenames, genfunc, order=0,
233 233 location=''):
234 234 """add a function to generates some files at transaction commit
235 235
236 236 The `genfunc` argument is a function capable of generating proper
237 237 content of each entry in the `filename` tuple.
238 238
239 239 At transaction close time, `genfunc` will be called with one file
240 240 object argument per entries in `filenames`.
241 241
242 242 The transaction itself is responsible for the backup, creation and
243 243 final write of such file.
244 244
245 245 The `genid` argument is used to ensure the same set of file is only
246 246 generated once. Call to `addfilegenerator` for a `genid` already
247 247 present will overwrite the old entry.
248 248
249 249 The `order` argument may be used to control the order in which multiple
250 250 generator will be executed.
251 251
252 252 The `location` arguments may be used to indicate the files are located
253 253 outside of the the standard directory for transaction. It should match
254 254 one of the key of the `transaction.vfsmap` dictionary.
255 255 """
256 256 # For now we are unable to do proper backup and restore for files in a
257 257 # custom vfs, except for bookmarks, which are handled outside this mechanism.
258 258 self._filegenerators[genid] = (order, filenames, genfunc, location)
259 259
260 260 def _generatefiles(self, suffix=''):
261 261 # write files registered for generation
262 262 any = False
263 263 for entry in sorted(self._filegenerators.values()):
264 264 any = True
265 265 order, filenames, genfunc, location = entry
266 266 vfs = self._vfsmap[location]
267 267 files = []
268 268 try:
269 269 for name in filenames:
270 270 name += suffix
271 271 if suffix:
272 272 self.registertmp(name, location=location)
273 273 else:
274 274 self.addbackup(name, location=location)
275 275 files.append(vfs(name, 'w', atomictemp=True))
276 276 genfunc(*files)
277 277 finally:
278 278 for f in files:
279 279 f.close()
280 280 return any
281 281
282 282 @active
283 283 def find(self, file):
284 284 if file in self.map:
285 285 return self.entries[self.map[file]]
286 286 if file in self._backupmap:
287 287 return self._backupentries[self._backupmap[file]]
288 288 return None
289 289
290 290 @active
291 291 def replace(self, file, offset, data=None):
292 292 '''
293 293 replace can only replace already committed entries
294 294 that are not pending in the queue
295 295 '''
296 296
297 297 if file not in self.map:
298 298 raise KeyError(file)
299 299 index = self.map[file]
300 300 self.entries[index] = (file, offset, data)
301 301 self.file.write("%s\0%d\n" % (file, offset))
302 302 self.file.flush()
303 303
304 304 @active
305 305 def nest(self):
306 306 self.count += 1
307 307 self.usages += 1
308 308 return self
309 309
310 310 def release(self):
311 311 if self.count > 0:
312 312 self.usages -= 1
313 313 # if the transaction scopes are left without being closed, fail
314 314 if self.count > 0 and self.usages == 0:
315 315 self._abort()
316 316
317 317 def running(self):
318 318 return self.count > 0
319 319
320 320 def addpending(self, category, callback):
321 321 """add a callback to be called when the transaction is pending
322 322
323 323 The transaction will be given as callback's first argument.
324 324
325 325 Category is a unique identifier to allow overwriting an old callback
326 326 with a newer callback.
327 327 """
328 328 self._pendingcallback[category] = callback
329 329
330 330 @active
331 331 def writepending(self):
332 332 '''write pending file to temporary version
333 333
334 334 This is used to allow hooks to view a transaction before commit'''
335 335 categories = sorted(self._pendingcallback)
336 336 for cat in categories:
337 337 # remove callback since the data will have been flushed
338 338 any = self._pendingcallback.pop(cat)(self)
339 339 self._anypending = self._anypending or any
340 340 self._anypending |= self._generatefiles(suffix='.pending')
341 341 return self._anypending
342 342
343 343 @active
344 344 def addfinalize(self, category, callback):
345 345 """add a callback to be called when the transaction is closed
346 346
347 347 The transaction will be given as callback's first argument.
348 348
349 349 Category is a unique identifier to allow overwriting old callbacks with
350 350 newer callbacks.
351 351 """
352 352 self._finalizecallback[category] = callback
353 353
354 354 @active
355 355 def addpostclose(self, category, callback):
356 356 """add a callback to be called after the transaction is closed
357 357
358 358 The transaction will be given as callback's first argument.
359 359
360 360 Category is a unique identifier to allow overwriting an old callback
361 361 with a newer callback.
362 362 """
363 363 self._postclosecallback[category] = callback
364 364
365 365 @active
366 366 def addabort(self, category, callback):
367 367 """add a callback to be called when the transaction is aborted.
368 368
369 369 The transaction will be given as the first argument to the callback.
370 370
371 371 Category is a unique identifier to allow overwriting an old callback
372 372 with a newer callback.
373 373 """
374 374 self._abortcallback[category] = callback
375 375
376 376 @active
377 377 def close(self):
378 378 '''commit the transaction'''
379 379 if self.count == 1:
380 380 self._generatefiles()
381 381 categories = sorted(self._finalizecallback)
382 382 for cat in categories:
383 383 self._finalizecallback[cat](self)
384 384
385 385 self.count -= 1
386 386 if self.count != 0:
387 387 return
388 388 self.file.close()
389 389 self._backupsfile.close()
390 390 # cleanup temporary files
391 391 for l, f, b, c in self._backupentries:
392 392 if l not in self._vfsmap and c:
393 393 self.report("couldn't remove %s: unknown cache location %s\n"
394 394 % (b, l))
395 395 continue
396 396 vfs = self._vfsmap[l]
397 397 if not f and b and vfs.exists(b):
398 398 try:
399 399 vfs.unlink(b)
400 400 except (IOError, OSError, util.Abort), inst:
401 401 if not c:
402 402 raise
403 403 # Abort may be raised by a read-only opener
404 404 self.report("couldn't remove %s: %s\n"
405 405 % (vfs.join(b), inst))
406 406 self.entries = []
407 407 if self.after:
408 408 self.after()
409 409 if self.opener.isfile(self.journal):
410 410 self.opener.unlink(self.journal)
411 411 if self.opener.isfile(self._backupjournal):
412 412 self.opener.unlink(self._backupjournal)
413 413 for l, _f, b, c in self._backupentries:
414 414 if l not in self._vfsmap and c:
415 415 self.report("couldn't remove %s: unknown cache location "
416 416 "%s\n" % (b, l))
417 417 continue
418 418 vfs = self._vfsmap[l]
419 419 if b and vfs.exists(b):
420 420 try:
421 421 vfs.unlink(b)
422 422 except (IOError, OSError, util.Abort), inst:
423 423 if not c:
424 424 raise
425 425 # Abort may be raised by a read-only opener
426 426 self.report("couldn't remove %s: %s\n"
427 427 % (vfs.join(b), inst))
428 428 self._backupentries = []
429 429 self.journal = None
430 430 # run post close action
431 431 categories = sorted(self._postclosecallback)
432 432 for cat in categories:
433 433 self._postclosecallback[cat](self)
434 434
435 435 @active
436 436 def abort(self):
437 437 '''abort the transaction (generally called on error, or when the
438 438 transaction is not explicitly committed before going out of
439 439 scope)'''
440 440 self._abort()
441 441
442 442 def _abort(self):
443 443 self.count = 0
444 444 self.usages = 0
445 445 self.file.close()
446 446 self._backupsfile.close()
447 447
448 448 try:
449 449 if not self.entries and not self._backupentries:
450 450 if self.journal:
451 451 self.opener.unlink(self.journal)
452 452 if self._backupjournal:
453 453 self.opener.unlink(self._backupjournal)
454 454 return
455 455
456 456 self.report(_("transaction abort!\n"))
457 457
458 458 try:
459 459 for cat in sorted(self._abortcallback):
460 460 self._abortcallback[cat](self)
461 461 _playback(self.journal, self.report, self.opener, self._vfsmap,
462 462 self.entries, self._backupentries, False)
463 463 self.report(_("rollback completed\n"))
464 464 except Exception:
465 465 self.report(_("rollback failed - please run hg recover\n"))
466 466 finally:
467 467 self.journal = None
468 468
469 469
470 470 def rollback(opener, vfsmap, file, report):
471 471 """Rolls back the transaction contained in the given file
472 472
473 473 Reads the entries in the specified file, and the corresponding
474 474 '*.backupfiles' file, to recover from an incomplete transaction.
475 475
476 476 * `file`: a file containing a list of entries, specifying where
477 477 to truncate each file. The file should contain a list of
478 478 file\0offset pairs, delimited by newlines. The corresponding
479 479 '*.backupfiles' file should contain a version line followed by
480 480 location\0file\0backupfile\0cache entries, delimited by newlines.
481 481 """
482 482 entries = []
483 483 backupentries = []
484 484
485 485 fp = opener.open(file)
486 486 lines = fp.readlines()
487 487 fp.close()
488 488 for l in lines:
489 489 try:
490 490 f, o = l.split('\0')
491 491 entries.append((f, int(o), None))
492 492 except ValueError:
493 493 report(_("couldn't read journal entry %r!\n") % l)
494 494
495 495 backupjournal = "%s.backupfiles" % file
496 496 if opener.exists(backupjournal):
497 497 fp = opener.open(backupjournal)
498 498 lines = fp.readlines()
499 499 if lines:
500 500 ver = lines[0][:-1]
501 501 if ver == str(version):
502 502 for line in lines[1:]:
503 503 if line:
504 504 # Shave off the trailing newline
505 505 line = line[:-1]
506 506 l, f, b, c = line.split('\0')
507 507 backupentries.append((l, f, b, bool(c)))
508 508 else:
509 509 report(_("journal was created by a different version of "
510 510 "Mercurial"))
511 511
512 512 _playback(file, report, opener, vfsmap, entries, backupentries)
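To make the journaling scheme described in the header comment concrete, here is a small, self-contained sketch of the same idea in plain Python: record a "name\0offset" journal entry before appending to a file, and on failure truncate the file back to the recorded offset. The record/playback helpers and paths below are invented for illustration; they mirror transaction.add() and _playback() in spirit only and are not Mercurial's actual API.

    import os
    import tempfile

    def record(journalfp, name, offset):
        # same "%s\0%d\n" entry format that transaction.py writes
        journalfp.write("%s\0%d\n" % (name, offset))
        journalfp.flush()

    def playback(journalpath):
        # truncate every journaled file back to its recorded offset
        with open(journalpath) as fp:
            for line in fp:
                name, offset = line.rstrip('\n').split('\0')
                with open(name, 'a') as target:
                    target.truncate(int(offset))

    if __name__ == '__main__':
        workdir = tempfile.mkdtemp()
        data = os.path.join(workdir, 'data')
        journal = os.path.join(workdir, 'journal')
        with open(data, 'w') as fp:
            fp.write('committed\n')
        offset = os.path.getsize(data)          # state before the risky write
        with open(journal, 'w') as jf:
            record(jf, data, offset)
        with open(data, 'a') as fp:
            fp.write('uncommitted garbage\n')   # simulate an interrupted write
        playback(journal)                       # roll the append back
        with open(data) as fp:
            print(fp.read())                    # only "committed\n" remains

The real transaction layers more on top of this truncate-only scheme (hardlink backups, temporary files, and the versioned '*.backupfiles' companion journal) because not every file it touches is append-only.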