Show More
@@ -0,0 +1,91 b'' | |||
|
1 | from __future__ import absolute_import, print_function | |
|
2 | ||
|
3 | import io | |
|
4 | import unittest | |
|
5 | ||
|
6 | from mercurial import ( | |
|
7 | util, | |
|
8 | ) | |
|
9 | ||
|
10 | class CappedReaderTests(unittest.TestCase): | |
|
11 | def testreadfull(self): | |
|
12 | source = io.BytesIO(b'x' * 100) | |
|
13 | ||
|
14 | reader = util.cappedreader(source, 10) | |
|
15 | res = reader.read(10) | |
|
16 | self.assertEqual(res, b'x' * 10) | |
|
17 | self.assertEqual(source.tell(), 10) | |
|
18 | source.seek(0) | |
|
19 | ||
|
20 | reader = util.cappedreader(source, 15) | |
|
21 | res = reader.read(16) | |
|
22 | self.assertEqual(res, b'x' * 15) | |
|
23 | self.assertEqual(source.tell(), 15) | |
|
24 | source.seek(0) | |
|
25 | ||
|
26 | reader = util.cappedreader(source, 100) | |
|
27 | res = reader.read(100) | |
|
28 | self.assertEqual(res, b'x' * 100) | |
|
29 | self.assertEqual(source.tell(), 100) | |
|
30 | source.seek(0) | |
|
31 | ||
|
32 | reader = util.cappedreader(source, 50) | |
|
33 | res = reader.read() | |
|
34 | self.assertEqual(res, b'x' * 50) | |
|
35 | self.assertEqual(source.tell(), 50) | |
|
36 | source.seek(0) | |
|
37 | ||
|
38 | def testreadnegative(self): | |
|
39 | source = io.BytesIO(b'x' * 100) | |
|
40 | ||
|
41 | reader = util.cappedreader(source, 20) | |
|
42 | res = reader.read(-1) | |
|
43 | self.assertEqual(res, b'x' * 20) | |
|
44 | self.assertEqual(source.tell(), 20) | |
|
45 | source.seek(0) | |
|
46 | ||
|
47 | reader = util.cappedreader(source, 100) | |
|
48 | res = reader.read(-1) | |
|
49 | self.assertEqual(res, b'x' * 100) | |
|
50 | self.assertEqual(source.tell(), 100) | |
|
51 | source.seek(0) | |
|
52 | ||
|
53 | def testreadmultiple(self): | |
|
54 | source = io.BytesIO(b'x' * 100) | |
|
55 | ||
|
56 | reader = util.cappedreader(source, 10) | |
|
57 | for i in range(10): | |
|
58 | res = reader.read(1) | |
|
59 | self.assertEqual(res, b'x') | |
|
60 | self.assertEqual(source.tell(), i + 1) | |
|
61 | ||
|
62 | self.assertEqual(source.tell(), 10) | |
|
63 | res = reader.read(1) | |
|
64 | self.assertEqual(res, b'') | |
|
65 | self.assertEqual(source.tell(), 10) | |
|
66 | source.seek(0) | |
|
67 | ||
|
68 | reader = util.cappedreader(source, 45) | |
|
69 | for i in range(4): | |
|
70 | res = reader.read(10) | |
|
71 | self.assertEqual(res, b'x' * 10) | |
|
72 | self.assertEqual(source.tell(), (i + 1) * 10) | |
|
73 | ||
|
74 | res = reader.read(10) | |
|
75 | self.assertEqual(res, b'x' * 5) | |
|
76 | self.assertEqual(source.tell(), 45) | |
|
77 | ||
|
78 | def readlimitpasteof(self): | |
|
79 | source = io.BytesIO(b'x' * 100) | |
|
80 | ||
|
81 | reader = util.cappedreader(source, 1024) | |
|
82 | res = reader.read(1000) | |
|
83 | self.assertEqual(res, b'x' * 100) | |
|
84 | self.assertEqual(source.tell(), 100) | |
|
85 | res = reader.read(1000) | |
|
86 | self.assertEqual(res, b'') | |
|
87 | self.assertEqual(source.tell(), 100) | |
|
88 | ||
|
89 | if __name__ == '__main__': | |
|
90 | import silenttestrunner | |
|
91 | silenttestrunner.main(__name__) |
@@ -1980,6 +1980,35 b' def filechunkiter(f, size=131072, limit=' | |||
|
1980 | 1980 | limit -= len(s) |
|
1981 | 1981 | yield s |
|
1982 | 1982 | |
|
1983 | class cappedreader(object): | |
|
1984 | """A file object proxy that allows reading up to N bytes. | |
|
1985 | ||
|
1986 | Given a source file object, instances of this type allow reading up to | |
|
1987 | N bytes from that source file object. Attempts to read past the allowed | |
|
1988 | limit are treated as EOF. | |
|
1989 | ||
|
1990 | It is assumed that I/O is not performed on the original file object | |
|
1991 | in addition to I/O that is performed by this instance. If there is, | |
|
1992 | state tracking will get out of sync and unexpected results will ensue. | |
|
1993 | """ | |
|
1994 | def __init__(self, fh, limit): | |
|
1995 | """Allow reading up to <limit> bytes from <fh>.""" | |
|
1996 | self._fh = fh | |
|
1997 | self._left = limit | |
|
1998 | ||
|
1999 | def read(self, n=-1): | |
|
2000 | if not self._left: | |
|
2001 | return b'' | |
|
2002 | ||
|
2003 | if n < 0: | |
|
2004 | n = self._left | |
|
2005 | ||
|
2006 | data = self._fh.read(min(n, self._left)) | |
|
2007 | self._left -= len(data) | |
|
2008 | assert self._left >= 0 | |
|
2009 | ||
|
2010 | return data | |
|
2011 | ||
|
1983 | 2012 | def makedate(timestamp=None): |
|
1984 | 2013 | '''Return a unix timestamp (or the current time) as a (unixtime, |
|
1985 | 2014 | offset) tuple based off the local timezone.''' |
General Comments 0
You need to be logged in to leave comments.
Login now