# HG changeset patch # User Valentin Gatien-Baron # Date 2019-06-27 09:39:35 # Node ID d29db0a0c4eb333fdd928172b308b8cbffc4edfc # Parent 64a873ca71358e036222220bc92987b3b43a8cd6 update: fix spurious unclean status bug shown by previous commit The crux of the problem is: - the dirstate is corrupted (the sizes/dates are assigned to the wrong files) - because when worker.worker is used with a return value (batchget in merge.py here), the return value when worker.worker effectively parallelizes is permuted - this is because worker.worker's partition of input and combination of output values are not inverses of one another: it splits [1,2,3,4,5,6] into [[1,3,5],[2,4,6]], but combines that into [1,3,5,2,4,6]. Given that worker.worker doesn't call its function argument on contiguous chunks of the input arguments, sticking with lists means we'd need to know the relation between the inputs of worker.worker function argument (for instance, requiring that every input element is mapped to exactly one output element). It seems better to instead switch return values to dicts, which can be combined reliably with a straightforward restriction. Differential Revision: https://phab.mercurial-scm.org/D6581 diff --git a/mercurial/merge.py b/mercurial/merge.py --- a/mercurial/merge.py +++ b/mercurial/merge.py @@ -1472,10 +1472,10 @@ def batchget(repo, mctx, wctx, wantfiled Yields arbitrarily many (False, tuple) for progress updates, followed by exactly one (True, filedata). When wantfiledata is false, filedata is an - empty list. When wantfiledata is true, filedata[i] is a triple (mode, size, - mtime) of the file written for action[i]. + empty dict. When wantfiledata is true, filedata[f] is a triple (mode, size, + mtime) of the file f written for each action. 
""" - filedata = [] + filedata = {} verbose = repo.ui.verbose fctx = mctx.filectx ui = repo.ui @@ -1509,7 +1509,7 @@ def batchget(repo, mctx, wctx, wantfiled s = wfctx.lstat() mode = s.st_mode mtime = s[stat.ST_MTIME] - filedata.append((mode, size, mtime)) # for dirstate.normal + filedata[f] = ((mode, size, mtime)) # for dirstate.normal if i == 100: yield False, (i, f) i = 0 @@ -1670,7 +1670,7 @@ def applyupdates(repo, actions, wctx, mc actions[ACTION_GET], threadsafe=threadsafe, hasretval=True) - getfiledata = [] + getfiledata = {} for final, res in prog: if final: getfiledata = res @@ -1803,7 +1803,8 @@ def applyupdates(repo, actions, wctx, mc actions[k].extend(acts) if k == ACTION_GET and wantfiledata: # no filedata until mergestate is updated to provide it - getfiledata.extend([None] * len(acts)) + for a in acts: + getfiledata[a[0]] = None # Remove these files from actions[ACTION_MERGE] as well. This is # important because in recordupdates, files in actions[ACTION_MERGE] # are processed after files in other actions, and the merge driver @@ -1873,11 +1874,11 @@ def recordupdates(repo, actions, branchm pass # get - for i, (f, args, msg) in enumerate(actions.get(ACTION_GET, [])): + for f, args, msg in actions.get(ACTION_GET, []): if branchmerge: repo.dirstate.otherparent(f) else: - parentfiledata = getfiledata[i] if getfiledata else None + parentfiledata = getfiledata[f] if getfiledata else None repo.dirstate.normal(f, parentfiledata=parentfiledata) # merge diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -100,8 +100,9 @@ def worker(ui, costperarg, func, statica workers hasretval - when True, func and the current function return an progress - iterator then a list (encoded as an iterator that yield many (False, ..) - then a (True, list)). The resulting list is in the natural order. + iterator then a dict (encoded as an iterator that yield many (False, ..) + then a (True, dict)). 
The dicts are joined in some arbitrary order, so + overlapping keys are a bad idea. threadsafe - whether work items are thread safe and can be executed using a thread-based worker. Should be disabled for CPU heavy tasks that don't @@ -162,8 +163,8 @@ def _posixworker(ui, func, staticargs, a ui.flush() parentpid = os.getpid() pipes = [] - retvals = [] - for i, pargs in enumerate(partition(args, workers)): + retval = {} + for pargs in partition(args, workers): # Every worker gets its own pipe to send results on, so we don't have to # implement atomic writes larger than PIPE_BUF. Each forked process has # its own pipe's descriptors in the local variables, and the parent @@ -171,7 +172,6 @@ def _posixworker(ui, func, staticargs, a # care what order they're in). rfd, wfd = os.pipe() pipes.append((rfd, wfd)) - retvals.append(None) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. 
@@ -192,7 +192,7 @@ def _posixworker(ui, func, staticargs, a os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): - os.write(wfd, util.pickle.dumps((i, result))) + os.write(wfd, util.pickle.dumps(result)) return 0 ret = scmutil.callcatch(ui, workerfunc) @@ -226,9 +226,9 @@ def _posixworker(ui, func, staticargs, a while openpipes > 0: for key, events in selector.select(): try: - i, res = util.pickle.load(key.fileobj) + res = util.pickle.load(key.fileobj) if hasretval and res[0]: - retvals[i] = res[1] + retval.update(res[1]) else: yield res except EOFError: @@ -249,7 +249,7 @@ def _posixworker(ui, func, staticargs, a os.kill(os.getpid(), -status) sys.exit(status) if hasretval: - yield True, sum(retvals, []) + yield True, retval def _posixexitstatus(code): '''convert a posix exit status into the same form returned by @@ -281,9 +281,9 @@ def _windowsworker(ui, func, staticargs, try: while not self._taskqueue.empty(): try: - i, args = self._taskqueue.get_nowait() + args = self._taskqueue.get_nowait() for res in self._func(*self._staticargs + (args,)): - self._resultqueue.put((i, res)) + self._resultqueue.put(res) # threading doesn't provide a native way to # interrupt execution. handle it manually at every # iteration. 
@@ -318,11 +318,10 @@ def _windowsworker(ui, func, staticargs, workers = _numworkers(ui) resultqueue = pycompat.queue.Queue() taskqueue = pycompat.queue.Queue() - retvals = [] + retval = {} # partition work to more pieces than workers to minimize the chance # of uneven distribution of large tasks between the workers - for pargs in enumerate(partition(args, workers * 20)): - retvals.append(None) + for pargs in partition(args, workers * 20): taskqueue.put(pargs) for _i in range(workers): t = Worker(taskqueue, resultqueue, func, staticargs) @@ -331,9 +330,9 @@ def _windowsworker(ui, func, staticargs, try: while len(threads) > 0: while not resultqueue.empty(): - (i, res) = resultqueue.get() + res = resultqueue.get() if hasretval and res[0]: - retvals[i] = res[1] + retval.update(res[1]) else: yield res threads[0].join(0.05) @@ -346,13 +345,13 @@ def _windowsworker(ui, func, staticargs, trykillworkers() raise while not resultqueue.empty(): - (i, res) = resultqueue.get() + res = resultqueue.get() if hasretval and res[0]: - retvals[i] = res[1] + retval.update(res[1]) else: yield res if hasretval: - yield True, sum(retvals, []) + yield True, retval if pycompat.iswindows: _platformworker = _windowsworker diff --git a/tests/test-simple-update.t b/tests/test-simple-update.t --- a/tests/test-simple-update.t +++ b/tests/test-simple-update.t @@ -110,24 +110,6 @@ update with worker processes getting 100 100 files updated, 0 files merged, 0 files removed, 0 files unresolved $ hg status - M 100 - M 11 - M 2 - M 21 - M 3 - M 4 - M 41 - M 5 - M 51 - M 54 - M 6 - M 61 - M 7 - M 71 - M 8 - M 81 - M 9 - M 91 $ cd ..