remotefilelog: fix opening validatecachelog in text mode
Inada Naoki
r44560:25097b4d default
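The fix: the validate-cache log is written with bytes messages (e.g. f.write(b"corrupt %s during read\n" % filepath)), but mode b'a+' opens the file in text mode, where a bytes write raises TypeError on Python 3. Switching to b'ab+' opens the log in binary append mode, matching the mode _validatekey already uses. A minimal standalone sketch of the failure mode (plain built-in open, not Mercurial's pycompat wrapper):

    # Python 3: a text-mode file object rejects bytes
    with open("validatecachelog", "a+") as f:
        f.write(b"corrupt during read\n")  # TypeError: write() argument must be str, not bytes

    # binary append mode accepts the bytes payload
    with open("validatecachelog", "ab+") as f:
        f.write(b"corrupt during read\n")  # ok

The affected code path only runs when the log is configured; an illustrative (not prescriptive) configuration:

    [remotefilelog]
    validatecachelog = /tmp/validatecache.log
    validatecache = on    # one of 'on', 'strict', 'off'; anything else falls back to 'on'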
@@ -1,461 +1,461 @@
1 1 from __future__ import absolute_import
2 2
3 3 import errno
4 4 import os
5 5 import shutil
6 6 import stat
7 7 import time
8 8
9 9 from mercurial.i18n import _
10 10 from mercurial.node import bin, hex
11 11 from mercurial.pycompat import open
12 12 from mercurial import (
13 13 error,
14 14 pycompat,
15 15 util,
16 16 )
17 17 from mercurial.utils import hashutil
18 18 from . import (
19 19 constants,
20 20 shallowutil,
21 21 )
22 22
23 23
24 24 class basestore(object):
25 25 def __init__(self, repo, path, reponame, shared=False):
26 26 """Creates a remotefilelog store object for the given repo name.
27 27
28 28 `path` - The file path where this store keeps its data
29 29 `reponame` - The name of the repo. This is used to partition data from
30 30 many repos.
31 31 `shared` - True if this store is a shared cache of data from the central
32 32 server, for many repos on this machine. False means this store is for
33 33 the local data for one repo.
34 34 """
35 35 self.repo = repo
36 36 self.ui = repo.ui
37 37 self._path = path
38 38 self._reponame = reponame
39 39 self._shared = shared
40 40 self._uid = os.getuid() if not pycompat.iswindows else None
41 41
42 42 self._validatecachelog = self.ui.config(
43 43 b"remotefilelog", b"validatecachelog"
44 44 )
45 45 self._validatecache = self.ui.config(
46 46 b"remotefilelog", b"validatecache", b'on'
47 47 )
48 48 if self._validatecache not in (b'on', b'strict', b'off'):
49 49 self._validatecache = b'on'
50 50 if self._validatecache == b'off':
51 51 self._validatecache = False
52 52
53 53 if shared:
54 54 shallowutil.mkstickygroupdir(self.ui, path)
55 55
56 56 def getmissing(self, keys):
57 57 missing = []
58 58 for name, node in keys:
59 59 filepath = self._getfilepath(name, node)
60 60 exists = os.path.exists(filepath)
61 61 if (
62 62 exists
63 63 and self._validatecache == b'strict'
64 64 and not self._validatekey(filepath, b'contains')
65 65 ):
66 66 exists = False
67 67 if not exists:
68 68 missing.append((name, node))
69 69
70 70 return missing
71 71
72 72 # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
73 73
74 74 def markledger(self, ledger, options=None):
75 75 if options and options.get(constants.OPTION_PACKSONLY):
76 76 return
77 77 if self._shared:
78 78 for filename, nodes in self._getfiles():
79 79 for node in nodes:
80 80 ledger.markdataentry(self, filename, node)
81 81 ledger.markhistoryentry(self, filename, node)
82 82
83 83 def cleanup(self, ledger):
84 84 ui = self.ui
85 85 entries = ledger.sources.get(self, [])
86 86 count = 0
87 87 progress = ui.makeprogress(
88 88 _(b"cleaning up"), unit=b"files", total=len(entries)
89 89 )
90 90 for entry in entries:
91 91 if entry.gced or (entry.datarepacked and entry.historyrepacked):
92 92 progress.update(count)
93 93 path = self._getfilepath(entry.filename, entry.node)
94 94 util.tryunlink(path)
95 95 count += 1
96 96 progress.complete()
97 97
98 98 # Clean up the repo cache directory.
99 99 self._cleanupdirectory(self._getrepocachepath())
100 100
101 101 # BELOW THIS ARE NON-STANDARD APIS
102 102
103 103 def _cleanupdirectory(self, rootdir):
104 104 """Removes the empty directories and unnecessary files within the root
105 105 directory recursively. Note that this method does not remove the root
106 106 directory itself. """
107 107
108 108 oldfiles = set()
109 109 otherfiles = set()
110 110 # osutil.listdir returns stat information which saves some rmdir/listdir
111 111 # syscalls.
112 112 for name, mode in util.osutil.listdir(rootdir):
113 113 if stat.S_ISDIR(mode):
114 114 dirpath = os.path.join(rootdir, name)
115 115 self._cleanupdirectory(dirpath)
116 116
117 117 # Now that the directory specified by dirpath is potentially
118 118 # empty, try to remove it.
119 119 try:
120 120 os.rmdir(dirpath)
121 121 except OSError:
122 122 pass
123 123
124 124 elif stat.S_ISREG(mode):
125 125 if name.endswith(b'_old'):
126 126 oldfiles.add(name[:-4])
127 127 else:
128 128 otherfiles.add(name)
129 129
130 130 # Remove the files which end with suffix '_old' and have no
131 131 # corresponding file without the suffix '_old'. See addremotefilelognode
132 132 # method for the generation/purpose of files with '_old' suffix.
133 133 for filename in oldfiles - otherfiles:
134 134 filepath = os.path.join(rootdir, filename + b'_old')
135 135 util.tryunlink(filepath)
136 136
137 137 def _getfiles(self):
138 138 """Iterate over (filename, [node,...]) pairs for all the revisions that
139 139 exist in the store.
140 140
141 141 This is useful for obtaining a list of all the contents of the store
142 142 when performing a repack to another store, since the store API requires
143 143 name+node keys and not namehash+node keys.
144 144 """
145 145 existing = {}
146 146 for filenamehash, node in self._listkeys():
147 147 existing.setdefault(filenamehash, []).append(node)
148 148
149 149 filenamemap = self._resolvefilenames(existing.keys())
150 150
151 151 for filename, sha in pycompat.iteritems(filenamemap):
152 152 yield (filename, existing[sha])
153 153
154 154 def _resolvefilenames(self, hashes):
155 155 """Given a list of filename hashes that are present in the
156 156 remotefilelog store, return a mapping from filename->hash.
157 157
158 158 This is useful when converting remotefilelog blobs into other storage
159 159 formats.
160 160 """
161 161 if not hashes:
162 162 return {}
163 163
164 164 filenames = {}
165 165 missingfilename = set(hashes)
166 166
167 167 # Start with a full manifest, since it'll cover the majority of files
168 168 for filename in self.repo[b'tip'].manifest():
169 169 sha = hashutil.sha1(filename).digest()
170 170 if sha in missingfilename:
171 171 filenames[filename] = sha
172 172 missingfilename.discard(sha)
173 173
174 174 # Scan the changelog until we've found every file name
175 175 cl = self.repo.unfiltered().changelog
176 176 for rev in pycompat.xrange(len(cl) - 1, -1, -1):
177 177 if not missingfilename:
178 178 break
179 179 files = cl.readfiles(cl.node(rev))
180 180 for filename in files:
181 181 sha = hashutil.sha1(filename).digest()
182 182 if sha in missingfilename:
183 183 filenames[filename] = sha
184 184 missingfilename.discard(sha)
185 185
186 186 return filenames
187 187
188 188 def _getrepocachepath(self):
189 189 return (
190 190 os.path.join(self._path, self._reponame)
191 191 if self._shared
192 192 else self._path
193 193 )
194 194
195 195 def _listkeys(self):
196 196 """List all the remotefilelog keys that exist in the store.
197 197
198 198 Returns an iterator of (filename hash, filecontent hash) tuples.
199 199 """
200 200
201 201 for root, dirs, files in os.walk(self._getrepocachepath()):
202 202 for filename in files:
203 203 if len(filename) != 40:
204 204 continue
205 205 node = filename
206 206 if self._shared:
207 207 # .../1a/85ffda..be21
208 208 filenamehash = root[-41:-39] + root[-38:]
209 209 else:
210 210 filenamehash = root[-40:]
211 211 yield (bin(filenamehash), bin(node))
212 212
213 213 def _getfilepath(self, name, node):
214 214 node = hex(node)
215 215 if self._shared:
216 216 key = shallowutil.getcachekey(self._reponame, name, node)
217 217 else:
218 218 key = shallowutil.getlocalkey(name, node)
219 219
220 220 return os.path.join(self._path, key)
221 221
222 222 def _getdata(self, name, node):
223 223 filepath = self._getfilepath(name, node)
224 224 try:
225 225 data = shallowutil.readfile(filepath)
226 226 if self._validatecache and not self._validatedata(data, filepath):
227 227 if self._validatecachelog:
228 with open(self._validatecachelog, b'a+') as f:
228 with open(self._validatecachelog, b'ab+') as f:
229 229 f.write(b"corrupt %s during read\n" % filepath)
230 230 os.rename(filepath, filepath + b".corrupt")
231 231 raise KeyError(b"corrupt local cache file %s" % filepath)
232 232 except IOError:
233 233 raise KeyError(
234 234 b"no file found at %s for %s:%s" % (filepath, name, hex(node))
235 235 )
236 236
237 237 return data
238 238
239 239 def addremotefilelognode(self, name, node, data):
240 240 filepath = self._getfilepath(name, node)
241 241
242 242 oldumask = os.umask(0o002)
243 243 try:
244 244 # if this node already exists, save the old version for
245 245 # recovery/debugging purposes.
246 246 if os.path.exists(filepath):
247 247 newfilename = filepath + b'_old'
248 248 # newfilename can be read-only and shutil.copy will fail.
249 249 # Delete newfilename first to avoid that.
250 250 if os.path.exists(newfilename):
251 251 shallowutil.unlinkfile(newfilename)
252 252 shutil.copy(filepath, newfilename)
253 253
254 254 shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
255 255 shallowutil.writefile(filepath, data, readonly=True)
256 256
257 257 if self._validatecache:
258 258 if not self._validatekey(filepath, b'write'):
259 259 raise error.Abort(
260 260 _(b"local cache write was corrupted %s") % filepath
261 261 )
262 262 finally:
263 263 os.umask(oldumask)
264 264
265 265 def markrepo(self, path):
266 266 """Call this to add the given repo path to the store's list of
267 267 repositories that are using it. This is useful later when doing garbage
268 268 collection, since it allows us to inspect the repos to see what nodes
269 269 they want to be kept alive in the store.
270 270 """
271 271 repospath = os.path.join(self._path, b"repos")
272 272 with open(repospath, b'ab') as reposfile:
273 273 reposfile.write(os.path.dirname(path) + b"\n")
274 274
275 275 repospathstat = os.stat(repospath)
276 276 if repospathstat.st_uid == self._uid:
277 277 os.chmod(repospath, 0o0664)
278 278
279 279 def _validatekey(self, path, action):
280 280 with open(path, b'rb') as f:
281 281 data = f.read()
282 282
283 283 if self._validatedata(data, path):
284 284 return True
285 285
286 286 if self._validatecachelog:
287 287 with open(self._validatecachelog, b'ab+') as f:
288 288 f.write(b"corrupt %s during %s\n" % (path, action))
289 289
290 290 os.rename(path, path + b".corrupt")
291 291 return False
292 292
293 293 def _validatedata(self, data, path):
294 294 try:
295 295 if len(data) > 0:
296 296 # see remotefilelogserver.createfileblob for the format
297 297 offset, size, flags = shallowutil.parsesizeflags(data)
298 298 if len(data) <= size:
299 299 # it is truncated
300 300 return False
301 301
302 302 # extract the node from the metadata
303 303 offset += size
304 304 datanode = data[offset : offset + 20]
305 305
306 306 # and compare against the path
307 307 if os.path.basename(path) == hex(datanode):
308 308 # Content matches the intended path
309 309 return True
310 310 return False
311 311 except (ValueError, RuntimeError):
312 312 pass
313 313
314 314 return False
315 315
316 316 def gc(self, keepkeys):
317 317 ui = self.ui
318 318 cachepath = self._path
319 319
320 320 # prune cache
321 321 queue = pycompat.queue.PriorityQueue()
322 322 originalsize = 0
323 323 size = 0
324 324 count = 0
325 325 removed = 0
326 326
327 327 # keep files newer than a day even if they aren't needed
328 328 limit = time.time() - (60 * 60 * 24)
329 329
330 330 progress = ui.makeprogress(
331 331 _(b"removing unnecessary files"), unit=b"files"
332 332 )
333 333 progress.update(0)
334 334 for root, dirs, files in os.walk(cachepath):
335 335 for file in files:
336 336 if file == b'repos':
337 337 continue
338 338
339 339 # Don't delete pack files
340 340 if b'/packs/' in root:
341 341 continue
342 342
343 343 progress.update(count)
344 344 path = os.path.join(root, file)
345 345 key = os.path.relpath(path, cachepath)
346 346 count += 1
347 347 try:
348 348 pathstat = os.stat(path)
349 349 except OSError as e:
350 350 # errno.ENOENT = no such file or directory
351 351 if e.errno != errno.ENOENT:
352 352 raise
353 353 msg = _(
354 354 b"warning: file %s was removed by another process\n"
355 355 )
356 356 ui.warn(msg % path)
357 357 continue
358 358
359 359 originalsize += pathstat.st_size
360 360
361 361 if key in keepkeys or pathstat.st_atime > limit:
362 362 queue.put((pathstat.st_atime, path, pathstat))
363 363 size += pathstat.st_size
364 364 else:
365 365 try:
366 366 shallowutil.unlinkfile(path)
367 367 except OSError as e:
368 368 # errno.ENOENT = no such file or directory
369 369 if e.errno != errno.ENOENT:
370 370 raise
371 371 msg = _(
372 372 b"warning: file %s was removed by another "
373 373 b"process\n"
374 374 )
375 375 ui.warn(msg % path)
376 376 continue
377 377 removed += 1
378 378 progress.complete()
379 379
380 380 # remove oldest files until under limit
381 381 limit = ui.configbytes(b"remotefilelog", b"cachelimit")
382 382 if size > limit:
383 383 excess = size - limit
384 384 progress = ui.makeprogress(
385 385 _(b"enforcing cache limit"), unit=b"bytes", total=excess
386 386 )
387 387 removedexcess = 0
388 388 while queue and size > limit and size > 0:
389 389 progress.update(removedexcess)
390 390 atime, oldpath, oldpathstat = queue.get()
391 391 try:
392 392 shallowutil.unlinkfile(oldpath)
393 393 except OSError as e:
394 394 # errno.ENOENT = no such file or directory
395 395 if e.errno != errno.ENOENT:
396 396 raise
397 397 msg = _(
398 398 b"warning: file %s was removed by another process\n"
399 399 )
400 400 ui.warn(msg % oldpath)
401 401 size -= oldpathstat.st_size
402 402 removed += 1
403 403 removedexcess += oldpathstat.st_size
404 404 progress.complete()
405 405
406 406 ui.status(
407 407 _(b"finished: removed %d of %d files (%0.2f GB to %0.2f GB)\n")
408 408 % (
409 409 removed,
410 410 count,
411 411 float(originalsize) / 1024.0 / 1024.0 / 1024.0,
412 412 float(size) / 1024.0 / 1024.0 / 1024.0,
413 413 )
414 414 )
415 415
416 416
417 417 class baseunionstore(object):
418 418 def __init__(self, *args, **kwargs):
419 419 # If one of the functions that iterates all of the stores is about to
420 420 # throw a KeyError, try this many times with a full refresh between
421 421 # attempts. A repack operation may have moved data from one store to
422 422 # another while we were running.
423 423 self.numattempts = kwargs.get('numretries', 0) + 1
424 424 # If not None, call this function on every retry and when the attempts are
425 425 # exhausted.
426 426 self.retrylog = kwargs.get('retrylog', None)
427 427
428 428 def markforrefresh(self):
429 429 for store in self.stores:
430 430 if util.safehasattr(store, b'markforrefresh'):
431 431 store.markforrefresh()
432 432
433 433 @staticmethod
434 434 def retriable(fn):
435 435 def noop(*args):
436 436 pass
437 437
438 438 def wrapped(self, *args, **kwargs):
439 439 retrylog = self.retrylog or noop
440 440 funcname = fn.__name__
441 441 i = 0
442 442 while i < self.numattempts:
443 443 if i > 0:
444 444 retrylog(
445 445 b're-attempting (n=%d) %s\n'
446 446 % (i, pycompat.sysbytes(funcname))
447 447 )
448 448 self.markforrefresh()
449 449 i += 1
450 450 try:
451 451 return fn(self, *args, **kwargs)
452 452 except KeyError:
453 453 if i == self.numattempts:
454 454 # retries exhausted
455 455 retrylog(
456 456 b'retries exhausted in %s, raising KeyError\n'
457 457 % pycompat.sysbytes(funcname)
458 458 )
459 459 raise
460 460
461 461 return wrapped
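For context on the retriable decorator above: a concrete union store is expected to carry a stores list and decorate its lookup methods, so that a KeyError caused by a racing repack triggers markforrefresh() and a retry. A hypothetical sketch (the unionstore class and getdata method below are illustrative names, not part of this change):

    class unionstore(baseunionstore):
        def __init__(self, *stores, **kwargs):
            self.stores = list(stores)  # consulted in order
            super(unionstore, self).__init__(**kwargs)

        @baseunionstore.retriable
        def getdata(self, name, node):
            # A miss in every store raises KeyError; retriable then
            # refreshes the stores and re-invokes this method, up to
            # numattempts times in total.
            for store in self.stores:
                try:
                    return store._getdata(name, node)
                except KeyError:
                    continue
            raise KeyError((name, node))

Constructed as, say, unionstore(localstore, sharedstore, numretries=1, retrylog=ui.debug), this gives one refresh-and-retry pass before the KeyError is allowed to propagate.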