##// END OF EJS Templates
fix: use a worker pool to parallelize running tools...
Danny Hooper -
r38554:5ffe2041 default
parent child Browse files
Show More
@@ -70,6 +70,7 from mercurial import (
70 registrar,
70 registrar,
71 scmutil,
71 scmutil,
72 util,
72 util,
73 worker,
73 )
74 )
74
75
75 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
76 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
@@ -138,19 +139,40 def fix(ui, repo, *pats, **opts):
138 basectxs = getbasectxs(repo, opts, revstofix)
139 basectxs = getbasectxs(repo, opts, revstofix)
139 workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
140 workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
140 basectxs)
141 basectxs)
141 filedata = collections.defaultdict(dict)
142 replacements = {}
143 fixers = getfixers(ui)
142 fixers = getfixers(ui)
144 # Some day this loop can become a worker pool, but for now it's easier
143
145 # to fix everything serially in topological order.
144 # There are no data dependencies between the workers fixing each file
146 for rev, path in sorted(workqueue):
145 # revision, so we can use all available parallelism.
146 def getfixes(items):
147 for rev, path in items:
147 ctx = repo[rev]
148 ctx = repo[rev]
148 olddata = ctx[path].data()
149 olddata = ctx[path].data()
149 newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
150 newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
150 if newdata != olddata:
151 # Don't waste memory/time passing unchanged content back, but
152 # produce one result per item either way.
153 yield (rev, path, newdata if newdata != olddata else None)
154 results = worker.worker(ui, 1.0, getfixes, tuple(), workqueue)
155
156 # We have to hold on to the data for each successor revision in memory
157 # until all its parents are committed. We ensure this by committing and
158 # freeing memory for the revisions in some topological order. This
159 # leaves a little bit of memory efficiency on the table, but also makes
160 # the tests deterministic. It might also be considered a feature since
161 # it makes the results more easily reproducible.
162 filedata = collections.defaultdict(dict)
163 replacements = {}
164 commitorder = sorted(revstofix, reverse=True)
165 for rev, path, newdata in results:
166 if newdata is not None:
151 filedata[rev][path] = newdata
167 filedata[rev][path] = newdata
152 numitems[rev] -= 1
168 numitems[rev] -= 1
153 if not numitems[rev]:
169 # Apply the fixes for this and any other revisions that are ready
170 # and sitting at the front of the queue. Using a loop here prevents
171 # the queue from being blocked by the first revision to be ready out
172 # of order.
173 while commitorder and not numitems[commitorder[-1]]:
174 rev = commitorder.pop()
175 ctx = repo[rev]
154 if rev == wdirrev:
176 if rev == wdirrev:
155 writeworkingdir(repo, ctx, filedata[rev], replacements)
177 writeworkingdir(repo, ctx, filedata[rev], replacements)
156 else:
178 else:
@@ -168,11 +190,19 def getworkqueue(ui, repo, pats, opts, r
168 topological order. Each work item represents a file in the working copy or
190 topological order. Each work item represents a file in the working copy or
169 in some revision that should be fixed and written back to the working copy
191 in some revision that should be fixed and written back to the working copy
170 or into a replacement revision.
192 or into a replacement revision.
193
194 Work items for the same revision are grouped together, so that a worker
195 pool starting with the first N items in parallel is likely to finish the
196 first revision's work before other revisions. This can allow us to write
197 the result to disk and reduce memory footprint. At time of writing, the
198 partition strategy in worker.py seems favorable to this. We also sort the
199 items by ascending revision number to match the order in which we commit
200 the fixes later.
171 """
201 """
172 workqueue = []
202 workqueue = []
173 numitems = collections.defaultdict(int)
203 numitems = collections.defaultdict(int)
174 maxfilesize = ui.configbytes('fix', 'maxfilesize')
204 maxfilesize = ui.configbytes('fix', 'maxfilesize')
175 for rev in revstofix:
205 for rev in sorted(revstofix):
176 fixctx = repo[rev]
206 fixctx = repo[rev]
177 match = scmutil.match(fixctx, pats, opts)
207 match = scmutil.match(fixctx, pats, opts)
178 for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],
208 for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],
General Comments 0
You need to be logged in to leave comments. Login now