Show More
@@ -102,14 +102,23 b' class NotebookNotary(LoggingConfigurable):' | |||
|
102 | 102 | app.initialize(argv=[]) |
|
103 | 103 | return app.profile_dir |
|
104 | 104 | |
|
105 |
db_file = Unicode(config=True |
|
|
105 | db_file = Unicode(config=True, | |
|
106 | help="""The sqlite file in which to store notebook signatures. | |
|
107 | By default, this will be in your IPython profile. | |
|
108 | You can set it to ':memory:' to disable sqlite writing to the filesystem. | |
|
109 | """) | |
|
106 | 110 | def _db_file_default(self): |
|
107 | 111 | if self.profile_dir is None: |
|
108 | 112 | return ':memory:' |
|
109 | 113 | return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db') |
|
110 | 114 | |
|
111 | 115 | # 64k entries ~ 12MB |
|
112 |
|
|
|
116 | cache_size = Integer(65535, config=True, | |
|
117 | help="""The number of notebook signatures to cache. | |
|
118 | When the number of signatures exceeds this value, | |
|
119 | the oldest 25% of signatures will be culled. | |
|
120 | """ | |
|
121 | ) | |
|
113 | 122 | db = Any() |
|
114 | 123 | def _db_default(self): |
|
115 | 124 | if sqlite3 is None: |
@@ -252,6 +261,9 b' class NotebookNotary(LoggingConfigurable):' | |||
|
252 | 261 | (datetime.utcnow(), self.algorithm, signature), |
|
253 | 262 | ) |
|
254 | 263 | self.db.commit() |
|
264 | n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone() | |
|
265 | if n > self.cache_size: | |
|
266 | self.cull_db() | |
|
255 | 267 | |
|
256 | 268 | def unsign(self, nb): |
|
257 | 269 | """Ensure that a notebook is untrusted |
@@ -268,10 +280,11 b' class NotebookNotary(LoggingConfigurable):' | |||
|
268 | 280 | self.db.commit() |
|
269 | 281 | |
|
270 | 282 | def cull_db(self): |
|
283 | """Cull oldest 25% of the trusted signatures when the size limit is reached""" | |
|
271 | 284 | self.db.execute("""DELETE FROM nbsignatures WHERE id IN ( |
|
272 | 285 | SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ? |
|
273 | 286 | ); |
|
274 |
""", (self. |
|
|
287 | """, (max(int(0.75 * self.cache_size), 1),)) | |
|
275 | 288 | |
|
276 | 289 | def mark_cells(self, nb, trusted): |
|
277 | 290 | """Mark cells as trusted if the notebook's signature can be verified |
@@ -18,7 +18,7 b' class TestNotary(TestsBase):' | |||
|
18 | 18 | self.notary = sign.NotebookNotary( |
|
19 | 19 | secret=b'secret', |
|
20 | 20 | profile_dir=get_ipython().profile_dir, |
|
21 |
db_ |
|
|
21 | db_file=':memory:' | |
|
22 | 22 | ) |
|
23 | 23 | with self.fopen(u'test3.ipynb', u'r') as f: |
|
24 | 24 | self.nb = read(f, as_version=4) |
@@ -64,41 +64,32 b' class TestNotary(TestsBase):' | |||
|
64 | 64 | # to ensure low resolution timestamps compare as expected |
|
65 | 65 | dt = 2e-3 |
|
66 | 66 | nbs = [ |
|
67 |
copy.deepcopy(self.nb) for i in range( |
|
|
67 | copy.deepcopy(self.nb) for i in range(10) | |
|
68 | 68 | ] |
|
69 | for i, nb in enumerate(nbs): | |
|
69 | for row in self.notary.db.execute("SELECT * FROM nbsignatures"): | |
|
70 | print(row) | |
|
71 | self.notary.cache_size = 8 | |
|
72 | for i, nb in enumerate(nbs[:8]): | |
|
70 | 73 | nb.metadata.dirty = i |
|
71 | 74 | self.notary.sign(nb) |
|
72 | 75 | |
|
73 | for i, nb in enumerate(nbs): | |
|
76 | for i, nb in enumerate(nbs[:8]): | |
|
74 | 77 | time.sleep(dt) |
|
75 | 78 | self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i) |
|
76 | 79 | |
|
77 | self.notary.db_size_limit = 2 | |
|
78 | self.notary.cull_db() | |
|
79 | ||
|
80 | # expect all but last two signatures to be culled | |
|
81 | self.assertEqual( | |
|
82 |
|
|
|
83 | [False] * (len(nbs) - 2) + [True] * 2 | |
|
84 | ) | |
|
85 | ||
|
86 | # sign them all again | |
|
87 | for nb in nbs: | |
|
88 | time.sleep(dt) | |
|
89 |
|
|
|
90 | ||
|
91 | # checking front two marks them as newest for next cull instead of oldest | |
|
92 | time.sleep(dt) | |
|
93 | self.notary.check_signature(nbs[0]) | |
|
94 | self.notary.check_signature(nbs[1]) | |
|
95 | self.notary.cull_db() | |
|
96 | ||
|
97 | self.assertEqual( | |
|
98 | [self.notary.check_signature(nb) for nb in nbs], | |
|
99 | [True] * 2 + [False] * (len(nbs) - 2) | |
|
100 | ) | |
|
101 | ||
|
80 | # signing the 9th triggers culling of first 3 | |
|
81 | # (75% of 8 = 6, 9 - 6 = 3 culled) | |
|
82 | self.notary.sign(nbs[8]) | |
|
83 | self.assertFalse(self.notary.check_signature(nbs[0])) | |
|
84 | self.assertFalse(self.notary.check_signature(nbs[1])) | |
|
85 | self.assertFalse(self.notary.check_signature(nbs[2])) | |
|
86 | self.assertTrue(self.notary.check_signature(nbs[3])) | |
|
87 | # checking nb3 should keep it from being culled: | |
|
88 | self.notary.sign(nbs[0]) | |
|
89 | self.notary.sign(nbs[1]) | |
|
90 | self.notary.sign(nbs[2]) | |
|
91 | self.assertTrue(self.notary.check_signature(nbs[3])) | |
|
92 | self.assertFalse(self.notary.check_signature(nbs[4])) | |
|
102 | 93 | |
|
103 | 94 | def test_check_signature(self): |
|
104 | 95 | nb = self.nb |
General Comments 0
You need to be logged in to leave comments.
Login now