remotefilelog: import Queue on Python 2, and queue on Python 3...
Augie Fackler
r41302:92cf293f default
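
The change itself is one added line plus two deleted ones: the Python 2 only `import Queue` inside `gc()` is replaced with `pycompat.queue`, Mercurial's compatibility shim, which binds the stdlib module that Python 3 renamed from `Queue` to `queue`. A minimal sketch of the same pattern using only the standard library — the `compatqueue` alias is illustrative, not Mercurial's actual shim:

import sys

# Bind the renamed stdlib module to a single name, the way a
# compatibility layer such as mercurial.pycompat does.
if sys.version_info[0] >= 3:
    import queue as compatqueue  # Python 3 spelling
else:
    import Queue as compatqueue  # Python 2 spelling

# Call sites then work unchanged on either interpreter.
pq = compatqueue.PriorityQueue()
pq.put((2, 'newer'))
pq.put((1, 'older'))
assert pq.get() == (1, 'older')  # lowest key comes out first

Centralizing the conditional import in one shim module keeps per-call-site version checks, like the removed inline import, out of the rest of the codebase.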
@@ -1,426 +1,425 @@
from __future__ import absolute_import

import errno
import hashlib
import os
import shutil
import stat
import time

from mercurial.i18n import _
from mercurial.node import bin, hex
from mercurial import (
    error,
    pycompat,
    util,
)
from . import (
    constants,
    shallowutil,
)

class basestore(object):
    def __init__(self, repo, path, reponame, shared=False):
        """Creates a remotefilelog store object for the given repo name.

        `path` - The file path where this store keeps its data
        `reponame` - The name of the repo. This is used to partition data from
        many repos.
        `shared` - True if this store is a shared cache of data from the central
        server, for many repos on this machine. False means this store is for
        the local data for one repo.
        """
        self.repo = repo
        self.ui = repo.ui
        self._path = path
        self._reponame = reponame
        self._shared = shared
        self._uid = os.getuid() if not pycompat.iswindows else None

        self._validatecachelog = self.ui.config("remotefilelog",
                                                "validatecachelog")
        self._validatecache = self.ui.config("remotefilelog", "validatecache",
                                             'on')
        if self._validatecache not in ('on', 'strict', 'off'):
            self._validatecache = 'on'
        if self._validatecache == 'off':
            self._validatecache = False

        if shared:
            shallowutil.mkstickygroupdir(self.ui, path)

    def getmissing(self, keys):
        missing = []
        for name, node in keys:
            filepath = self._getfilepath(name, node)
            exists = os.path.exists(filepath)
            if (exists and self._validatecache == 'strict' and
                not self._validatekey(filepath, 'contains')):
                exists = False
            if not exists:
                missing.append((name, node))

        return missing

    # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE

    def markledger(self, ledger, options=None):
        if options and options.get(constants.OPTION_PACKSONLY):
            return
        if self._shared:
            for filename, nodes in self._getfiles():
                for node in nodes:
                    ledger.markdataentry(self, filename, node)
                    ledger.markhistoryentry(self, filename, node)

    def cleanup(self, ledger):
        ui = self.ui
        entries = ledger.sources.get(self, [])
        count = 0
        progress = ui.makeprogress(_("cleaning up"), unit="files",
                                   total=len(entries))
        for entry in entries:
            if entry.gced or (entry.datarepacked and entry.historyrepacked):
                progress.update(count)
                path = self._getfilepath(entry.filename, entry.node)
                util.tryunlink(path)
            count += 1
        progress.complete()

        # Clean up the repo cache directory.
        self._cleanupdirectory(self._getrepocachepath())

    # BELOW THIS ARE NON-STANDARD APIS

    def _cleanupdirectory(self, rootdir):
        """Removes the empty directories and unnecessary files within the root
        directory recursively. Note that this method does not remove the root
        directory itself. """

        oldfiles = set()
        otherfiles = set()
        # osutil.listdir returns stat information which saves some rmdir/listdir
        # syscalls.
        for name, mode in util.osutil.listdir(rootdir):
            if stat.S_ISDIR(mode):
                dirpath = os.path.join(rootdir, name)
                self._cleanupdirectory(dirpath)

                # Now that the directory specified by dirpath is potentially
                # empty, try and remove it.
                try:
                    os.rmdir(dirpath)
                except OSError:
                    pass

            elif stat.S_ISREG(mode):
                if name.endswith('_old'):
                    oldfiles.add(name[:-4])
                else:
                    otherfiles.add(name)

        # Remove the files which end with suffix '_old' and have no
        # corresponding file without the suffix '_old'. See addremotefilelognode
        # method for the generation/purpose of files with '_old' suffix.
        for filename in oldfiles - otherfiles:
            filepath = os.path.join(rootdir, filename + '_old')
            util.tryunlink(filepath)

    def _getfiles(self):
        """Return a list of (filename, [node,...]) for all the revisions that
        exist in the store.

        This is useful for obtaining a list of all the contents of the store
        when performing a repack to another store, since the store API requires
        name+node keys and not namehash+node keys.
        """
        existing = {}
        for filenamehash, node in self._listkeys():
            existing.setdefault(filenamehash, []).append(node)

        filenamemap = self._resolvefilenames(existing.keys())

        for filename, sha in filenamemap.iteritems():
            yield (filename, existing[sha])

    def _resolvefilenames(self, hashes):
        """Given a list of filename hashes that are present in the
        remotefilelog store, return a mapping from filename->hash.

        This is useful when converting remotefilelog blobs into other storage
        formats.
        """
        if not hashes:
            return {}

        filenames = {}
        missingfilename = set(hashes)

        # Start with a full manifest, since it'll cover the majority of files
        for filename in self.repo['tip'].manifest():
            sha = hashlib.sha1(filename).digest()
            if sha in missingfilename:
                filenames[filename] = sha
                missingfilename.discard(sha)

        # Scan the changelog until we've found every file name
        cl = self.repo.unfiltered().changelog
        for rev in pycompat.xrange(len(cl) - 1, -1, -1):
            if not missingfilename:
                break
            files = cl.readfiles(cl.node(rev))
            for filename in files:
                sha = hashlib.sha1(filename).digest()
                if sha in missingfilename:
                    filenames[filename] = sha
                    missingfilename.discard(sha)

        return filenames

    def _getrepocachepath(self):
        return os.path.join(
            self._path, self._reponame) if self._shared else self._path

    def _listkeys(self):
        """List all the remotefilelog keys that exist in the store.

        Returns an iterator of (filename hash, filecontent hash) tuples.
        """

        for root, dirs, files in os.walk(self._getrepocachepath()):
            for filename in files:
                if len(filename) != 40:
                    continue
                node = filename
                if self._shared:
                    # .../1a/85ffda..be21
                    filenamehash = root[-41:-39] + root[-38:]
                else:
                    filenamehash = root[-40:]
                yield (bin(filenamehash), bin(node))

    def _getfilepath(self, name, node):
        node = hex(node)
        if self._shared:
            key = shallowutil.getcachekey(self._reponame, name, node)
        else:
            key = shallowutil.getlocalkey(name, node)

        return os.path.join(self._path, key)

    def _getdata(self, name, node):
        filepath = self._getfilepath(name, node)
        try:
            data = shallowutil.readfile(filepath)
            if self._validatecache and not self._validatedata(data, filepath):
                if self._validatecachelog:
                    with open(self._validatecachelog, 'a+') as f:
                        f.write("corrupt %s during read\n" % filepath)
                os.rename(filepath, filepath + ".corrupt")
                raise KeyError("corrupt local cache file %s" % filepath)
        except IOError:
            raise KeyError("no file found at %s for %s:%s" % (filepath, name,
                                                              hex(node)))

        return data

    def addremotefilelognode(self, name, node, data):
        filepath = self._getfilepath(name, node)

        oldumask = os.umask(0o002)
        try:
            # if this node already exists, save the old version for
            # recovery/debugging purposes.
            if os.path.exists(filepath):
                newfilename = filepath + '_old'
                # newfilename can be read-only and shutil.copy will fail.
                # Delete newfilename to avoid it
                if os.path.exists(newfilename):
                    shallowutil.unlinkfile(newfilename)
                shutil.copy(filepath, newfilename)

            shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
            shallowutil.writefile(filepath, data, readonly=True)

            if self._validatecache:
                if not self._validatekey(filepath, 'write'):
                    raise error.Abort(_("local cache write was corrupted %s") %
                                      filepath)
        finally:
            os.umask(oldumask)

    def markrepo(self, path):
        """Call this to add the given repo path to the store's list of
        repositories that are using it. This is useful later when doing garbage
        collection, since it allows us to inspect the repos to see what nodes
        they want to be kept alive in the store.
        """
        repospath = os.path.join(self._path, "repos")
        with open(repospath, 'ab') as reposfile:
            reposfile.write(os.path.dirname(path) + "\n")

        repospathstat = os.stat(repospath)
        if repospathstat.st_uid == self._uid:
            os.chmod(repospath, 0o0664)

    def _validatekey(self, path, action):
        with open(path, 'rb') as f:
            data = f.read()

        if self._validatedata(data, path):
            return True

        if self._validatecachelog:
            with open(self._validatecachelog, 'ab+') as f:
                f.write("corrupt %s during %s\n" % (path, action))

        os.rename(path, path + ".corrupt")
        return False

    def _validatedata(self, data, path):
        try:
            if len(data) > 0:
                # see remotefilelogserver.createfileblob for the format
                offset, size, flags = shallowutil.parsesizeflags(data)
                if len(data) <= size:
                    # it is truncated
                    return False

                # extract the node from the metadata
                offset += size
                datanode = data[offset:offset + 20]

                # and compare against the path
                if os.path.basename(path) == hex(datanode):
                    # Content matches the intended path
                    return True
                return False
        except (ValueError, RuntimeError):
            pass

        return False

    def gc(self, keepkeys):
        ui = self.ui
        cachepath = self._path

        # prune cache
-        import Queue
-        queue = Queue.PriorityQueue()
+        queue = pycompat.queue.PriorityQueue()
        originalsize = 0
        size = 0
        count = 0
        removed = 0

        # keep files newer than a day even if they aren't needed
        limit = time.time() - (60 * 60 * 24)

        progress = ui.makeprogress(_("removing unnecessary files"),
                                   unit="files")
        progress.update(0)
        for root, dirs, files in os.walk(cachepath):
            for file in files:
                if file == 'repos':
                    continue

                # Don't delete pack files
                if '/packs/' in root:
                    continue

                progress.update(count)
                path = os.path.join(root, file)
                key = os.path.relpath(path, cachepath)
                count += 1
                try:
                    pathstat = os.stat(path)
                except OSError as e:
                    # errno.ENOENT = no such file or directory
                    if e.errno != errno.ENOENT:
                        raise
                    msg = _("warning: file %s was removed by another process\n")
                    ui.warn(msg % path)
                    continue

                originalsize += pathstat.st_size

                if key in keepkeys or pathstat.st_atime > limit:
                    queue.put((pathstat.st_atime, path, pathstat))
                    size += pathstat.st_size
                else:
                    try:
                        shallowutil.unlinkfile(path)
                    except OSError as e:
                        # errno.ENOENT = no such file or directory
                        if e.errno != errno.ENOENT:
                            raise
                        msg = _("warning: file %s was removed by another "
                                "process\n")
                        ui.warn(msg % path)
                        continue
                    removed += 1
        progress.complete()

        # remove oldest files until under limit
        limit = ui.configbytes("remotefilelog", "cachelimit")
        if size > limit:
            excess = size - limit
            progress = ui.makeprogress(_("enforcing cache limit"), unit="bytes",
                                       total=excess)
            removedexcess = 0
            while queue and size > limit and size > 0:
                progress.update(removedexcess)
                atime, oldpath, oldpathstat = queue.get()
                try:
                    shallowutil.unlinkfile(oldpath)
                except OSError as e:
                    # errno.ENOENT = no such file or directory
                    if e.errno != errno.ENOENT:
                        raise
                    msg = _("warning: file %s was removed by another process\n")
                    ui.warn(msg % oldpath)
                size -= oldpathstat.st_size
                removed += 1
                removedexcess += oldpathstat.st_size
            progress.complete()

        ui.status(_("finished: removed %d of %d files (%0.2f GB to %0.2f GB)\n")
                  % (removed, count,
                     float(originalsize) / 1024.0 / 1024.0 / 1024.0,
                     float(size) / 1024.0 / 1024.0 / 1024.0))

class baseunionstore(object):
    def __init__(self, *args, **kwargs):
        # If one of the functions that iterates all of the stores is about to
        # throw a KeyError, try this many times with a full refresh between
        # attempts. A repack operation may have moved data from one store to
        # another while we were running.
        self.numattempts = kwargs.get(r'numretries', 0) + 1
        # If not-None, call this function on every retry and if the attempts are
        # exhausted.
        self.retrylog = kwargs.get(r'retrylog', None)

    def markforrefresh(self):
        for store in self.stores:
            if util.safehasattr(store, 'markforrefresh'):
                store.markforrefresh()

    @staticmethod
    def retriable(fn):
        def noop(*args):
            pass
        def wrapped(self, *args, **kwargs):
            retrylog = self.retrylog or noop
            funcname = fn.__name__
            for i in pycompat.xrange(self.numattempts):
                if i > 0:
                    retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
                    self.markforrefresh()
                try:
                    return fn(self, *args, **kwargs)
                except KeyError:
                    pass
            # retries exhausted
            retrylog('retries exhausted in %s, raising KeyError\n' %
                     pycompat.sysbytes(funcname))
            raise
        return wrapped
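
As context for the second half of gc() above: it implements an atime-ordered eviction pass. Every file that survives the age-based pruning is pushed into the PriorityQueue keyed by access time, and once the total size exceeds remotefilelog.cachelimit the oldest entries are popped and unlinked until the cache fits. A condensed, standard-library-only sketch of that policy — enforcecachelimit and its tuple format are hypothetical, and unlike the loop above it tests empty() explicitly, since a PriorityQueue object is always truthy:

import os
import queue  # 'Queue' on Python 2, hence pycompat.queue in the change above

def enforcecachelimit(entries, limit):
    """Evict least-recently-accessed files until the total size fits limit.

    entries is an iterable of (atime, path, size) tuples for files that
    survived the first pruning pass; returns the list of removed paths.
    """
    pq = queue.PriorityQueue()
    size = 0
    for atime, path, filesize in entries:
        pq.put((atime, path, filesize))  # smallest atime = oldest = first out
        size += filesize

    removed = []
    while not pq.empty() and size > limit:
        atime, path, filesize = pq.get()
        try:
            os.unlink(path)
        except OSError:
            pass  # another process may have removed it already
        size -= filesize
        removed.append(path)
    return removed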