##// END OF EJS Templates
branching: merge stable into default...
branching: merge stable into default. This shows that the recent changes on default fixed the issue with the transaction overwriting content in `test-transaction-wc-rollback-race.t`.

File last commit:

r50935:d41960df default
r51069:596a6b9b merge default
Show More
test-context.py
253 lines | 6.4 KiB | text/x-python | PythonLexer
import os
import stat
import sys
from mercurial.node import hex
from mercurial import (
context,
diffutil,
encoding,
hg,
scmutil,
ui as uimod,
)
# keep a handle on the builtin before the wrapper below shadows it
print_ = print


def print(*args, **kwargs):
    """print() wrapper that flushes stdout buffers to avoid py3 buffer issues

    We could also just write directly to sys.stdout.buffer the way the
    ui object will, but this was easier for porting the test.

    All positional and keyword arguments are forwarded unchanged to the
    builtin ``print`` (saved as ``print_``).  ``sys.stdout`` is flushed
    afterwards, whatever ``file=`` argument was given.
    """
    print_(*args, **kwargs)
    sys.stdout.flush()
def printb(data, end=b'\n'):
    """Write ``data`` followed by ``end`` to stdout as bytes, then flush.

    The bytes go to ``sys.stdout.buffer`` when that attribute exists and
    to ``sys.stdout`` itself otherwise.
    """
    out = getattr(sys.stdout, 'buffer', sys.stdout)
    out.write(data + end)
    out.flush()
ui = uimod.ui.load()

# create a fresh repository 'test1' and run the following steps inside it
repo = hg.repository(ui, b'test1', create=1)
os.chdir('test1')

# create 'foo' with fixed time stamp
with open('foo', 'wb') as f:
    f.write(b'foo\n')
os.utime('foo', (1000, 1000))

# add+commit 'foo'
# NOTE(review): nesting reconstructed from a copy whose indentation was
# stripped; confirm commit() belongs inside the lock/transaction scope
# (and outside changing_files).
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'foo'])
    repo.commit(text=b'commit1', date=b"0 0")

d = repo[None][b'foo'].date()
if os.name == 'nt':
    # the format string below takes exactly two values
    d = d[:2]
print("workingfilectx.date = (%d, %d)" % d)
# test memctx with non-ASCII commit message
def filectxfn(repo, memctx, path):
    """File-context callback handed to ``context.memctx``.

    ``path`` is ignored: the returned ``memfilectx`` is always built for
    ``b"foo"`` with empty data.
    """
    return context.memfilectx(repo, memctx, b"foo", b"")
# commit an in-memory changeset whose message contains a non-ASCII byte
# sequence (passed through encoding.tolocal)
ctx = context.memctx(
    repo,
    [b'tip', None],
    encoding.tolocal(b"Gr\xc3\xbcezi!"),
    [b"foo"],
    filectxfn,
)
ctx.commit()

# print the tip description once for each encoding setting
for enc in "ASCII", "Latin-1", "UTF-8":
    encoding.encoding = enc
    printb(b"%-8s: %s" % (enc.encode('ascii'), repo[b"tip"].description()))
# test performing a status
def getfilectx(repo, memctx, f):
    """File-context callback: derive ``f`` from its version in the parent.

    Data and flags are read from ``memctx.p1()[f]``.  For ``b'foo'`` the
    bytes ``b'bar\\n'`` are appended to the data.  The two booleans passed
    to ``memfilectx`` say whether ``b'l'`` and ``b'x'`` occur in the flags.
    """
    fctx = memctx.p1()[f]
    data, flags = fctx.data(), fctx.flags()
    if f == b'foo':
        data += b'bar\n'
    return context.memfilectx(
        repo, memctx, f, data, b'l' in flags, b'x' in flags
    )
# build an in-memory child of revision 0 that touches 'foo' via getfilectx,
# reusing the user and date of revision 0
ctxa = repo[0]
ctxb = context.memctx(
    repo,
    [ctxa.node(), None],
    b"test diff",
    [b"foo"],
    getfilectx,
    ctxa.user(),
    ctxa.date(),
)

print(ctxb.status(ctxa))

# test performing a diff on a memctx
diffopts = diffutil.diffallopts(repo.ui, {b'git': True})
for d in ctxb.diff(ctxa, opts=diffopts):
    # each item from diff() is written as-is, with no separator added
    printb(d, end=b'')
# test safeness and correctness of "ctx.status()"
print('= checking context.status():')

# ancestor "wcctx ~ 2"
actx2 = repo[b'.']

