remotefilelog: tell runbgcommand to not block on child process startup...
Augie Fackler
r42697:df1419c5 default
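
For context, the change below passes the new ensurestart=False flag to procutil.runbgcommand so the caller no longer waits for the background child to finish starting. A minimal POSIX sketch of that trade-off follows (runbgcommand_sketch is a hypothetical helper, not the actual procutil implementation; the fork/Popen details are illustrative assumptions):

from __future__ import absolute_import

import os
import subprocess
import sys

def runbgcommand_sketch(cmd, env, ensurestart=True):
    # Fork, and let the child spawn the real command detached from our stdio.
    pid = os.fork()
    if pid == 0:
        returncode = 255
        try:
            subprocess.Popen(cmd, env=env, close_fds=True,
                             stdin=open(os.devnull, 'r'),
                             stdout=open(os.devnull, 'w'),
                             stderr=open(os.devnull, 'w'))
            returncode = 0
        finally:
            # The child exits once the command has been spawned (or failed).
            os._exit(returncode)
    if ensurestart:
        # Blocking here guarantees the command really started, at the cost of
        # putting a full fork+exec on the caller's critical path -- the wait
        # this commit skips because the hg binary is known to exist.
        os.waitpid(pid, 0)

if __name__ == '__main__':
    runbgcommand_sketch([sys.executable, '-c', 'pass'], dict(os.environ),
                        ensurestart=False)
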
@@ -1,777 +1,778 b''
1 1 from __future__ import absolute_import
2 2
3 3 import os
4 4 import time
5 5
6 6 from mercurial.i18n import _
7 7 from mercurial.node import (
8 8 nullid,
9 9 short,
10 10 )
11 11 from mercurial import (
12 12 encoding,
13 13 error,
14 14 mdiff,
15 15 policy,
16 16 pycompat,
17 17 scmutil,
18 18 util,
19 19 vfs,
20 20 )
21 21 from mercurial.utils import procutil
22 22 from . import (
23 23 constants,
24 24 contentstore,
25 25 datapack,
26 26 extutil,
27 27 historypack,
28 28 metadatastore,
29 29 shallowutil,
30 30 )
31 31
32 32 osutil = policy.importmod(r'osutil')
33 33
34 34 class RepackAlreadyRunning(error.Abort):
35 35 pass
36 36
37 37 def backgroundrepack(repo, incremental=True, packsonly=False):
38 38 cmd = [procutil.hgexecutable(), '-R', repo.origroot, 'repack']
39 39 msg = _("(running background repack)\n")
40 40 if incremental:
41 41 cmd.append('--incremental')
42 42 msg = _("(running background incremental repack)\n")
43 43 if packsonly:
44 44 cmd.append('--packsonly')
45 45 repo.ui.warn(msg)
46 procutil.runbgcommand(cmd, encoding.environ)
46 # We know this command will find a binary, so don't block on it starting.
47 procutil.runbgcommand(cmd, encoding.environ, ensurestart=False)
47 48
48 49 def fullrepack(repo, options=None):
49 50 """If ``packsonly`` is True, stores creating only loose objects are skipped.
50 51 """
51 52 if util.safehasattr(repo, 'shareddatastores'):
52 53 datasource = contentstore.unioncontentstore(
53 54 *repo.shareddatastores)
54 55 historysource = metadatastore.unionmetadatastore(
55 56 *repo.sharedhistorystores,
56 57 allowincomplete=True)
57 58
58 59 packpath = shallowutil.getcachepackpath(
59 60 repo,
60 61 constants.FILEPACK_CATEGORY)
61 62 _runrepack(repo, datasource, historysource, packpath,
62 63 constants.FILEPACK_CATEGORY, options=options)
63 64
64 65 if util.safehasattr(repo.manifestlog, 'datastore'):
65 66 localdata, shareddata = _getmanifeststores(repo)
66 67 lpackpath, ldstores, lhstores = localdata
67 68 spackpath, sdstores, shstores = shareddata
68 69
69 70 # Repack the shared manifest store
70 71 datasource = contentstore.unioncontentstore(*sdstores)
71 72 historysource = metadatastore.unionmetadatastore(
72 73 *shstores,
73 74 allowincomplete=True)
74 75 _runrepack(repo, datasource, historysource, spackpath,
75 76 constants.TREEPACK_CATEGORY, options=options)
76 77
77 78 # Repack the local manifest store
78 79 datasource = contentstore.unioncontentstore(
79 80 *ldstores,
80 81 allowincomplete=True)
81 82 historysource = metadatastore.unionmetadatastore(
82 83 *lhstores,
83 84 allowincomplete=True)
84 85 _runrepack(repo, datasource, historysource, lpackpath,
85 86 constants.TREEPACK_CATEGORY, options=options)
86 87
87 88 def incrementalrepack(repo, options=None):
88 89 """This repacks the repo by looking at the distribution of pack files in the
89 90 repo and performing the most minimal repack to keep the repo in good shape.
90 91 """
91 92 if util.safehasattr(repo, 'shareddatastores'):
92 93 packpath = shallowutil.getcachepackpath(
93 94 repo,
94 95 constants.FILEPACK_CATEGORY)
95 96 _incrementalrepack(repo,
96 97 repo.shareddatastores,
97 98 repo.sharedhistorystores,
98 99 packpath,
99 100 constants.FILEPACK_CATEGORY,
100 101 options=options)
101 102
102 103 if util.safehasattr(repo.manifestlog, 'datastore'):
103 104 localdata, shareddata = _getmanifeststores(repo)
104 105 lpackpath, ldstores, lhstores = localdata
105 106 spackpath, sdstores, shstores = shareddata
106 107
107 108 # Repack the shared manifest store
108 109 _incrementalrepack(repo,
109 110 sdstores,
110 111 shstores,
111 112 spackpath,
112 113 constants.TREEPACK_CATEGORY,
113 114 options=options)
114 115
115 116 # Repack the local manifest store
116 117 _incrementalrepack(repo,
117 118 ldstores,
118 119 lhstores,
119 120 lpackpath,
120 121 constants.TREEPACK_CATEGORY,
121 122 allowincompletedata=True,
122 123 options=options)
123 124
124 125 def _getmanifeststores(repo):
125 126 shareddatastores = repo.manifestlog.shareddatastores
126 127 localdatastores = repo.manifestlog.localdatastores
127 128 sharedhistorystores = repo.manifestlog.sharedhistorystores
128 129 localhistorystores = repo.manifestlog.localhistorystores
129 130
130 131 sharedpackpath = shallowutil.getcachepackpath(repo,
131 132 constants.TREEPACK_CATEGORY)
132 133 localpackpath = shallowutil.getlocalpackpath(repo.svfs.vfs.base,
133 134 constants.TREEPACK_CATEGORY)
134 135
135 136 return ((localpackpath, localdatastores, localhistorystores),
136 137 (sharedpackpath, shareddatastores, sharedhistorystores))
137 138
138 139 def _topacks(packpath, files, constructor):
139 140 paths = list(os.path.join(packpath, p) for p in files)
140 141 packs = list(constructor(p) for p in paths)
141 142 return packs
142 143
143 144 def _deletebigpacks(repo, folder, files):
144 145 """Deletes packfiles that are bigger than ``packs.maxpacksize``.
145 146
146 147 Returns ``files`` with the removed files omitted."""
147 148 maxsize = repo.ui.configbytes("packs", "maxpacksize")
148 149 if maxsize <= 0:
149 150 return files
150 151
151 152 # This only considers datapacks today, but we could broaden it to include
152 153 # historypacks.
153 154 VALIDEXTS = [".datapack", ".dataidx"]
154 155
155 156 # Either an oversize index or datapack will trigger cleanup of the whole
156 157 # pack:
157 158 oversized = {os.path.splitext(path)[0] for path, ftype, stat in files
158 159 if (stat.st_size > maxsize and (os.path.splitext(path)[1]
159 160 in VALIDEXTS))}
160 161
161 162 for rootfname in oversized:
162 163 rootpath = os.path.join(folder, rootfname)
163 164 for ext in VALIDEXTS:
164 165 path = rootpath + ext
165 166 repo.ui.debug('removing oversize packfile %s (%s)\n' %
166 167 (path, util.bytecount(os.stat(path).st_size)))
167 168 os.unlink(path)
168 169 return [row for row in files if os.path.basename(row[0]) not in oversized]
169 170
170 171 def _incrementalrepack(repo, datastore, historystore, packpath, category,
171 172 allowincompletedata=False, options=None):
172 173 shallowutil.mkstickygroupdir(repo.ui, packpath)
173 174
174 175 files = osutil.listdir(packpath, stat=True)
175 176 files = _deletebigpacks(repo, packpath, files)
176 177 datapacks = _topacks(packpath,
177 178 _computeincrementaldatapack(repo.ui, files),
178 179 datapack.datapack)
179 180 datapacks.extend(s for s in datastore
180 181 if not isinstance(s, datapack.datapackstore))
181 182
182 183 historypacks = _topacks(packpath,
183 184 _computeincrementalhistorypack(repo.ui, files),
184 185 historypack.historypack)
185 186 historypacks.extend(s for s in historystore
186 187 if not isinstance(s, historypack.historypackstore))
187 188
188 189 # ``allhistory{files,packs}`` contains all known history packs, even ones we
189 190 # don't plan to repack. They are used during the datapack repack to ensure
190 191 # good ordering of nodes.
191 192 allhistoryfiles = _allpackfileswithsuffix(files, historypack.PACKSUFFIX,
192 193 historypack.INDEXSUFFIX)
193 194 allhistorypacks = _topacks(packpath,
194 195 (f for f, mode, stat in allhistoryfiles),
195 196 historypack.historypack)
196 197 allhistorypacks.extend(s for s in historystore
197 198 if not isinstance(s, historypack.historypackstore))
198 199 _runrepack(repo,
199 200 contentstore.unioncontentstore(
200 201 *datapacks,
201 202 allowincomplete=allowincompletedata),
202 203 metadatastore.unionmetadatastore(
203 204 *historypacks,
204 205 allowincomplete=True),
205 206 packpath, category,
206 207 fullhistory=metadatastore.unionmetadatastore(
207 208 *allhistorypacks,
208 209 allowincomplete=True),
209 210 options=options)
210 211
211 212 def _computeincrementaldatapack(ui, files):
212 213 opts = {
213 214 'gencountlimit' : ui.configint(
214 215 'remotefilelog', 'data.gencountlimit'),
215 216 'generations' : ui.configlist(
216 217 'remotefilelog', 'data.generations'),
217 218 'maxrepackpacks' : ui.configint(
218 219 'remotefilelog', 'data.maxrepackpacks'),
219 220 'repackmaxpacksize' : ui.configbytes(
220 221 'remotefilelog', 'data.repackmaxpacksize'),
221 222 'repacksizelimit' : ui.configbytes(
222 223 'remotefilelog', 'data.repacksizelimit'),
223 224 }
224 225
225 226 packfiles = _allpackfileswithsuffix(
226 227 files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX)
227 228 return _computeincrementalpack(packfiles, opts)
228 229
229 230 def _computeincrementalhistorypack(ui, files):
230 231 opts = {
231 232 'gencountlimit' : ui.configint(
232 233 'remotefilelog', 'history.gencountlimit'),
233 234 'generations' : ui.configlist(
234 235 'remotefilelog', 'history.generations', ['100MB']),
235 236 'maxrepackpacks' : ui.configint(
236 237 'remotefilelog', 'history.maxrepackpacks'),
237 238 'repackmaxpacksize' : ui.configbytes(
238 239 'remotefilelog', 'history.repackmaxpacksize', '400MB'),
239 240 'repacksizelimit' : ui.configbytes(
240 241 'remotefilelog', 'history.repacksizelimit'),
241 242 }
242 243
243 244 packfiles = _allpackfileswithsuffix(
244 245 files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX)
245 246 return _computeincrementalpack(packfiles, opts)
246 247
247 248 def _allpackfileswithsuffix(files, packsuffix, indexsuffix):
248 249 result = []
249 250 fileset = set(fn for fn, mode, stat in files)
250 251 for filename, mode, stat in files:
251 252 if not filename.endswith(packsuffix):
252 253 continue
253 254
254 255 prefix = filename[:-len(packsuffix)]
255 256
256 257 # Don't process a pack if it doesn't have an index.
257 258 if (prefix + indexsuffix) not in fileset:
258 259 continue
259 260 result.append((prefix, mode, stat))
260 261
261 262 return result
262 263
263 264 def _computeincrementalpack(files, opts):
264 265 """Given a set of pack files along with the configuration options, this
265 266 function computes the list of files that should be packed as part of an
266 267 incremental repack.
267 268
268 269 It tries to strike a balance between keeping incremental repacks cheap (i.e.
269 270 packing small things when possible, and rolling the packs up to the big ones
270 271 over time).
271 272 """
272 273
273 274 limits = list(sorted((util.sizetoint(s) for s in opts['generations']),
274 275 reverse=True))
275 276 limits.append(0)
276 277
277 278 # Group the packs by generation (i.e. by size)
278 279 generations = []
279 280 for i in pycompat.xrange(len(limits)):
280 281 generations.append([])
281 282
282 283 sizes = {}
283 284 for prefix, mode, stat in files:
284 285 size = stat.st_size
285 286 if size > opts['repackmaxpacksize']:
286 287 continue
287 288
288 289 sizes[prefix] = size
289 290 for i, limit in enumerate(limits):
290 291 if size > limit:
291 292 generations[i].append(prefix)
292 293 break
293 294
294 295 # Steps for picking what packs to repack:
295 296 # 1. Pick the largest generation with > gencountlimit pack files.
296 297 # 2. Take the smallest three packs.
297 298 # 3. While total-size-of-packs < repacksizelimit: add another pack
298 299
299 300 # Find the largest generation with more than gencountlimit packs
300 301 genpacks = []
301 302 for i, limit in enumerate(limits):
302 303 if len(generations[i]) > opts['gencountlimit']:
303 304 # Sort to be smallest last, for easy popping later
304 305 genpacks.extend(sorted(generations[i], reverse=True,
305 306 key=lambda x: sizes[x]))
306 307 break
307 308
308 309 # Take as many packs from the generation as we can
309 310 chosenpacks = genpacks[-3:]
310 311 genpacks = genpacks[:-3]
311 312 repacksize = sum(sizes[n] for n in chosenpacks)
312 313 while (repacksize < opts['repacksizelimit'] and genpacks and
313 314 len(chosenpacks) < opts['maxrepackpacks']):
314 315 chosenpacks.append(genpacks.pop())
315 316 repacksize += sizes[chosenpacks[-1]]
316 317
317 318 return chosenpacks
318 319
319 320 def _runrepack(repo, data, history, packpath, category, fullhistory=None,
320 321 options=None):
321 322 shallowutil.mkstickygroupdir(repo.ui, packpath)
322 323
323 324 def isold(repo, filename, node):
324 325 """Check if the file node is older than a limit.
325 326 Unless a limit is specified in the config the default limit is taken.
326 327 """
327 328 filectx = repo.filectx(filename, fileid=node)
328 329 filetime = repo[filectx.linkrev()].date()
329 330
330 331 ttl = repo.ui.configint('remotefilelog', 'nodettl')
331 332
332 333 limit = time.time() - ttl
333 334 return filetime[0] < limit
334 335
335 336 garbagecollect = repo.ui.configbool('remotefilelog', 'gcrepack')
336 337 if not fullhistory:
337 338 fullhistory = history
338 339 packer = repacker(repo, data, history, fullhistory, category,
339 340 gc=garbagecollect, isold=isold, options=options)
340 341
341 342 with datapack.mutabledatapack(repo.ui, packpath) as dpack:
342 343 with historypack.mutablehistorypack(repo.ui, packpath) as hpack:
343 344 try:
344 345 packer.run(dpack, hpack)
345 346 except error.LockHeld:
346 347 raise RepackAlreadyRunning(_("skipping repack - another repack "
347 348 "is already running"))
348 349
349 350 def keepset(repo, keyfn, lastkeepkeys=None):
350 351 """Computes a keepset which is not garbage collected.
351 352 'keyfn' is a function that maps filename, node to a unique key.
352 353 'lastkeepkeys' is an optional argument and if provided the keepset
353 354 function updates lastkeepkeys with more keys and returns the result.
354 355 """
355 356 if not lastkeepkeys:
356 357 keepkeys = set()
357 358 else:
358 359 keepkeys = lastkeepkeys
359 360
360 361 # We want to keep:
361 362 # 1. Working copy parent
362 363 # 2. Draft commits
363 364 # 3. Parents of draft commits
364 365 # 4. Pullprefetch and bgprefetchrevs revsets if specified
365 366 revs = ['.', 'draft()', 'parents(draft())']
366 367 prefetchrevs = repo.ui.config('remotefilelog', 'pullprefetch', None)
367 368 if prefetchrevs:
368 369 revs.append('(%s)' % prefetchrevs)
369 370 prefetchrevs = repo.ui.config('remotefilelog', 'bgprefetchrevs', None)
370 371 if prefetchrevs:
371 372 revs.append('(%s)' % prefetchrevs)
372 373 revs = '+'.join(revs)
373 374
374 375 revs = ['sort((%s), "topo")' % revs]
375 376 keep = scmutil.revrange(repo, revs)
376 377
377 378 processed = set()
378 379 lastmanifest = None
379 380
380 381 # process the commits in toposorted order starting from the oldest
381 382 for r in reversed(keep._list):
382 383 if repo[r].p1().rev() in processed:
383 384 # if the direct parent has already been processed
384 385 # then we only need to process the delta
385 386 m = repo[r].manifestctx().readdelta()
386 387 else:
387 388 # otherwise take the manifest and diff it
388 389 # with the previous manifest if one exists
389 390 if lastmanifest:
390 391 m = repo[r].manifest().diff(lastmanifest)
391 392 else:
392 393 m = repo[r].manifest()
393 394 lastmanifest = repo[r].manifest()
394 395 processed.add(r)
395 396
396 397 # populate keepkeys with keys from the current manifest
397 398 if type(m) is dict:
398 399 # m is a result of diff of two manifests and is a dictionary that
399 400 # maps filename to ((newnode, newflag), (oldnode, oldflag)) tuple
400 401 for filename, diff in m.iteritems():
401 402 if diff[0][0] is not None:
402 403 keepkeys.add(keyfn(filename, diff[0][0]))
403 404 else:
404 405 # m is a manifest object
405 406 for filename, filenode in m.iteritems():
406 407 keepkeys.add(keyfn(filename, filenode))
407 408
408 409 return keepkeys
409 410
410 411 class repacker(object):
411 412 """Class for orchestrating the repack of data and history information into a
412 413 new format.
413 414 """
414 415 def __init__(self, repo, data, history, fullhistory, category, gc=False,
415 416 isold=None, options=None):
416 417 self.repo = repo
417 418 self.data = data
418 419 self.history = history
419 420 self.fullhistory = fullhistory
420 421 self.unit = constants.getunits(category)
421 422 self.garbagecollect = gc
422 423 self.options = options
423 424 if self.garbagecollect:
424 425 if not isold:
425 426 raise ValueError("Function 'isold' is not properly specified")
426 427 # use (filename, node) tuple as a keepset key
427 428 self.keepkeys = keepset(repo, lambda f, n : (f, n))
428 429 self.isold = isold
429 430
430 431 def run(self, targetdata, targethistory):
431 432 ledger = repackledger()
432 433
433 434 with extutil.flock(repacklockvfs(self.repo).join("repacklock"),
434 435 _('repacking %s') % self.repo.origroot, timeout=0):
435 436 self.repo.hook('prerepack')
436 437
437 438 # Populate ledger from source
438 439 self.data.markledger(ledger, options=self.options)
439 440 self.history.markledger(ledger, options=self.options)
440 441
441 442 # Run repack
442 443 self.repackdata(ledger, targetdata)
443 444 self.repackhistory(ledger, targethistory)
444 445
445 446 # Call cleanup on each source
446 447 for source in ledger.sources:
447 448 source.cleanup(ledger)
448 449
449 450 def _chainorphans(self, ui, filename, nodes, orphans, deltabases):
450 451 """Reorders ``orphans`` into a single chain inside ``nodes`` and
451 452 ``deltabases``.
452 453
453 454 We often have orphan entries (nodes without a base that aren't
454 455 referenced by other nodes -- i.e., part of a chain) due to gaps in
455 456 history. Rather than store them as individual fulltexts, we prefer to
456 457 insert them as one chain sorted by size.
457 458 """
458 459 if not orphans:
459 460 return nodes
460 461
461 462 def getsize(node, default=0):
462 463 meta = self.data.getmeta(filename, node)
463 464 if constants.METAKEYSIZE in meta:
464 465 return meta[constants.METAKEYSIZE]
465 466 else:
466 467 return default
467 468
468 469 # Sort orphans by size; biggest first is preferred, since it's more
469 470 # likely to be the newest version assuming files grow over time.
470 471 # (Sort by node first to ensure the sort is stable.)
471 472 orphans = sorted(orphans)
472 473 orphans = list(sorted(orphans, key=getsize, reverse=True))
473 474 if ui.debugflag:
474 475 ui.debug("%s: orphan chain: %s\n" % (filename,
475 476 ", ".join([short(s) for s in orphans])))
476 477
477 478 # Create one contiguous chain and reassign deltabases.
478 479 for i, node in enumerate(orphans):
479 480 if i == 0:
480 481 deltabases[node] = (nullid, 0)
481 482 else:
482 483 parent = orphans[i - 1]
483 484 deltabases[node] = (parent, deltabases[parent][1] + 1)
484 485 nodes = [n for n in nodes if n not in orphans]
485 486 nodes += orphans
486 487 return nodes
487 488
488 489 def repackdata(self, ledger, target):
489 490 ui = self.repo.ui
490 491 maxchainlen = ui.configint('packs', 'maxchainlen', 1000)
491 492
492 493 byfile = {}
493 494 for entry in ledger.entries.itervalues():
494 495 if entry.datasource:
495 496 byfile.setdefault(entry.filename, {})[entry.node] = entry
496 497
497 498 count = 0
498 499 repackprogress = ui.makeprogress(_("repacking data"), unit=self.unit,
499 500 total=len(byfile))
500 501 for filename, entries in sorted(byfile.iteritems()):
501 502 repackprogress.update(count)
502 503
503 504 ancestors = {}
504 505 nodes = list(node for node in entries)
505 506 nohistory = []
506 507 buildprogress = ui.makeprogress(_("building history"), unit='nodes',
507 508 total=len(nodes))
508 509 for i, node in enumerate(nodes):
509 510 if node in ancestors:
510 511 continue
511 512 buildprogress.update(i)
512 513 try:
513 514 ancestors.update(self.fullhistory.getancestors(filename,
514 515 node, known=ancestors))
515 516 except KeyError:
516 517 # Since we're packing data entries, we may not have the
517 518 # corresponding history entries for them. It's not a big
518 519 # deal, but the entries won't be delta'd perfectly.
519 520 nohistory.append(node)
520 521 buildprogress.complete()
521 522
522 523 # Order the nodes children first, so we can produce reverse deltas
523 524 orderednodes = list(reversed(self._toposort(ancestors)))
524 525 if len(nohistory) > 0:
525 526 ui.debug('repackdata: %d nodes without history\n' %
526 527 len(nohistory))
527 528 orderednodes.extend(sorted(nohistory))
528 529
529 530 # Filter orderednodes to just the nodes we want to serialize (it
530 531 # currently also has the edge nodes' ancestors).
531 532 orderednodes = list(filter(lambda node: node in nodes,
532 533 orderednodes))
533 534
534 535 # Garbage collect old nodes:
535 536 if self.garbagecollect:
536 537 neworderednodes = []
537 538 for node in orderednodes:
538 539 # If the node is old and is not in the keepset, we skip it,
539 540 # and mark as garbage collected
540 541 if ((filename, node) not in self.keepkeys and
541 542 self.isold(self.repo, filename, node)):
542 543 entries[node].gced = True
543 544 continue
544 545 neworderednodes.append(node)
545 546 orderednodes = neworderednodes
546 547
547 548 # Compute delta bases for nodes:
548 549 deltabases = {}
549 550 nobase = set()
550 551 referenced = set()
551 552 nodes = set(nodes)
552 553 processprogress = ui.makeprogress(_("processing nodes"),
553 554 unit='nodes',
554 555 total=len(orderednodes))
555 556 for i, node in enumerate(orderednodes):
556 557 processprogress.update(i)
557 558 # Find delta base
558 559 # TODO: allow delta'ing against most recent descendant instead
559 560 # of immediate child
560 561 deltatuple = deltabases.get(node, None)
561 562 if deltatuple is None:
562 563 deltabase, chainlen = nullid, 0
563 564 deltabases[node] = (nullid, 0)
564 565 nobase.add(node)
565 566 else:
566 567 deltabase, chainlen = deltatuple
567 568 referenced.add(deltabase)
568 569
569 570 # Use available ancestor information to inform our delta choices
570 571 ancestorinfo = ancestors.get(node)
571 572 if ancestorinfo:
572 573 p1, p2, linknode, copyfrom = ancestorinfo
573 574
574 575 # The presence of copyfrom means we're at a point where the
575 576 # file was copied from elsewhere. So don't attempt to do any
576 577 # deltas with the other file.
577 578 if copyfrom:
578 579 p1 = nullid
579 580
580 581 if chainlen < maxchainlen:
581 582 # Record this child as the delta base for its parents.
582 583 # This may be non-optimal, since the parents may have
583 584 # many children, and this will only choose the last one.
584 585 # TODO: record all children and try all deltas to find
585 586 # best
586 587 if p1 != nullid:
587 588 deltabases[p1] = (node, chainlen + 1)
588 589 if p2 != nullid:
589 590 deltabases[p2] = (node, chainlen + 1)
590 591
591 592 # experimental config: repack.chainorphansbysize
592 593 if ui.configbool('repack', 'chainorphansbysize'):
593 594 orphans = nobase - referenced
594 595 orderednodes = self._chainorphans(ui, filename, orderednodes,
595 596 orphans, deltabases)
596 597
597 598 # Compute deltas and write to the pack
598 599 for i, node in enumerate(orderednodes):
599 600 deltabase, chainlen = deltabases[node]
600 601 # Compute delta
601 602 # TODO: Optimize the deltachain fetching. Since we're
602 603 # iterating over the different version of the file, we may
603 604 # be fetching the same deltachain over and over again.
604 605 if deltabase != nullid:
605 606 deltaentry = self.data.getdelta(filename, node)
606 607 delta, deltabasename, origdeltabase, meta = deltaentry
607 608 size = meta.get(constants.METAKEYSIZE)
608 609 if (deltabasename != filename or origdeltabase != deltabase
609 610 or size is None):
610 611 deltabasetext = self.data.get(filename, deltabase)
611 612 original = self.data.get(filename, node)
612 613 size = len(original)
613 614 delta = mdiff.textdiff(deltabasetext, original)
614 615 else:
615 616 delta = self.data.get(filename, node)
616 617 size = len(delta)
617 618 meta = self.data.getmeta(filename, node)
618 619
619 620 # TODO: don't use the delta if it's larger than the fulltext
620 621 if constants.METAKEYSIZE not in meta:
621 622 meta[constants.METAKEYSIZE] = size
622 623 target.add(filename, node, deltabase, delta, meta)
623 624
624 625 entries[node].datarepacked = True
625 626
626 627 processprogress.complete()
627 628 count += 1
628 629
629 630 repackprogress.complete()
630 631 target.close(ledger=ledger)
631 632
632 633 def repackhistory(self, ledger, target):
633 634 ui = self.repo.ui
634 635
635 636 byfile = {}
636 637 for entry in ledger.entries.itervalues():
637 638 if entry.historysource:
638 639 byfile.setdefault(entry.filename, {})[entry.node] = entry
639 640
640 641 progress = ui.makeprogress(_("repacking history"), unit=self.unit,
641 642 total=len(byfile))
642 643 for filename, entries in sorted(byfile.iteritems()):
643 644 ancestors = {}
644 645 nodes = list(node for node in entries)
645 646
646 647 for node in nodes:
647 648 if node in ancestors:
648 649 continue
649 650 ancestors.update(self.history.getancestors(filename, node,
650 651 known=ancestors))
651 652
652 653 # Order the nodes children first
653 654 orderednodes = reversed(self._toposort(ancestors))
654 655
655 656 # Write to the pack
656 657 dontprocess = set()
657 658 for node in orderednodes:
658 659 p1, p2, linknode, copyfrom = ancestors[node]
659 660
660 661 # If the node is marked dontprocess, but it's also in the
661 662 # explicit entries set, that means the node exists both in this
662 663 # file and in another file that was copied to this file.
663 664 # Usually this happens if the file was copied to another file,
664 665 # then the copy was deleted, then reintroduced without copy
665 666 # metadata. The original add and the new add have the same hash
666 667 # since the content is identical and the parents are null.
667 668 if node in dontprocess and node not in entries:
668 669 # If copyfrom == filename, it means the copy history
669 670 # went to some other file, then came back to this one, so we
670 671 # should continue processing it.
671 672 if p1 != nullid and copyfrom != filename:
672 673 dontprocess.add(p1)
673 674 if p2 != nullid:
674 675 dontprocess.add(p2)
675 676 continue
676 677
677 678 if copyfrom:
678 679 dontprocess.add(p1)
679 680
680 681 target.add(filename, node, p1, p2, linknode, copyfrom)
681 682
682 683 if node in entries:
683 684 entries[node].historyrepacked = True
684 685
685 686 progress.increment()
686 687
687 688 progress.complete()
688 689 target.close(ledger=ledger)
689 690
690 691 def _toposort(self, ancestors):
691 692 def parentfunc(node):
692 693 p1, p2, linknode, copyfrom = ancestors[node]
693 694 parents = []
694 695 if p1 != nullid:
695 696 parents.append(p1)
696 697 if p2 != nullid:
697 698 parents.append(p2)
698 699 return parents
699 700
700 701 sortednodes = shallowutil.sortnodes(ancestors.keys(), parentfunc)
701 702 return sortednodes
702 703
703 704 class repackledger(object):
704 705 """Storage for all the bookkeeping that happens during a repack. It contains
705 706 the list of revisions being repacked, what happened to each revision, and
706 707 which source store contained which revision originally (for later cleanup).
707 708 """
708 709 def __init__(self):
709 710 self.entries = {}
710 711 self.sources = {}
711 712 self.created = set()
712 713
713 714 def markdataentry(self, source, filename, node):
714 715 """Mark the given filename+node revision as having a data rev in the
715 716 given source.
716 717 """
717 718 entry = self._getorcreateentry(filename, node)
718 719 entry.datasource = True
719 720 entries = self.sources.get(source)
720 721 if not entries:
721 722 entries = set()
722 723 self.sources[source] = entries
723 724 entries.add(entry)
724 725
725 726 def markhistoryentry(self, source, filename, node):
726 727 """Mark the given filename+node revision as having a history rev in the
727 728 given source.
728 729 """
729 730 entry = self._getorcreateentry(filename, node)
730 731 entry.historysource = True
731 732 entries = self.sources.get(source)
732 733 if not entries:
733 734 entries = set()
734 735 self.sources[source] = entries
735 736 entries.add(entry)
736 737
737 738 def _getorcreateentry(self, filename, node):
738 739 key = (filename, node)
739 740 value = self.entries.get(key)
740 741 if not value:
741 742 value = repackentry(filename, node)
742 743 self.entries[key] = value
743 744
744 745 return value
745 746
746 747 def addcreated(self, value):
747 748 self.created.add(value)
748 749
749 750 class repackentry(object):
750 751 """Simple class representing a single revision entry in the repackledger.
751 752 """
752 753 __slots__ = (r'filename', r'node', r'datasource', r'historysource',
753 754 r'datarepacked', r'historyrepacked', r'gced')
754 755 def __init__(self, filename, node):
755 756 self.filename = filename
756 757 self.node = node
757 758 # If the revision has a data entry in the source
758 759 self.datasource = False
759 760 # If the revision has a history entry in the source
760 761 self.historysource = False
761 762 # If the revision's data entry was repacked into the repack target
762 763 self.datarepacked = False
763 764 # If the revision's history entry was repacked into the repack target
764 765 self.historyrepacked = False
765 766 # If garbage collected
766 767 self.gced = False
767 768
768 769 def repacklockvfs(repo):
769 770 if util.safehasattr(repo, 'name'):
770 771 # Lock in the shared cache so repacks across multiple copies of the same
771 772 # repo are coordinated.
772 773 sharedcachepath = shallowutil.getcachepackpath(
773 774 repo,
774 775 constants.FILEPACK_CATEGORY)
775 776 return vfs.vfs(sharedcachepath)
776 777 else:
777 778 return repo.svfs
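
Before the second file, it may help to restate the selection heuristic that _computeincrementalpack implements above: bucket packs into size generations, pick the largest generation with more than gencountlimit members, seed the choice with its three smallest packs, then keep adding packs while staying under the size and count limits. The standalone sketch below illustrates that logic (choose_packs and all of the numbers in the example are made up for illustration; they are not Mercurial defaults, and the repackmaxpacksize filter is omitted for brevity):

def choose_packs(sizes, limits, gencountlimit, repacksizelimit, maxrepackpacks):
    # Group packs into generations by size threshold, largest threshold first.
    limits = sorted(limits, reverse=True) + [0]
    generations = [[] for _ in limits]
    for name, size in sizes.items():
        for i, limit in enumerate(limits):
            if size > limit:
                generations[i].append(name)
                break

    # Pick the largest generation that has too many packs.
    genpacks = []
    for gen in generations:
        if len(gen) > gencountlimit:
            # Smallest last, so pop() yields the next-smallest pack.
            genpacks = sorted(gen, key=lambda n: sizes[n], reverse=True)
            break

    # Start from the three smallest packs, then grow within the limits.
    chosen = genpacks[-3:]
    genpacks = genpacks[:-3]
    repacksize = sum(sizes[n] for n in chosen)
    while (repacksize < repacksizelimit and genpacks
           and len(chosen) < maxrepackpacks):
        chosen.append(genpacks.pop())
        repacksize += sizes[chosen[-1]]
    return chosen

# Example (made-up numbers): five small packs in the smallest generation.
sizes = {'a': 10, 'b': 20, 'c': 30, 'd': 40, 'e': 50, 'big': 5000}
print(choose_packs(sizes, limits=[1000], gencountlimit=4,
                   repacksizelimit=120, maxrepackpacks=10))
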
@@ -1,298 +1,300 b''
1 1 # shallowrepo.py - shallow repository that uses remote filelogs
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import os
10 10
11 11 from mercurial.i18n import _
12 12 from mercurial.node import hex, nullid, nullrev
13 13 from mercurial import (
14 14 encoding,
15 15 error,
16 16 localrepo,
17 17 match,
18 18 scmutil,
19 19 sparse,
20 20 util,
21 21 )
22 22 from mercurial.utils import procutil
23 23 from . import (
24 24 connectionpool,
25 25 constants,
26 26 contentstore,
27 27 datapack,
28 28 fileserverclient,
29 29 historypack,
30 30 metadatastore,
31 31 remotefilectx,
32 32 remotefilelog,
33 33 shallowutil,
34 34 )
35 35
36 36 # These make*stores functions are global so that other extensions can replace
37 37 # them.
38 38 def makelocalstores(repo):
39 39 """In-repo stores, like .hg/store/data; can not be discarded."""
40 40 localpath = os.path.join(repo.svfs.vfs.base, 'data')
41 41 if not os.path.exists(localpath):
42 42 os.makedirs(localpath)
43 43
44 44 # Instantiate local data stores
45 45 localcontent = contentstore.remotefilelogcontentstore(
46 46 repo, localpath, repo.name, shared=False)
47 47 localmetadata = metadatastore.remotefilelogmetadatastore(
48 48 repo, localpath, repo.name, shared=False)
49 49 return localcontent, localmetadata
50 50
51 51 def makecachestores(repo):
52 52 """Typically machine-wide, cache of remote data; can be discarded."""
53 53 # Instantiate shared cache stores
54 54 cachepath = shallowutil.getcachepath(repo.ui)
55 55 cachecontent = contentstore.remotefilelogcontentstore(
56 56 repo, cachepath, repo.name, shared=True)
57 57 cachemetadata = metadatastore.remotefilelogmetadatastore(
58 58 repo, cachepath, repo.name, shared=True)
59 59
60 60 repo.sharedstore = cachecontent
61 61 repo.shareddatastores.append(cachecontent)
62 62 repo.sharedhistorystores.append(cachemetadata)
63 63
64 64 return cachecontent, cachemetadata
65 65
66 66 def makeremotestores(repo, cachecontent, cachemetadata):
67 67 """These stores fetch data from a remote server."""
68 68 # Instantiate remote stores
69 69 repo.fileservice = fileserverclient.fileserverclient(repo)
70 70 remotecontent = contentstore.remotecontentstore(
71 71 repo.ui, repo.fileservice, cachecontent)
72 72 remotemetadata = metadatastore.remotemetadatastore(
73 73 repo.ui, repo.fileservice, cachemetadata)
74 74 return remotecontent, remotemetadata
75 75
76 76 def makepackstores(repo):
77 77 """Packs are more efficient (to read from) cache stores."""
78 78 # Instantiate pack stores
79 79 packpath = shallowutil.getcachepackpath(repo,
80 80 constants.FILEPACK_CATEGORY)
81 81 packcontentstore = datapack.datapackstore(repo.ui, packpath)
82 82 packmetadatastore = historypack.historypackstore(repo.ui, packpath)
83 83
84 84 repo.shareddatastores.append(packcontentstore)
85 85 repo.sharedhistorystores.append(packmetadatastore)
86 86 shallowutil.reportpackmetrics(repo.ui, 'filestore', packcontentstore,
87 87 packmetadatastore)
88 88 return packcontentstore, packmetadatastore
89 89
90 90 def makeunionstores(repo):
91 91 """Union stores iterate the other stores and return the first result."""
92 92 repo.shareddatastores = []
93 93 repo.sharedhistorystores = []
94 94
95 95 packcontentstore, packmetadatastore = makepackstores(repo)
96 96 cachecontent, cachemetadata = makecachestores(repo)
97 97 localcontent, localmetadata = makelocalstores(repo)
98 98 remotecontent, remotemetadata = makeremotestores(repo, cachecontent,
99 99 cachemetadata)
100 100
101 101 # Instantiate union stores
102 102 repo.contentstore = contentstore.unioncontentstore(
103 103 packcontentstore, cachecontent,
104 104 localcontent, remotecontent, writestore=localcontent)
105 105 repo.metadatastore = metadatastore.unionmetadatastore(
106 106 packmetadatastore, cachemetadata, localmetadata, remotemetadata,
107 107 writestore=localmetadata)
108 108
109 109 fileservicedatawrite = cachecontent
110 110 fileservicehistorywrite = cachemetadata
111 111 repo.fileservice.setstore(repo.contentstore, repo.metadatastore,
112 112 fileservicedatawrite, fileservicehistorywrite)
113 113 shallowutil.reportpackmetrics(repo.ui, 'filestore',
114 114 packcontentstore, packmetadatastore)
115 115
116 116 def wraprepo(repo):
117 117 class shallowrepository(repo.__class__):
118 118 @util.propertycache
119 119 def name(self):
120 120 return self.ui.config('remotefilelog', 'reponame')
121 121
122 122 @util.propertycache
123 123 def fallbackpath(self):
124 124 path = repo.ui.config("remotefilelog", "fallbackpath",
125 125 repo.ui.config('paths', 'default'))
126 126 if not path:
127 127 raise error.Abort("no remotefilelog server "
128 128 "configured - is your .hg/hgrc trusted?")
129 129
130 130 return path
131 131
132 132 def maybesparsematch(self, *revs, **kwargs):
133 133 '''
134 134 A wrapper that allows the remotefilelog to invoke sparsematch() if
135 135 this is a sparse repository, or returns None if this is not a
136 136 sparse repository.
137 137 '''
138 138 if revs:
139 139 ret = sparse.matcher(repo, revs=revs)
140 140 else:
141 141 ret = sparse.matcher(repo)
142 142
143 143 if ret.always():
144 144 return None
145 145 return ret
146 146
147 147 def file(self, f):
148 148 if f[0] == '/':
149 149 f = f[1:]
150 150
151 151 if self.shallowmatch(f):
152 152 return remotefilelog.remotefilelog(self.svfs, f, self)
153 153 else:
154 154 return super(shallowrepository, self).file(f)
155 155
156 156 def filectx(self, path, *args, **kwargs):
157 157 if self.shallowmatch(path):
158 158 return remotefilectx.remotefilectx(self, path, *args, **kwargs)
159 159 else:
160 160 return super(shallowrepository, self).filectx(path, *args,
161 161 **kwargs)
162 162
163 163 @localrepo.unfilteredmethod
164 164 def commitctx(self, ctx, error=False):
165 165 """Add a new revision to current repository.
166 166 Revision information is passed via the context argument.
167 167 """
168 168
169 169 # some contexts already have manifest nodes, they don't need any
170 170 # prefetching (for example if we're just editing a commit message
171 171 # we can reuse the manifest)
172 172 if not ctx.manifestnode():
173 173 # prefetch files that will likely be compared
174 174 m1 = ctx.p1().manifest()
175 175 files = []
176 176 for f in ctx.modified() + ctx.added():
177 177 fparent1 = m1.get(f, nullid)
178 178 if fparent1 != nullid:
179 179 files.append((f, hex(fparent1)))
180 180 self.fileservice.prefetch(files)
181 181 return super(shallowrepository, self).commitctx(ctx,
182 182 error=error)
183 183
184 184 def backgroundprefetch(self, revs, base=None, repack=False, pats=None,
185 185 opts=None):
186 186 """Runs prefetch in background with optional repack
187 187 """
188 188 cmd = [procutil.hgexecutable(), '-R', repo.origroot, 'prefetch']
189 189 if repack:
190 190 cmd.append('--repack')
191 191 if revs:
192 192 cmd += ['-r', revs]
193 procutil.runbgcommand(cmd, encoding.environ)
193 # We know this command will find a binary, so don't block
194 # on it starting.
195 procutil.runbgcommand(cmd, encoding.environ, ensurestart=False)
194 196
195 197 def prefetch(self, revs, base=None, pats=None, opts=None):
196 198 """Prefetches all the necessary file revisions for the given revs
197 199 Optionally runs repack in background
198 200 """
199 201 with repo._lock(repo.svfs, 'prefetchlock', True, None, None,
200 202 _('prefetching in %s') % repo.origroot):
201 203 self._prefetch(revs, base, pats, opts)
202 204
203 205 def _prefetch(self, revs, base=None, pats=None, opts=None):
204 206 fallbackpath = self.fallbackpath
205 207 if fallbackpath:
206 208 # If we know a rev is on the server, we should fetch the server
207 209 # version of those files, since our local file versions might
208 210 # become obsolete if the local commits are stripped.
209 211 localrevs = repo.revs('outgoing(%s)', fallbackpath)
210 212 if base is not None and base != nullrev:
211 213 serverbase = list(repo.revs('first(reverse(::%s) - %ld)',
212 214 base, localrevs))
213 215 if serverbase:
214 216 base = serverbase[0]
215 217 else:
216 218 localrevs = repo
217 219
218 220 mfl = repo.manifestlog
219 221 mfrevlog = mfl.getstorage('')
220 222 if base is not None:
221 223 mfdict = mfl[repo[base].manifestnode()].read()
222 224 skip = set(mfdict.iteritems())
223 225 else:
224 226 skip = set()
225 227
226 228 # Copy the skip set to start large and avoid constant resizing,
227 229 # and since it's likely to be very similar to the prefetch set.
228 230 files = skip.copy()
229 231 serverfiles = skip.copy()
230 232 visited = set()
231 233 visited.add(nullrev)
232 234 revcount = len(revs)
233 235 progress = self.ui.makeprogress(_('prefetching'), total=revcount)
234 236 progress.update(0)
235 237 for rev in sorted(revs):
236 238 ctx = repo[rev]
237 239 if pats:
238 240 m = scmutil.match(ctx, pats, opts)
239 241 sparsematch = repo.maybesparsematch(rev)
240 242
241 243 mfnode = ctx.manifestnode()
242 244 mfrev = mfrevlog.rev(mfnode)
243 245
244 246 # Decompressing manifests is expensive.
245 247 # When possible, only read the deltas.
246 248 p1, p2 = mfrevlog.parentrevs(mfrev)
247 249 if p1 in visited and p2 in visited:
248 250 mfdict = mfl[mfnode].readfast()
249 251 else:
250 252 mfdict = mfl[mfnode].read()
251 253
252 254 diff = mfdict.iteritems()
253 255 if pats:
254 256 diff = (pf for pf in diff if m(pf[0]))
255 257 if sparsematch:
256 258 diff = (pf for pf in diff if sparsematch(pf[0]))
257 259 if rev not in localrevs:
258 260 serverfiles.update(diff)
259 261 else:
260 262 files.update(diff)
261 263
262 264 visited.add(mfrev)
263 265 progress.increment()
264 266
265 267 files.difference_update(skip)
266 268 serverfiles.difference_update(skip)
267 269 progress.complete()
268 270
269 271 # Fetch files known to be on the server
270 272 if serverfiles:
271 273 results = [(path, hex(fnode)) for (path, fnode) in serverfiles]
272 274 repo.fileservice.prefetch(results, force=True)
273 275
274 276 # Fetch files that may or may not be on the server
275 277 if files:
276 278 results = [(path, hex(fnode)) for (path, fnode) in files]
277 279 repo.fileservice.prefetch(results)
278 280
279 281 def close(self):
280 282 super(shallowrepository, self).close()
281 283 self.connectionpool.close()
282 284
283 285 repo.__class__ = shallowrepository
284 286
285 287 repo.shallowmatch = match.always()
286 288
287 289 makeunionstores(repo)
288 290
289 291 repo.includepattern = repo.ui.configlist("remotefilelog", "includepattern",
290 292 None)
291 293 repo.excludepattern = repo.ui.configlist("remotefilelog", "excludepattern",
292 294 None)
293 295 if not util.safehasattr(repo, 'connectionpool'):
294 296 repo.connectionpool = connectionpool.connectionpool(repo)
295 297
296 298 if repo.includepattern or repo.excludepattern:
297 299 repo.shallowmatch = match.match(repo.root, '', None,
298 300 repo.includepattern, repo.excludepattern)
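
Finally, the bookkeeping in _prefetch above boils down to seeding a skip set from the base manifest and splitting the remaining (filename, filenode) pairs into a server-known set and a possibly-local set. A stripped-down sketch with plain dicts standing in for manifests (collect_prefetch and the toy data are illustrative, not part of the extension):

def collect_prefetch(manifests, base, localrevs):
    # Seed the skip set from the base manifest so already-known pairs are
    # never fetched again.
    skip = set(manifests[base].items()) if base is not None else set()
    files, serverfiles = set(skip), set(skip)
    for rev, manifest in manifests.items():
        if rev == base:
            continue
        diff = manifest.items()
        if rev not in localrevs:
            # Revisions known to the server: fetch their files from there.
            serverfiles.update(diff)
        else:
            # Local-only revisions: the files may or may not be on the server.
            files.update(diff)
    files -= skip
    serverfiles -= skip
    return files, serverfiles

# Example with toy manifests mapping filename -> fake node.
manifests = {0: {'a': 'n1'}, 1: {'a': 'n1', 'b': 'n2'}, 2: {'a': 'n3', 'b': 'n2'}}
print(collect_prefetch(manifests, base=0, localrevs={2}))
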