Add tests for things in utils
Thomas Kluyver
@@ -0,0 +1,10 @@
+from IPython.utils import decorators
+
+def test_flag_calls():
+    @decorators.flag_calls
+    def f():
+        pass
+
+    assert not f.called
+    f()
+    assert f.called
\ No newline at end of file
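The test above implies the contract of `flag_calls`: the wrapped function starts with `called` set to False and flips it to True once invoked. As a rough sketch (not IPython's actual implementation, just the convention the assertions rely on), such a decorator can be written as:

    import functools

    def flag_calls(func):
        # Wrap func so the wrapper records whether it was ever called.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wrapper.called = True
            return func(*args, **kwargs)
        wrapper.called = False
        return wrapper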
@@ -0,0 +1,59 @@
+import os
+from unittest import TestCase
+
+from IPython.testing.decorators import skip
+from IPython.utils.tempdir import TemporaryDirectory
+from IPython.utils.pickleshare import PickleShareDB
+
+
+class PickleShareDBTestCase(TestCase):
+    def setUp(self):
+        self.tempdir = TemporaryDirectory()
+
+    def tearDown(self):
+        self.tempdir.cleanup()
+
+    def test_picklesharedb(self):
+        db = PickleShareDB(self.tempdir.name)
+        db.clear()
+        print("Should be empty:", db.items())
+        db['hello'] = 15
+        db['aku ankka'] = [1, 2, 313]
+        db['paths/nest/ok/keyname'] = [1, (5, 46)]
+        db.hset('hash', 'aku', 12)
+        db.hset('hash', 'ankka', 313)
+        self.assertEqual(db.hget('hash', 'aku'), 12)
+        self.assertEqual(db.hget('hash', 'ankka'), 313)
+        print("all hashed", db.hdict('hash'))
+        print(db.keys())
+        print(db.keys('paths/nest/ok/k*'))
+        print(dict(db))  # snapshot of whole db
+        db.uncache()  # frees memory, causes re-reads later
+
+        # shorthand for accessing deeply nested files
+        lnk = db.getlink('myobjects/test')
+        lnk.foo = 2
+        lnk.bar = lnk.foo + 5
+        self.assertEqual(lnk.bar, 7)
+
+    @skip("Too slow for regular running.")
+    def test_stress(self):
+        db = PickleShareDB('~/fsdbtest')
+        import time, sys
+        for i in range(1000):
+            for j in range(1000):
+                if i % 15 == 0 and i < 200:
+                    if str(j) in db:
+                        del db[str(j)]
+                    continue
+
+                if j % 33 == 0:
+                    time.sleep(0.02)
+
+                db[str(j)] = db.get(str(j), []) + [(i, j, "proc %d" % os.getpid())]
+                db.hset('hash', j, db.hget('hash', j, 15) + 1)
+
+            print(i, end=' ')
+            sys.stdout.flush()
+            if i % 10 == 0:
+                db.uncache()
\ No newline at end of file
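The `hset`/`hget` calls exercised above do not store one file per key: pickleshare groups hashed keys into at most 256 bucket files, each holding a plain dict, via the `gethashfile` helper shown in the module source below:

    def gethashfile(key):
        # Two-digit hex bucket name derived from the key's hash.
        return ("%02x" % abs(hash(key) % 256))[-2:]

    # db.hset('hash', 'aku', 12) therefore updates the dict stored in the
    # file 'hash/' + gethashfile('aku'), and db.hget reads it back from there.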
@@ -1,370 +1,325 @@
 #!/usr/bin/env python

 """ PickleShare - a small 'shelve' like datastore with concurrency support

 Like shelve, a PickleShareDB object acts like a normal dictionary. Unlike
 shelve, many processes can access the database simultaneously. Changing a
 value in the database is immediately visible to other processes accessing the
 same database.

 Concurrency is possible because the values are stored in separate files. Hence
 the "database" is a directory where *all* files are governed by PickleShare.

 Example usage::

     from pickleshare import *
     db = PickleShareDB('~/testpickleshare')
     db.clear()
     print "Should be empty:", db.items()
     db['hello'] = 15
     db['aku ankka'] = [1,2,313]
     db['paths/are/ok/key'] = [1,(5,46)]
     print db.keys()
     del db['aku ankka']

 This module is certainly not ZODB, but can be used for low-load
 (non-mission-critical) situations where tiny code size trumps the
 advanced features of a "real" object database.

 Installation guide: easy_install pickleshare

 Author: Ville Vainio <vivainio@gmail.com>
 License: MIT open source license.

 """
 from __future__ import print_function

 from IPython.external.path import path as Path
 import os, stat, time
 import collections
 try:
     import cPickle as pickle
 except ImportError:
     import pickle
 import glob

 def gethashfile(key):
     return ("%02x" % abs(hash(key) % 256))[-2:]

 _sentinel = object()

 class PickleShareDB(collections.MutableMapping):
     """ The main 'connection' object for a PickleShare database """
     def __init__(self, root):
         """ Return a db object that will manage the specified directory"""
         self.root = Path(root).expanduser().abspath()
         if not self.root.isdir():
             self.root.makedirs()
         # cache has { 'key' : (obj, orig_mod_time) }
         self.cache = {}


     def __getitem__(self, key):
         """ db['key'] reading """
         fil = self.root / key
         try:
             mtime = (fil.stat()[stat.ST_MTIME])
         except OSError:
             raise KeyError(key)

         if fil in self.cache and mtime == self.cache[fil][1]:
             return self.cache[fil][0]
         try:
             # The cached item has expired, need to read
             with fil.open("rb") as f:
                 obj = pickle.loads(f.read())
         except:
             raise KeyError(key)

         self.cache[fil] = (obj, mtime)
         return obj

     def __setitem__(self, key, value):
         """ db['key'] = 5 """
         fil = self.root / key
         parent = fil.parent
         if parent and not parent.isdir():
             parent.makedirs()
         # We specify protocol 2, so that we can mostly go between Python 2
         # and Python 3. We can upgrade to protocol 3 when Python 2 is obsolete.
         with fil.open('wb') as f:
             pickle.dump(value, f, protocol=2)
         try:
             self.cache[fil] = (value, fil.mtime)
         except OSError as e:
             if e.errno != 2:
                 raise

     def hset(self, hashroot, key, value):
         """ hashed set """
         hroot = self.root / hashroot
         if not hroot.isdir():
             hroot.makedirs()
         hfile = hroot / gethashfile(key)
         d = self.get(hfile, {})
         d.update({key: value})
         self[hfile] = d



     def hget(self, hashroot, key, default=_sentinel, fast_only=True):
         """ hashed get """
         hroot = self.root / hashroot
         hfile = hroot / gethashfile(key)

         d = self.get(hfile, _sentinel)
         #print "got dict",d,"from",hfile
         if d is _sentinel:
             if fast_only:
                 if default is _sentinel:
                     raise KeyError(key)

                 return default

             # slow mode ok, works even after hcompress()
             d = self.hdict(hashroot)

         return d.get(key, default)

     def hdict(self, hashroot):
         """ Get all data contained in hashed category 'hashroot' as dict """
         hfiles = self.keys(hashroot + "/*")
         hfiles.sort()
         last = len(hfiles) and hfiles[-1] or ''
         if last.endswith('xx'):
             # print "using xx"
             hfiles = [last] + hfiles[:-1]

         all = {}

         for f in hfiles:
             # print "using",f
             try:
                 all.update(self[f])
             except KeyError:
                 print("Corrupt", f, "deleted - hset is not threadsafe!")
                 del self[f]

             self.uncache(f)

         return all

     def hcompress(self, hashroot):
         """ Compress category 'hashroot', so hset is fast again

         hget will fail if fast_only is True for compressed items (that were
         hset before hcompress).

         """
         hfiles = self.keys(hashroot + "/*")
         all = {}
         for f in hfiles:
             # print "using",f
             all.update(self[f])
             self.uncache(f)

         self[hashroot + '/xx'] = all
         for f in hfiles:
             p = self.root / f
             if p.basename() == 'xx':
                 continue
             p.remove()



     def __delitem__(self, key):
         """ del db["key"] """
         fil = self.root / key
         self.cache.pop(fil, None)
         try:
             fil.remove()
         except OSError:
             # notfound and permission denied are ok - we
             # lost, the other process wins the conflict
             pass

     def _normalized(self, p):
         """ Make a key suitable for user's eyes """
         return str(self.root.relpathto(p)).replace('\\', '/')

     def keys(self, globpat=None):
         """ All keys in DB, or all keys matching a glob"""

         if globpat is None:
             files = self.root.walkfiles()
         else:
             files = [Path(p) for p in glob.glob(self.root / globpat)]
         return [self._normalized(p) for p in files if p.isfile()]

     def __iter__(self):
         return iter(self.keys())

     def __len__(self):
         return len(self.keys())

     def uncache(self, *items):
         """ Removes all, or specified items from cache

         Use this after reading a large amount of large objects
         to free up memory, when you won't be needing the objects
         for a while.

         """
         if not items:
             self.cache = {}
         for it in items:
             self.cache.pop(it, None)

     def waitget(self, key, maxwaittime=60):
         """ Wait (poll) for a key to get a value

         Will wait for `maxwaittime` seconds before raising a KeyError.
         The call exits normally if the `key` field in db gets a value
         within the timeout period.

         Use this for synchronizing different processes or for ensuring
         that an unfortunately timed "db['key'] = newvalue" operation
         in another process (which causes all 'get' operations to raise a
         KeyError for the duration of pickling) won't screw up your program
         logic.
         """

         wtimes = [0.2] * 3 + [0.5] * 2 + [1]
         tries = 0
         waited = 0
         while 1:
             try:
                 val = self[key]
                 return val
             except KeyError:
                 pass

             if waited > maxwaittime:
                 raise KeyError(key)

             time.sleep(wtimes[tries])
             waited += wtimes[tries]
             if tries < len(wtimes) - 1:
                 tries += 1

     def getlink(self, folder):
         """ Get a convenient link for accessing items """
         return PickleShareLink(self, folder)

     def __repr__(self):
         return "PickleShareDB('%s')" % self.root



 class PickleShareLink:
     """ A shorthand for accessing nested PickleShare data conveniently.

     Created through PickleShareDB.getlink(), example::

         lnk = db.getlink('myobjects/test')
         lnk.foo = 2
         lnk.bar = lnk.foo + 5

     """
     def __init__(self, db, keydir):
         self.__dict__.update(locals())

     def __getattr__(self, key):
         return self.__dict__['db'][self.__dict__['keydir'] + '/' + key]
     def __setattr__(self, key, val):
         self.db[self.keydir + '/' + key] = val
     def __repr__(self):
         db = self.__dict__['db']
         keys = db.keys(self.__dict__['keydir'] + "/*")
         return "<PickleShareLink '%s': %s>" % (
             self.__dict__['keydir'],
             ";".join([Path(k).basename() for k in keys]))

-
-def test():
-    db = PickleShareDB('~/testpickleshare')
-    db.clear()
-    print("Should be empty:", db.items())
-    db['hello'] = 15
-    db['aku ankka'] = [1,2,313]
-    db['paths/nest/ok/keyname'] = [1,(5,46)]
-    db.hset('hash', 'aku', 12)
-    db.hset('hash', 'ankka', 313)
-    print("12 =", db.hget('hash','aku'))
-    print("313 =", db.hget('hash','ankka'))
-    print("all hashed", db.hdict('hash'))
-    print(db.keys())
-    print(db.keys('paths/nest/ok/k*'))
-    print(dict(db))  # snapshot of whole db
-    db.uncache()  # frees memory, causes re-reads later
-
-    # shorthand for accessing deeply nested files
-    lnk = db.getlink('myobjects/test')
-    lnk.foo = 2
-    lnk.bar = lnk.foo + 5
-    print(lnk.bar)  # 7
-
-def stress():
-    db = PickleShareDB('~/fsdbtest')
-    import time, sys
-    for i in range(1000):
-        for j in range(1000):
-            if i % 15 == 0 and i < 200:
-                if str(j) in db:
-                    del db[str(j)]
-                continue
-
-            if j % 33 == 0:
-                time.sleep(0.02)
-
-            db[str(j)] = db.get(str(j), []) + [(i, j, "proc %d" % os.getpid())]
-            db.hset('hash', j, db.hget('hash', j, 15) + 1)
-
-        print(i, end=' ')
-        sys.stdout.flush()
-        if i % 10 == 0:
-            db.uncache()
-
 def main():
     import textwrap
     usage = textwrap.dedent("""\
     pickleshare - manage PickleShare databases

     Usage:

         pickleshare dump /path/to/db > dump.txt
         pickleshare load /path/to/db < dump.txt
         pickleshare test /path/to/db
     """)
     DB = PickleShareDB
     import sys
     if len(sys.argv) < 2:
         print(usage)
         return

     cmd = sys.argv[1]
     args = sys.argv[2:]
     if cmd == 'dump':
         if not args: args = ['.']
         db = DB(args[0])
         import pprint
         pprint.pprint(db.items())
     elif cmd == 'load':
         cont = sys.stdin.read()
         db = DB(args[0])
         data = eval(cont)
         db.clear()
         for k, v in data.items():
             db[k] = v
     elif cmd == 'testwait':
         db = DB(args[0])
         db.clear()
         print(db.waitget('250'))
     elif cmd == 'test':
         test()
         stress()

 if __name__ == "__main__":
     main()


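`waitget` above is the module's one synchronization primitive: it polls the key with increasing sleep intervals (0.2s up to 1s) until another process writes it, or raises KeyError after `maxwaittime` seconds. A hypothetical two-process sketch (the shared directory name and `compute_result` are invented for illustration):

    from IPython.utils.pickleshare import PickleShareDB

    # process A: publish a value once it is ready
    db = PickleShareDB('~/sharedstate')
    db['result'] = compute_result()  # hypothetical producer function

    # process B: poll until process A has written the key,
    # giving up with KeyError after 120 seconds
    db = PickleShareDB('~/sharedstate')
    value = db.waitget('result', maxwaittime=120)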
@@ -1,23 +1,39 @@
 import io
 import os.path
 import nose.tools as nt

 from IPython.utils import openpy

 mydir = os.path.dirname(__file__)
 nonascii_path = os.path.join(mydir, '../../core/tests/nonascii.py')

 def test_detect_encoding():
     f = open(nonascii_path, 'rb')
     enc, lines = openpy.detect_encoding(f.readline)
     nt.assert_equal(enc, 'iso-8859-5')

 def test_read_file():
     read_specified_enc = io.open(nonascii_path, encoding='iso-8859-5').read()
     read_detected_enc = openpy.read_py_file(nonascii_path, skip_encoding_cookie=False)
     nt.assert_equal(read_detected_enc, read_specified_enc)
     assert u'coding: iso-8859-5' in read_detected_enc

     read_strip_enc_cookie = openpy.read_py_file(nonascii_path, skip_encoding_cookie=True)
     assert u'coding: iso-8859-5' not in read_strip_enc_cookie

+def test_source_to_unicode():
+    with io.open(nonascii_path, 'rb') as f:
+        source_bytes = f.read()
+    nt.assert_equal(openpy.source_to_unicode(source_bytes, skip_encoding_cookie=False),
+                    source_bytes.decode('iso-8859-5'))
+
+    source_no_cookie = openpy.source_to_unicode(source_bytes, skip_encoding_cookie=True)
+    nt.assert_not_in(u'coding: iso-8859-5', source_no_cookie)
+
+def test_list_readline():
+    l = ['a', 'b']
+    readline = openpy._list_readline(l)
+    nt.assert_equal(readline(), 'a')
+    nt.assert_equal(readline(), 'b')
+    with nt.assert_raises(StopIteration):
+        readline()
\ No newline at end of file
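These tests cover PEP 263 encoding cookies. `openpy.detect_encoding` behaves essentially like the standard library's `tokenize.detect_encoding`: it reads at most two lines through the supplied `readline` callable, looks for a `coding:` comment (falling back to UTF-8), and returns the encoding together with the lines it consumed. A small stdlib-only sketch of the same check (Python 3; `sniff_encoding` is an illustrative name):

    import tokenize

    def sniff_encoding(path):
        # detect_encoding reads up to two lines looking for a cookie such as
        # "# -*- coding: iso-8859-5 -*-" and returns (encoding, lines_read).
        with open(path, 'rb') as f:
            encoding, consumed = tokenize.detect_encoding(f.readline)
        return encoding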
@@ -1,177 +1,190 @@
 # encoding: utf-8
 """Tests for IPython.utils.text"""
 from __future__ import print_function

 #-----------------------------------------------------------------------------
 # Copyright (C) 2011 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------

 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------

 import os
 import math
 import random
 import sys

 import nose.tools as nt

 from IPython.utils import text

 #-----------------------------------------------------------------------------
 # Globals
 #-----------------------------------------------------------------------------

 def test_columnize():
     """Basic columnize tests."""
     size = 5
     items = [l*size for l in 'abc']
     out = text.columnize(items, displaywidth=80)
     nt.assert_equal(out, 'aaaaa  bbbbb  ccccc\n')
     out = text.columnize(items, displaywidth=12)
     nt.assert_equal(out, 'aaaaa  ccccc\nbbbbb\n')
     out = text.columnize(items, displaywidth=10)
     nt.assert_equal(out, 'aaaaa\nbbbbb\nccccc\n')

 def test_columnize_random():
     """Test with random input to hopefully catch edge cases."""
     for nitems in [random.randint(2,70) for i in range(2,20)]:
         displaywidth = random.randint(20,200)
         rand_len = [random.randint(2,displaywidth) for i in range(nitems)]
         items = ['x'*l for l in rand_len]
         out = text.columnize(items, displaywidth=displaywidth)
         longer_line = max([len(x) for x in out.split('\n')])
         longer_element = max(rand_len)
         if longer_line > displaywidth:
             print("Columnize displayed something larger than displaywidth : %s " % longer_line)
             print("longest element : %s " % longer_element)
             print("displaywidth : %s " % displaywidth)
             print("number of elements : %s " % nitems)
             print("size of each element :\n %s" % rand_len)
             assert False

 def test_columnize_medium():
     """Test with inputs that shouldn't be wider than 80."""
     size = 40
     items = [l*size for l in 'abc']
     out = text.columnize(items, displaywidth=80)
     nt.assert_equal(out, '\n'.join(items+['']))

 def test_columnize_long():
     """Test columnize with inputs longer than the display window"""
     size = 11
     items = [l*size for l in 'abc']
     out = text.columnize(items, displaywidth=size-1)
     nt.assert_equal(out, '\n'.join(items+['']))

 def eval_formatter_check(f):
     ns = dict(n=12, pi=math.pi, stuff='hello there', os=os, u=u"café", b="café")
     s = f.format("{n} {n//4} {stuff.split()[0]}", **ns)
     nt.assert_equal(s, "12 3 hello")
     s = f.format(' '.join(['{n//%i}'%i for i in range(1,8)]), **ns)
     nt.assert_equal(s, "12 6 4 3 2 2 1")
     s = f.format('{[n//i for i in range(1,8)]}', **ns)
     nt.assert_equal(s, "[12, 6, 4, 3, 2, 2, 1]")
     s = f.format("{stuff!s}", **ns)
     nt.assert_equal(s, ns['stuff'])
     s = f.format("{stuff!r}", **ns)
     nt.assert_equal(s, repr(ns['stuff']))

     # Check with unicode:
     s = f.format("{u}", **ns)
     nt.assert_equal(s, ns['u'])
     # This decodes in a platform dependent manner, but it shouldn't error out
     s = f.format("{b}", **ns)

     nt.assert_raises(NameError, f.format, '{dne}', **ns)

 def eval_formatter_slicing_check(f):
     ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)
     s = f.format(" {stuff.split()[:]} ", **ns)
     nt.assert_equal(s, " ['hello', 'there'] ")
     s = f.format(" {stuff.split()[::-1]} ", **ns)
     nt.assert_equal(s, " ['there', 'hello'] ")
     s = f.format("{stuff[::2]}", **ns)
     nt.assert_equal(s, ns['stuff'][::2])

     nt.assert_raises(SyntaxError, f.format, "{n:x}", **ns)

 def eval_formatter_no_slicing_check(f):
     ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)

     s = f.format('{n:x} {pi**2:+f}', **ns)
     nt.assert_equal(s, "c +9.869604")

     s = f.format('{stuff[slice(1,4)]}', **ns)
     nt.assert_equal(s, 'ell')

     if sys.version_info >= (3, 4):
         # String formatting has changed in Python 3.4, so this now works.
         s = f.format("{a[:]}", a=[1, 2])
         nt.assert_equal(s, "[1, 2]")
     else:
         nt.assert_raises(SyntaxError, f.format, "{a[:]}")

 def test_eval_formatter():
     f = text.EvalFormatter()
     eval_formatter_check(f)
     eval_formatter_no_slicing_check(f)

 def test_full_eval_formatter():
     f = text.FullEvalFormatter()
     eval_formatter_check(f)
     eval_formatter_slicing_check(f)

 def test_dollar_formatter():
     f = text.DollarFormatter()
     eval_formatter_check(f)
     eval_formatter_slicing_check(f)

     ns = dict(n=12, pi=math.pi, stuff='hello there', os=os)
     s = f.format("$n", **ns)
     nt.assert_equal(s, "12")
     s = f.format("$n.real", **ns)
     nt.assert_equal(s, "12")
     s = f.format("$n/{stuff[:5]}", **ns)
     nt.assert_equal(s, "12/hello")
     s = f.format("$n $$HOME", **ns)
     nt.assert_equal(s, "12 $HOME")
     s = f.format("${foo}", foo="HOME")
     nt.assert_equal(s, "$HOME")


 def test_long_substr():
     data = ['hi']
     nt.assert_equal(text.long_substr(data), 'hi')


 def test_long_substr2():
     data = ['abc', 'abd', 'abf', 'ab']
     nt.assert_equal(text.long_substr(data), 'ab')

 def test_long_substr_empty():
     data = []
     nt.assert_equal(text.long_substr(data), '')

 def test_strip_email():
     src = """\
 >> >>> def f(x):
 >> ... return x+1
 >> ...
 >> >>> zz = f(2.5)"""
     cln = """\
 >>> def f(x):
 ... return x+1
 ...
 >>> zz = f(2.5)"""
     nt.assert_equal(text.strip_email_quotes(src), cln)


 def test_strip_email2():
     src = '> > > list()'
     cln = 'list()'
     nt.assert_equal(text.strip_email_quotes(src), cln)

+
+def test_LSString():
+    lss = text.LSString("abc\ndef")
+    nt.assert_equal(lss.l, ['abc', 'def'])
+    nt.assert_equal(lss.s, 'abc def')
+
+def test_SList():
+    sl = text.SList(['a 11', 'b 1', 'a 2'])
+    nt.assert_equal(sl.n, 'a 11\nb 1\na 2')
+    nt.assert_equal(sl.s, 'a 11 b 1 a 2')
+    nt.assert_equal(sl.grep(lambda x: x.startswith('a')), text.SList(['a 11', 'a 2']))
+    nt.assert_equal(sl.fields(0), text.SList(['a', 'b', 'a']))
+    nt.assert_equal(sl.sort(field=1, nums=True), text.SList(['b 1', 'a 2', 'a 11']))
\ No newline at end of file
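The formatter checks above hinge on `EvalFormatter` treating everything inside `{}` as a full Python expression evaluated against the keyword arguments, rather than the plain attribute/index lookups of `str.format`. A minimal sketch of that technique (a simplification for illustration, not IPython's exact class) subclasses `string.Formatter` and overrides `get_field`:

    import string

    class SimpleEvalFormatter(string.Formatter):
        """Evaluate each {field} as a Python expression over the kwargs."""
        def get_field(self, field_name, args, kwargs):
            # field_name is the raw text between the braces; eval it in the
            # namespace of the keyword arguments instead of doing a lookup.
            return eval(field_name, {}, kwargs), field_name

    f = SimpleEvalFormatter()
    print(f.format("{n} {n//4} {stuff.split()[0]}", n=12, stuff='hello there'))
    # -> 12 3 hello

An unknown name such as `{dne}` then raises NameError from `eval`, which is exactly what `eval_formatter_check` asserts.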