Show More
@@ -0,0 +1,10 b'' | |||||
|
1 | from IPython.utils import decorators | |||
|
2 | ||||
|
3 | def test_flag_calls(): | |||
|
4 | @decorators.flag_calls | |||
|
5 | def f(): | |||
|
6 | pass | |||
|
7 | ||||
|
8 | assert not f.called | |||
|
9 | f() | |||
|
10 | assert f.called No newline at end of file |
@@ -0,0 +1,61 b'' | |||||
|
1 | from __future__ import print_function | |||
|
2 | ||||
|
3 | import os | |||
|
4 | from unittest import TestCase | |||
|
5 | ||||
|
6 | from IPython.testing.decorators import skip | |||
|
7 | from IPython.utils.tempdir import TemporaryDirectory | |||
|
8 | from IPython.utils.pickleshare import PickleShareDB | |||
|
9 | ||||
|
10 | ||||
|
11 | class PickleShareDBTestCase(TestCase): | |||
|
12 | def setUp(self): | |||
|
13 | self.tempdir = TemporaryDirectory() | |||
|
14 | ||||
|
15 | def tearDown(self): | |||
|
16 | self.tempdir.cleanup() | |||
|
17 | ||||
|
18 | def test_picklesharedb(self): | |||
|
19 | db = PickleShareDB(self.tempdir.name) | |||
|
20 | db.clear() | |||
|
21 | print("Should be empty:",db.items()) | |||
|
22 | db['hello'] = 15 | |||
|
23 | db['aku ankka'] = [1,2,313] | |||
|
24 | db['paths/nest/ok/keyname'] = [1,(5,46)] | |||
|
25 | db.hset('hash', 'aku', 12) | |||
|
26 | db.hset('hash', 'ankka', 313) | |||
|
27 | self.assertEqual(db.hget('hash','aku'), 12) | |||
|
28 | self.assertEqual(db.hget('hash','ankka'), 313) | |||
|
29 | print("all hashed",db.hdict('hash')) | |||
|
30 | print(db.keys()) | |||
|
31 | print(db.keys('paths/nest/ok/k*')) | |||
|
32 | print(dict(db)) # snapsot of whole db | |||
|
33 | db.uncache() # frees memory, causes re-reads later | |||
|
34 | ||||
|
35 | # shorthand for accessing deeply nested files | |||
|
36 | lnk = db.getlink('myobjects/test') | |||
|
37 | lnk.foo = 2 | |||
|
38 | lnk.bar = lnk.foo + 5 | |||
|
39 | self.assertEqual(lnk.bar, 7) | |||
|
40 | ||||
|
41 | @skip("Too slow for regular running.") | |||
|
42 | def test_stress(self): | |||
|
43 | db = PickleShareDB('~/fsdbtest') | |||
|
44 | import time,sys | |||
|
45 | for i in range(1000): | |||
|
46 | for j in range(1000): | |||
|
47 | if i % 15 == 0 and i < 200: | |||
|
48 | if str(j) in db: | |||
|
49 | del db[str(j)] | |||
|
50 | continue | |||
|
51 | ||||
|
52 | if j%33 == 0: | |||
|
53 | time.sleep(0.02) | |||
|
54 | ||||
|
55 | db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())] | |||
|
56 | db.hset('hash',j, db.hget('hash',j,15) + 1 ) | |||
|
57 | ||||
|
58 | print(i, end=' ') | |||
|
59 | sys.stdout.flush() | |||
|
60 | if i % 10 == 0: | |||
|
61 | db.uncache() No newline at end of file |
@@ -280,51 +280,6 b' class PickleShareLink:' | |||||
280 | self.__dict__['keydir'], |
|
280 | self.__dict__['keydir'], | |
281 | ";".join([Path(k).basename() for k in keys])) |
|
281 | ";".join([Path(k).basename() for k in keys])) | |
282 |
|
282 | |||
283 |
|
||||
284 | def test(): |
|
|||
285 | db = PickleShareDB('~/testpickleshare') |
|
|||
286 | db.clear() |
|
|||
287 | print("Should be empty:",db.items()) |
|
|||
288 | db['hello'] = 15 |
|
|||
289 | db['aku ankka'] = [1,2,313] |
|
|||
290 | db['paths/nest/ok/keyname'] = [1,(5,46)] |
|
|||
291 | db.hset('hash', 'aku', 12) |
|
|||
292 | db.hset('hash', 'ankka', 313) |
|
|||
293 | print("12 =",db.hget('hash','aku')) |
|
|||
294 | print("313 =",db.hget('hash','ankka')) |
|
|||
295 | print("all hashed",db.hdict('hash')) |
|
|||
296 | print(db.keys()) |
|
|||
297 | print(db.keys('paths/nest/ok/k*')) |
|
|||
298 | print(dict(db)) # snapsot of whole db |
|
|||
299 | db.uncache() # frees memory, causes re-reads later |
|
|||
300 |
|
||||
301 | # shorthand for accessing deeply nested files |
|
|||
302 | lnk = db.getlink('myobjects/test') |
|
|||
303 | lnk.foo = 2 |
|
|||
304 | lnk.bar = lnk.foo + 5 |
|
|||
305 | print(lnk.bar) # 7 |
|
|||
306 |
|
||||
307 | def stress(): |
|
|||
308 | db = PickleShareDB('~/fsdbtest') |
|
|||
309 | import time,sys |
|
|||
310 | for i in range(1000): |
|
|||
311 | for j in range(1000): |
|
|||
312 | if i % 15 == 0 and i < 200: |
|
|||
313 | if str(j) in db: |
|
|||
314 | del db[str(j)] |
|
|||
315 | continue |
|
|||
316 |
|
||||
317 | if j%33 == 0: |
|
|||
318 | time.sleep(0.02) |
|
|||
319 |
|
||||
320 | db[str(j)] = db.get(str(j), []) + [(i,j,"proc %d" % os.getpid())] |
|
|||
321 | db.hset('hash',j, db.hget('hash',j,15) + 1 ) |
|
|||
322 |
|
||||
323 | print(i, end=' ') |
|
|||
324 | sys.stdout.flush() |
|
|||
325 | if i % 10 == 0: |
|
|||
326 | db.uncache() |
|
|||
327 |
|
||||
328 | def main(): |
|
283 | def main(): | |
329 | import textwrap |
|
284 | import textwrap | |
330 | usage = textwrap.dedent("""\ |
|
285 | usage = textwrap.dedent("""\ |
@@ -20,4 +20,20 b' def test_read_file():' | |||||
20 |
|
20 | |||
21 | read_strip_enc_cookie = openpy.read_py_file(nonascii_path, skip_encoding_cookie=True) |
|
21 | read_strip_enc_cookie = openpy.read_py_file(nonascii_path, skip_encoding_cookie=True) | |
22 | assert u'coding: iso-8859-5' not in read_strip_enc_cookie |
|
22 | assert u'coding: iso-8859-5' not in read_strip_enc_cookie | |
23 |
|
23 | |||
|
24 | def test_source_to_unicode(): | |||
|
25 | with io.open(nonascii_path, 'rb') as f: | |||
|
26 | source_bytes = f.read() | |||
|
27 | nt.assert_equal(openpy.source_to_unicode(source_bytes, skip_encoding_cookie=False), | |||
|
28 | source_bytes.decode('iso-8859-5')) | |||
|
29 | ||||
|
30 | source_no_cookie = openpy.source_to_unicode(source_bytes, skip_encoding_cookie=True) | |||
|
31 | nt.assert_not_in(u'coding: iso-8859-5', source_no_cookie) | |||
|
32 | ||||
|
33 | def test_list_readline(): | |||
|
34 | l = ['a', 'b'] | |||
|
35 | readline = openpy._list_readline(l) | |||
|
36 | nt.assert_equal(readline(), 'a') | |||
|
37 | nt.assert_equal(readline(), 'b') | |||
|
38 | with nt.assert_raises(StopIteration): | |||
|
39 | readline() No newline at end of file |
@@ -175,3 +175,16 b' def test_strip_email2():' | |||||
175 | src = '> > > list()' |
|
175 | src = '> > > list()' | |
176 | cln = 'list()' |
|
176 | cln = 'list()' | |
177 | nt.assert_equal(text.strip_email_quotes(src), cln) |
|
177 | nt.assert_equal(text.strip_email_quotes(src), cln) | |
|
178 | ||||
|
179 | def test_LSString(): | |||
|
180 | lss = text.LSString("abc\ndef") | |||
|
181 | nt.assert_equal(lss.l, ['abc', 'def']) | |||
|
182 | nt.assert_equal(lss.s, 'abc def') | |||
|
183 | ||||
|
184 | def test_SList(): | |||
|
185 | sl = text.SList(['a 11', 'b 1', 'a 2']) | |||
|
186 | nt.assert_equal(sl.n, 'a 11\nb 1\na 2') | |||
|
187 | nt.assert_equal(sl.s, 'a 11 b 1 a 2') | |||
|
188 | nt.assert_equal(sl.grep(lambda x: x.startswith('a')), text.SList(['a 11', 'a 2'])) | |||
|
189 | nt.assert_equal(sl.fields(0), text.SList(['a', 'b', 'a'])) | |||
|
190 | nt.assert_equal(sl.sort(field=1, nums=True), text.SList(['b 1', 'a 2', 'a 11'])) No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now