py3: make second argument of fdopen() a str...
Pulkit Goyal
r40647:5e3b3f88 default
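On Python 3, os.fdopen() requires its mode argument to be a native str, and Mercurial's source transformer rewrites unprefixed string literals into bytes; the r'' prefix used below keeps the mode a str on both Python 2 and 3. A minimal standalone sketch of the failure mode this change avoids (illustrative only, not Mercurial code):

    import os
    import tempfile

    fd, path = tempfile.mkstemp()
    try:
        # A bytes mode fails on Python 3 with
        # TypeError: ... 'mode' must be str, not bytes.
        fp = os.fdopen(fd, b'w+')
    except TypeError:
        # A native str mode works on both Python 2 and 3; in Mercurial
        # source it is spelled r'w+' so the transformer leaves it alone.
        fp = os.fdopen(fd, 'w+')
    fp.write('some data')
    fp.close()
    os.unlink(path)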
@@ -1,539 +1,539 b''
1 1 from __future__ import absolute_import
2 2
3 3 import collections
4 4 import errno
5 5 import hashlib
6 6 import mmap
7 7 import os
8 8 import struct
9 9 import time
10 10
11 11 from mercurial.i18n import _
12 12 from mercurial import (
13 13 policy,
14 14 pycompat,
15 15 util,
16 16 vfs as vfsmod,
17 17 )
18 18 from . import shallowutil
19 19
20 20 osutil = policy.importmod(r'osutil')
21 21
22 22 # The pack version supported by this implementation. This will need to be
23 23 # rev'd whenever the byte format changes. Ex: changing the fanout prefix,
24 24 # changing any of the int sizes, changing the delta algorithm, etc.
25 25 PACKVERSIONSIZE = 1
26 26 INDEXVERSIONSIZE = 2
27 27
28 28 FANOUTSTART = INDEXVERSIONSIZE
29 29
30 30 # Constant that indicates a fanout table entry hasn't been filled in. (This does
31 31 # not get serialized)
32 32 EMPTYFANOUT = -1
33 33
34 34 # The fanout prefix is the number of bytes that can be addressed by the fanout
35 35 # table. Example: a fanout prefix of 1 means we use the first byte of a hash to
36 36 # look in the fanout table (which will be 2^8 entries long).
37 37 SMALLFANOUTPREFIX = 1
38 38 LARGEFANOUTPREFIX = 2
39 39
40 40 # The number of entries in the index at which point we switch to a large fanout.
41 41 # It is chosen to balance the linear scan through a sparse fanout against
42 42 # the size of the bisect in the actual index.
43 43 # 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
44 44 # bisect) with (8 step fanout scan + 1 step bisect)
45 45 # 5 step bisect = log(2^16 / 8 / 255) # fanout
46 46 # 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
47 47 SMALLFANOUTCUTOFF = 2**16 / 8
48 48
49 49 # The amount of time to wait between checking for new packs. This prevents an
50 50 # exception when data is moved to a new pack after the process has already
51 51 # loaded the pack list.
52 52 REFRESHRATE = 0.1
53 53
54 54 if pycompat.isposix:
55 55 # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
56 56 # The 'e' flag will be ignored on older versions of glibc.
57 57 PACKOPENMODE = 'rbe'
58 58 else:
59 59 PACKOPENMODE = 'rb'
60 60
61 61 class _cachebackedpacks(object):
62 62 def __init__(self, packs, cachesize):
63 63 self._packs = set(packs)
64 64 self._lrucache = util.lrucachedict(cachesize)
65 65 self._lastpack = None
66 66
67 67 # Avoid a cold start of the cache by populating it with the most
68 68 # recent packs.
69 69 for i in reversed(range(min(cachesize, len(packs)))):
70 70 self._movetofront(packs[i])
71 71
72 72 def _movetofront(self, pack):
73 73 # This effectively makes pack the first entry in the cache.
74 74 self._lrucache[pack] = True
75 75
76 76 def _registerlastpackusage(self):
77 77 if self._lastpack is not None:
78 78 self._movetofront(self._lastpack)
79 79 self._lastpack = None
80 80
81 81 def add(self, pack):
82 82 self._registerlastpackusage()
83 83
84 84 # This method will mostly be called when packs are not in cache.
85 85 # Therefore, add the pack to the cache.
86 86 self._movetofront(pack)
87 87 self._packs.add(pack)
88 88
89 89 def __iter__(self):
90 90 self._registerlastpackusage()
91 91
92 92 # Cache iteration is based on LRU.
93 93 for pack in self._lrucache:
94 94 self._lastpack = pack
95 95 yield pack
96 96
97 97 cachedpacks = set(pack for pack in self._lrucache)
98 98 # Yield the packs not in the cache.
99 99 for pack in self._packs - cachedpacks:
100 100 self._lastpack = pack
101 101 yield pack
102 102
103 103 # Data not found in any pack.
104 104 self._lastpack = None
105 105
106 106 class basepackstore(object):
107 107 # Default cache size limit for the pack files.
108 108 DEFAULTCACHESIZE = 100
109 109
110 110 def __init__(self, ui, path):
111 111 self.ui = ui
112 112 self.path = path
113 113
114 114 # lastrefresh is 0 so we'll immediately check for new packs on the first
115 115 # failure.
116 116 self.lastrefresh = 0
117 117
118 118 packs = []
119 119 for filepath, __, __ in self._getavailablepackfilessorted():
120 120 try:
121 121 pack = self.getpack(filepath)
122 122 except Exception as ex:
123 123 # An exception may be thrown if the pack file is corrupted
124 124 # somehow. Log a warning but keep going in this case, just
125 125 # skipping this pack file.
126 126 #
127 127 # If this is an ENOENT error then don't even bother logging.
128 128 # Someone could have removed the file since we retrieved the
129 129 # list of paths.
130 130 if getattr(ex, 'errno', None) != errno.ENOENT:
131 131 ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
132 132 continue
133 133 packs.append(pack)
134 134
135 135 self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
136 136
137 137 def _getavailablepackfiles(self):
138 138 """For each pack file (a index/data file combo), yields:
139 139 (full path without extension, mtime, size)
140 140
141 141 mtime will be the mtime of the index/data file (whichever is newer)
142 142 size is the combined size of index/data file
143 143 """
144 144 indexsuffixlen = len(self.INDEXSUFFIX)
145 145 packsuffixlen = len(self.PACKSUFFIX)
146 146
147 147 ids = set()
148 148 sizes = collections.defaultdict(lambda: 0)
149 149 mtimes = collections.defaultdict(lambda: [])
150 150 try:
151 151 for filename, type, stat in osutil.listdir(self.path, stat=True):
152 152 id = None
153 153 if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
154 154 id = filename[:-indexsuffixlen]
155 155 elif filename[-packsuffixlen:] == self.PACKSUFFIX:
156 156 id = filename[:-packsuffixlen]
157 157
158 158 # Since we expect to have two files corresponding to each ID
159 159 # (the index file and the pack file), we can yield once we see
160 160 # it twice.
161 161 if id:
162 162 sizes[id] += stat.st_size # Sum both files' sizes together
163 163 mtimes[id].append(stat.st_mtime)
164 164 if id in ids:
165 165 yield (os.path.join(self.path, id), max(mtimes[id]),
166 166 sizes[id])
167 167 else:
168 168 ids.add(id)
169 169 except OSError as ex:
170 170 if ex.errno != errno.ENOENT:
171 171 raise
172 172
173 173 def _getavailablepackfilessorted(self):
174 174 """Like `_getavailablepackfiles`, but also sorts the files by mtime,
175 175 yielding newest files first.
176 176
177 177 This is desirable, since newer packfiles are more likely to contain
178 178 the data we want.
179 179 """
180 180 files = []
181 181 for path, mtime, size in self._getavailablepackfiles():
182 182 files.append((mtime, size, path))
183 183 files = sorted(files, reverse=True)
184 184 for mtime, size, path in files:
185 185 yield path, mtime, size
186 186
187 187 def gettotalsizeandcount(self):
188 188 """Returns the total disk size (in bytes) of all the pack files in
189 189 this store, and the count of pack files.
190 190
191 191 (This might be smaller than the total size of the ``self.path``
192 192 directory, since this only considers fully-written pack files, and not
193 193 temporary files or other detritus in the directory.)
194 194 """
195 195 totalsize = 0
196 196 count = 0
197 197 for __, __, size in self._getavailablepackfiles():
198 198 totalsize += size
199 199 count += 1
200 200 return totalsize, count
201 201
202 202 def getmetrics(self):
203 203 """Returns metrics on the state of this store."""
204 204 size, count = self.gettotalsizeandcount()
205 205 return {
206 206 'numpacks': count,
207 207 'totalpacksize': size,
208 208 }
209 209
210 210 def getpack(self, path):
211 211 raise NotImplementedError()
212 212
213 213 def getmissing(self, keys):
214 214 missing = keys
215 215 for pack in self.packs:
216 216 missing = pack.getmissing(missing)
217 217
218 218 # Ensures better performance of the cache by keeping the most
219 219 # recently accessed pack at the beginning in subsequent iterations.
220 220 if not missing:
221 221 return missing
222 222
223 223 if missing:
224 224 for pack in self.refresh():
225 225 missing = pack.getmissing(missing)
226 226
227 227 return missing
228 228
229 229 def markledger(self, ledger, options=None):
230 230 for pack in self.packs:
231 231 pack.markledger(ledger)
232 232
233 233 def markforrefresh(self):
234 234 """Tells the store that there may be new pack files, so the next time it
235 235 has a lookup miss it should check for new files."""
236 236 self.lastrefresh = 0
237 237
238 238 def refresh(self):
239 239 """Checks for any new packs on disk, adds them to the main pack list,
240 240 and returns a list of just the new packs."""
241 241 now = time.time()
242 242
243 243 # If we experience a lot of misses (like in the case of getmissing() on
244 244 # new objects), let's only actually check disk for new stuff every once
245 245 # in a while. Generally this code path should only ever matter when a
246 246 # repack is going on in the background, and it should be pretty rare
247 247 # for that to happen twice in quick succession.
248 248 newpacks = []
249 249 if now > self.lastrefresh + REFRESHRATE:
250 250 self.lastrefresh = now
251 251 previous = set(p.path for p in self.packs)
252 252 for filepath, __, __ in self._getavailablepackfilessorted():
253 253 if filepath not in previous:
254 254 newpack = self.getpack(filepath)
255 255 newpacks.append(newpack)
256 256 self.packs.add(newpack)
257 257
258 258 return newpacks
259 259
260 260 class versionmixin(object):
261 261 # Mix-in for classes with multiple supported versions
262 262 VERSION = None
263 263 SUPPORTED_VERSIONS = [2]
264 264
265 265 def _checkversion(self, version):
266 266 if version in self.SUPPORTED_VERSIONS:
267 267 if self.VERSION is None:
268 268 # only affect this instance
269 269 self.VERSION = version
270 270 elif self.VERSION != version:
271 271 raise RuntimeError('inconsistent version: %s' % version)
272 272 else:
273 273 raise RuntimeError('unsupported version: %s' % version)
274 274
275 275 class basepack(versionmixin):
276 276 # The maximum amount we should read via mmap before remapping so the old
277 277 # pages can be released (100MB)
278 278 MAXPAGEDIN = 100 * 1024**2
279 279
280 280 SUPPORTED_VERSIONS = [2]
281 281
282 282 def __init__(self, path):
283 283 self.path = path
284 284 self.packpath = path + self.PACKSUFFIX
285 285 self.indexpath = path + self.INDEXSUFFIX
286 286
287 287 self.indexsize = os.stat(self.indexpath).st_size
288 288 self.datasize = os.stat(self.packpath).st_size
289 289
290 290 self._index = None
291 291 self._data = None
292 292 self.freememory() # initialize the mmap
293 293
294 294 version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
295 295 self._checkversion(version)
296 296
297 297 version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
298 298 self._checkversion(version)
299 299
300 300 if 0b10000000 & config:
301 301 self.params = indexparams(LARGEFANOUTPREFIX, version)
302 302 else:
303 303 self.params = indexparams(SMALLFANOUTPREFIX, version)
304 304
305 305 @util.propertycache
306 306 def _fanouttable(self):
307 307 params = self.params
308 308 rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
309 309 fanouttable = []
310 310 for i in pycompat.xrange(0, params.fanoutcount):
311 311 loc = i * 4
312 312 fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
313 313 fanouttable.append(fanoutentry)
314 314 return fanouttable
315 315
316 316 @util.propertycache
317 317 def _indexend(self):
318 318 nodecount = struct.unpack_from('!Q', self._index,
319 319 self.params.indexstart - 8)[0]
320 320 return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
321 321
322 322 def freememory(self):
323 323 """Unmap and remap the memory to free it up after known expensive
324 324 operations. Return True if self._data and self._index were reloaded.
325 325 """
326 326 if self._index:
327 327 if self._pagedin < self.MAXPAGEDIN:
328 328 return False
329 329
330 330 self._index.close()
331 331 self._data.close()
332 332
333 333 # TODO: use an opener/vfs to access these paths
334 334 with open(self.indexpath, PACKOPENMODE) as indexfp:
335 335 # memory-map the file, size 0 means whole file
336 336 self._index = mmap.mmap(indexfp.fileno(), 0,
337 337 access=mmap.ACCESS_READ)
338 338 with open(self.packpath, PACKOPENMODE) as datafp:
339 339 self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
340 340
341 341 self._pagedin = 0
342 342 return True
343 343
344 344 def getmissing(self, keys):
345 345 raise NotImplementedError()
346 346
347 347 def markledger(self, ledger, options=None):
348 348 raise NotImplementedError()
349 349
350 350 def cleanup(self, ledger):
351 351 raise NotImplementedError()
352 352
353 353 def __iter__(self):
354 354 raise NotImplementedError()
355 355
356 356 def iterentries(self):
357 357 raise NotImplementedError()
358 358
359 359 class mutablebasepack(versionmixin):
360 360
361 361 def __init__(self, ui, packdir, version=2):
362 362 self._checkversion(version)
363 363 # TODO(augie): make this configurable
364 364 self._compressor = 'GZ'
365 365 opener = vfsmod.vfs(packdir)
366 366 opener.createmode = 0o444
367 367 self.opener = opener
368 368
369 369 self.entries = {}
370 370
371 371 shallowutil.mkstickygroupdir(ui, packdir)
372 372 self.packfp, self.packpath = opener.mkstemp(
373 373 suffix=self.PACKSUFFIX + '-tmp')
374 374 self.idxfp, self.idxpath = opener.mkstemp(
375 375 suffix=self.INDEXSUFFIX + '-tmp')
376 self.packfp = os.fdopen(self.packfp, 'w+')
377 self.idxfp = os.fdopen(self.idxfp, 'w+')
376 self.packfp = os.fdopen(self.packfp, r'w+')
377 self.idxfp = os.fdopen(self.idxfp, r'w+')
378 378 self.sha = hashlib.sha1()
379 379 self._closed = False
380 380
381 381 # The opener provides no way of doing permission fixup on files created
382 382 # via mkstemp, so we must fix it ourselves. We can probably fix this
383 383 # upstream in vfs.mkstemp so we don't need to use the private method.
384 384 opener._fixfilemode(opener.join(self.packpath))
385 385 opener._fixfilemode(opener.join(self.idxpath))
386 386
387 387 # Write header
388 388 # TODO: make it extensible (ex: allow specifying compression algorithm,
389 389 # a flexible key/value header, delta algorithm, fanout size, etc)
390 390 versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
391 391 self.writeraw(versionbuf)
392 392
393 393 def __enter__(self):
394 394 return self
395 395
396 396 def __exit__(self, exc_type, exc_value, traceback):
397 397 if exc_type is None:
398 398 self.close()
399 399 else:
400 400 self.abort()
401 401
402 402 def abort(self):
403 403 # Unclean exit
404 404 self._cleantemppacks()
405 405
406 406 def writeraw(self, data):
407 407 self.packfp.write(data)
408 408 self.sha.update(data)
409 409
410 410 def close(self, ledger=None):
411 411 if self._closed:
412 412 return
413 413
414 414 try:
415 415 sha = self.sha.hexdigest()
416 416 self.packfp.close()
417 417 self.writeindex()
418 418
419 419 if len(self.entries) == 0:
420 420 # Empty pack
421 421 self._cleantemppacks()
422 422 self._closed = True
423 423 return None
424 424
425 425 self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
426 426 try:
427 427 self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
428 428 except Exception as ex:
429 429 try:
430 430 self.opener.unlink(sha + self.PACKSUFFIX)
431 431 except Exception:
432 432 pass
433 433 # Raise 'ex' explicitly, since a bare 'raise' here could instead
434 434 # re-raise the exception from the unlink cleanup.
435 435 raise ex
436 436 except Exception:
437 437 # Clean up temp packs in all exception cases
438 438 self._cleantemppacks()
439 439 raise
440 440
441 441 self._closed = True
442 442 result = self.opener.join(sha)
443 443 if ledger:
444 444 ledger.addcreated(result)
445 445 return result
446 446
447 447 def _cleantemppacks(self):
448 448 try:
449 449 self.opener.unlink(self.packpath)
450 450 except Exception:
451 451 pass
452 452 try:
453 453 self.opener.unlink(self.idxpath)
454 454 except Exception:
455 455 pass
456 456
457 457 def writeindex(self):
458 458 rawindex = ''
459 459
460 460 largefanout = len(self.entries) > SMALLFANOUTCUTOFF
461 461 if largefanout:
462 462 params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
463 463 else:
464 464 params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
465 465
466 466 fanouttable = [EMPTYFANOUT] * params.fanoutcount
467 467
468 468 # Precompute the location of each entry
469 469 locations = {}
470 470 count = 0
471 471 for node in sorted(self.entries.iterkeys()):
472 472 location = count * self.INDEXENTRYLENGTH
473 473 locations[node] = location
474 474 count += 1
475 475
476 476 # Must use [0] on the unpack result since it's always a tuple.
477 477 fanoutkey = struct.unpack(params.fanoutstruct,
478 478 node[:params.fanoutprefix])[0]
479 479 if fanouttable[fanoutkey] == EMPTYFANOUT:
480 480 fanouttable[fanoutkey] = location
481 481
482 482 rawfanouttable = ''
483 483 last = 0
484 484 for offset in fanouttable:
485 485 offset = offset if offset != EMPTYFANOUT else last
486 486 last = offset
487 487 rawfanouttable += struct.pack('!I', offset)
488 488
489 489 rawentrieslength = struct.pack('!Q', len(self.entries))
490 490
491 491 # The index offset is its location in the file, i.e. after the 2-byte
492 492 # header and the fanout table.
493 493 rawindex = self.createindex(locations, 2 + len(rawfanouttable))
494 494
495 495 self._writeheader(params)
496 496 self.idxfp.write(rawfanouttable)
497 497 self.idxfp.write(rawentrieslength)
498 498 self.idxfp.write(rawindex)
499 499 self.idxfp.close()
500 500
501 501 def createindex(self, nodelocations, indexoffset):
502 502 raise NotImplementedError()
503 503
504 504 def _writeheader(self, indexparams):
505 505 # Index header
506 506 # <version: 1 byte>
507 507 # <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
508 508 # <unused: 7 bit> # future use (compression, delta format, etc)
509 509 config = 0
510 510 if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
511 511 config = 0b10000000
512 512 self.idxfp.write(struct.pack('!BB', self.VERSION, config))
513 513
514 514 class indexparams(object):
515 515 __slots__ = (r'fanoutprefix', r'fanoutstruct', r'fanoutcount',
516 516 r'fanoutsize', r'indexstart')
517 517
518 518 def __init__(self, prefixsize, version):
519 519 self.fanoutprefix = prefixsize
520 520
521 521 # The struct pack format for fanout table location (i.e. the format that
522 522 # converts the node prefix into an integer location in the fanout
523 523 # table).
524 524 if prefixsize == SMALLFANOUTPREFIX:
525 525 self.fanoutstruct = '!B'
526 526 elif prefixsize == LARGEFANOUTPREFIX:
527 527 self.fanoutstruct = '!H'
528 528 else:
529 529 raise ValueError("invalid fanout prefix size: %s" % prefixsize)
530 530
531 531 # The number of fanout table entries
532 532 self.fanoutcount = 2**(prefixsize * 8)
533 533
534 534 # The total bytes used by the fanout table
535 535 self.fanoutsize = self.fanoutcount * 4
536 536
537 537 self.indexstart = FANOUTSTART + self.fanoutsize
538 538 # Skip the index length
539 539 self.indexstart += 8
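To make the index layout concrete, here is a small self-contained sketch (illustrative only, not Mercurial code) of the fanout mechanics described in the comments above: the first fanoutprefix bytes of a node hash are unpacked into an integer slot, each slot holds a 4-byte offset, and bit 0b10000000 of the header's config byte selects the large fanout.

    import struct

    SMALLFANOUTPREFIX, LARGEFANOUTPREFIX = 1, 2

    def fanoutslot(node, prefixsize):
        # Mirror indexparams.fanoutstruct: '!B' unpacks a 1-byte prefix
        # (2^8 slots), '!H' a 2-byte prefix (2^16 slots).
        fmt = '!B' if prefixsize == SMALLFANOUTPREFIX else '!H'
        # struct.unpack always returns a tuple, hence the [0].
        return struct.unpack(fmt, node[:prefixsize])[0]

    # A made-up 20-byte node whose hash starts with bytes 0xab 0xcd:
    node = b'\xab\xcd' + b'\x00' * 18
    assert fanoutslot(node, SMALLFANOUTPREFIX) == 0xab    # slot 171 of 2^8
    assert fanoutslot(node, LARGEFANOUTPREFIX) == 0xabcd  # slot 43981 of 2^16

    # Each slot is a 4-byte '!I' offset, so the table occupies
    # fanoutcount * 4 bytes, matching indexparams.fanoutsize:
    for prefixsize in (SMALLFANOUTPREFIX, LARGEFANOUTPREFIX):
        fanoutcount = 2 ** (prefixsize * 8)
        print('prefix %d: %d slots, %d bytes' % (prefixsize, fanoutcount,
                                                 fanoutcount * 4))

    # The two-byte index header packs the version and the large-fanout
    # flag, as in _writeheader:
    header = struct.pack('!BB', 2, 0b10000000)
    assert struct.unpack('!BB', header) == (2, 128)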