##// END OF EJS Templates
util: add a file object proxy that can read at most N bytes...
Gregory Szorc -
r36382:01e29e88 default
parent child Browse files
Show More
@@ -0,0 +1,91 b''
1 from __future__ import absolute_import, print_function
2
3 import io
4 import unittest
5
6 from mercurial import (
7 util,
8 )
9
10 class CappedReaderTests(unittest.TestCase):
11 def testreadfull(self):
12 source = io.BytesIO(b'x' * 100)
13
14 reader = util.cappedreader(source, 10)
15 res = reader.read(10)
16 self.assertEqual(res, b'x' * 10)
17 self.assertEqual(source.tell(), 10)
18 source.seek(0)
19
20 reader = util.cappedreader(source, 15)
21 res = reader.read(16)
22 self.assertEqual(res, b'x' * 15)
23 self.assertEqual(source.tell(), 15)
24 source.seek(0)
25
26 reader = util.cappedreader(source, 100)
27 res = reader.read(100)
28 self.assertEqual(res, b'x' * 100)
29 self.assertEqual(source.tell(), 100)
30 source.seek(0)
31
32 reader = util.cappedreader(source, 50)
33 res = reader.read()
34 self.assertEqual(res, b'x' * 50)
35 self.assertEqual(source.tell(), 50)
36 source.seek(0)
37
38 def testreadnegative(self):
39 source = io.BytesIO(b'x' * 100)
40
41 reader = util.cappedreader(source, 20)
42 res = reader.read(-1)
43 self.assertEqual(res, b'x' * 20)
44 self.assertEqual(source.tell(), 20)
45 source.seek(0)
46
47 reader = util.cappedreader(source, 100)
48 res = reader.read(-1)
49 self.assertEqual(res, b'x' * 100)
50 self.assertEqual(source.tell(), 100)
51 source.seek(0)
52
53 def testreadmultiple(self):
54 source = io.BytesIO(b'x' * 100)
55
56 reader = util.cappedreader(source, 10)
57 for i in range(10):
58 res = reader.read(1)
59 self.assertEqual(res, b'x')
60 self.assertEqual(source.tell(), i + 1)
61
62 self.assertEqual(source.tell(), 10)
63 res = reader.read(1)
64 self.assertEqual(res, b'')
65 self.assertEqual(source.tell(), 10)
66 source.seek(0)
67
68 reader = util.cappedreader(source, 45)
69 for i in range(4):
70 res = reader.read(10)
71 self.assertEqual(res, b'x' * 10)
72 self.assertEqual(source.tell(), (i + 1) * 10)
73
74 res = reader.read(10)
75 self.assertEqual(res, b'x' * 5)
76 self.assertEqual(source.tell(), 45)
77
78 def readlimitpasteof(self):
79 source = io.BytesIO(b'x' * 100)
80
81 reader = util.cappedreader(source, 1024)
82 res = reader.read(1000)
83 self.assertEqual(res, b'x' * 100)
84 self.assertEqual(source.tell(), 100)
85 res = reader.read(1000)
86 self.assertEqual(res, b'')
87 self.assertEqual(source.tell(), 100)
88
89 if __name__ == '__main__':
90 import silenttestrunner
91 silenttestrunner.main(__name__)
@@ -1980,6 +1980,35 b' def filechunkiter(f, size=131072, limit='
1980 limit -= len(s)
1980 limit -= len(s)
1981 yield s
1981 yield s
1982
1982
1983 class cappedreader(object):
1984 """A file object proxy that allows reading up to N bytes.
1985
1986 Given a source file object, instances of this type allow reading up to
1987 N bytes from that source file object. Attempts to read past the allowed
1988 limit are treated as EOF.
1989
1990 It is assumed that I/O is not performed on the original file object
1991 in addition to I/O that is performed by this instance. If there is,
1992 state tracking will get out of sync and unexpected results will ensue.
1993 """
1994 def __init__(self, fh, limit):
1995 """Allow reading up to <limit> bytes from <fh>."""
1996 self._fh = fh
1997 self._left = limit
1998
1999 def read(self, n=-1):
2000 if not self._left:
2001 return b''
2002
2003 if n < 0:
2004 n = self._left
2005
2006 data = self._fh.read(min(n, self._left))
2007 self._left -= len(data)
2008 assert self._left >= 0
2009
2010 return data
2011
1983 def makedate(timestamp=None):
2012 def makedate(timestamp=None):
1984 '''Return a unix timestamp (or the current time) as a (unixtime,
2013 '''Return a unix timestamp (or the current time) as a (unixtime,
1985 offset) tuple based off the local timezone.'''
2014 offset) tuple based off the local timezone.'''
General Comments 0
You need to be logged in to leave comments. Login now