repo.wwrite(b'bar-m', b'bar-m\n', b'')
repo.wwrite(b'bar-r', b'bar-r\n', b'')
# NOTE(review): nesting reconstructed from a copy whose indentation was
# stripped; confirm commit() belongs inside the lock/transaction scope
# (and outside changing_files).
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-m', b'bar-r'])
    repo.commit(text=b'add bar-m, bar-r', date=b"0 0")

# ancestor "wcctx ~ 1"
actx1 = repo[b'.']

# leave uncommitted changes in the working directory: rewrite bar-m,
# add bar-a and forget bar-r
repo.wwrite(b'bar-m', b'bar-m bar-m\n', b'')
repo.wwrite(b'bar-a', b'bar-a\n', b'')
with repo.wlock(), repo.lock(), repo.transaction(b'test-context'):
    with repo.dirstate.changing_files(repo):
        repo[None].add([b'bar-a'])
        repo[None].forget([b'bar-r'])
# status at this point:
# M bar-m
# A bar-a
# R bar-r
# C foo
print('== checking workingctx.status:')

wctx = repo[None]
print('wctx._status=%s' % (str(wctx._status)))

# Each section compares both ancestors against the working context and
# prints wctx._status after every comparison.
print('=== with "pattern match":')
for _base in (actx1, actx2):
    _matcher = scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    print(_base.status(other=wctx, match=_matcher))
    print('wctx._status=%s' % (str(wctx._status)))

print('=== with "always match" and "listclean=True":')
for _base in (actx1, actx2):
    print(_base.status(other=wctx, listclean=True))
    print('wctx._status=%s' % (str(wctx._status)))
print("== checking workingcommitctx.status:")

_fixed_status = scmutil.status([b'bar-m'], [b'bar-a'], [], [], [], [], [])
wcctx = context.workingcommitctx(
    repo,
    _fixed_status,
    text=b'',
    date=b'0 0',
)
print('wcctx._status=%s' % (str(wcctx._status)))

# Each section compares both ancestors against wcctx and prints
# wcctx._status after every comparison.
print('=== with "always match":')
for _base in (actx1, actx2):
    print(_base.status(other=wcctx))
    print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "always match" and "listclean=True":')
for _base in (actx1, actx2):
    print(_base.status(other=wcctx, listclean=True))
    print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match":')
for _base in (actx1, actx2):
    _matcher = scmutil.matchfiles(repo, [b'bar-m', b'foo'])
    print(_base.status(other=wcctx, match=_matcher))
    print('wcctx._status=%s' % (str(wcctx._status)))

print('=== with "pattern match" and "listclean=True":')
for _base in (actx1, actx2):
    _matcher = scmutil.matchfiles(repo, [b'bar-r', b'foo'])
    print(_base.status(other=wcctx, match=_matcher, listclean=True))
    print('wcctx._status=%s' % (str(wcctx._status)))
os.chdir('..')

# test manifestlog being changed
print('== commit with manifestlog invalidated')

# second repository 'test2'; the remaining steps run inside it
repo = hg.repository(ui, b'test2', create=1)
os.chdir('test2')

# make some commits
for i in [b'1', b'2', b'3']:
    # the file name doubles as the file content and the commit text
    with open(i, 'wb') as f:
        f.write(i)
    status = scmutil.status([], [i], [], [], [], [], [])
    ctx = context.workingcommitctx(
        repo, status, text=i, user=b'test@test.com', date=(0, 0)
    )
    ctx.p1().manifest()  # side effect: cache manifestctx
    n = repo.commitctx(ctx)
    printb(b'commit %s: %s' % (i, hex(n)))

    # touch 00manifest.i mtime so storecache could expire.
    # repo.__dict__['manifestlog'] is deleted by transaction releasefn.
    st = repo.svfs.stat(b'00manifest.i')
    repo.svfs.utime(
        b'00manifest.i', (st[stat.ST_MTIME] + 1, st[stat.ST_MTIME] + 1)
    )

    # read the file just committed
    try:
        if repo[n][i].data() != i:
            print('data mismatch')
    except Exception as ex:
        # report the failure in the test output instead of aborting
        print('cannot read data: %r' % ex)
# Commit a fourth file, then call repo.invalidate() while the transaction
# is still open and check that the changelog length did not change.
# NOTE(review): nesting reconstructed from a copy whose indentation was
# stripped; confirm everything below runs inside the transaction.
with repo.wlock(), repo.lock(), repo.transaction(b'test'):
    with open(b'4', 'wb') as f:
        f.write(b'4')
    with repo.dirstate.changing_files(repo):
        repo.dirstate.set_tracked(b'4')
    repo.commit(b'4')
    revsbefore = len(repo.changelog)
    repo.invalidate(clearfilecache=True)
    revsafter = len(repo.changelog)
    if revsbefore != revsafter:
        print('changeset lost by repo.invalidate()')