From c2ddffd0c56f1edfa108eabd79c309ee981acfb5 2014-12-17 22:27:37 From: Min RK Date: 2014-12-17 22:27:37 Subject: [PATCH] automatically cull oldest 25% of signatures when cache size is reached --- diff --git a/IPython/nbformat/sign.py b/IPython/nbformat/sign.py index 39154f4..e9e7adc 100644 --- a/IPython/nbformat/sign.py +++ b/IPython/nbformat/sign.py @@ -102,14 +102,23 @@ class NotebookNotary(LoggingConfigurable): app.initialize(argv=[]) return app.profile_dir - db_file = Unicode(config=True) + db_file = Unicode(config=True, + help="""The sqlite file in which to store notebook signatures. + By default, this will be in your IPython profile. + You can set it to ':memory:' to disable sqlite writing to the filesystem. + """) def _db_file_default(self): if self.profile_dir is None: return ':memory:' return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db') # 64k entries ~ 12MB - db_size_limit = Integer(65535, config=True) + cache_size = Integer(65535, config=True, + help="""The number of notebook signatures to cache. + When the number of signatures exceeds this value, + the oldest 25% of signatures will be culled. + """ + ) db = Any() def _db_default(self): if sqlite3 is None: @@ -252,6 +261,9 @@ class NotebookNotary(LoggingConfigurable): (datetime.utcnow(), self.algorithm, signature), ) self.db.commit() + n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone() + if n > self.cache_size: + self.cull_db() def unsign(self, nb): """Ensure that a notebook is untrusted @@ -268,10 +280,11 @@ class NotebookNotary(LoggingConfigurable): self.db.commit() def cull_db(self): + """Cull oldest 25% of the trusted signatures when the size limit is reached""" self.db.execute("""DELETE FROM nbsignatures WHERE id IN ( SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ? ); - """, (self.db_size_limit,)) + """, (max(int(0.75 * self.cache_size), 1),)) def mark_cells(self, nb, trusted): """Mark cells as trusted if the notebook's signature can be verified diff --git a/IPython/nbformat/tests/test_sign.py b/IPython/nbformat/tests/test_sign.py index 005d1a0..21b4f99 100644 --- a/IPython/nbformat/tests/test_sign.py +++ b/IPython/nbformat/tests/test_sign.py @@ -18,7 +18,7 @@ class TestNotary(TestsBase): self.notary = sign.NotebookNotary( secret=b'secret', profile_dir=get_ipython().profile_dir, - db_url=':memory:' + db_file=':memory:' ) with self.fopen(u'test3.ipynb', u'r') as f: self.nb = read(f, as_version=4) @@ -64,41 +64,32 @@ class TestNotary(TestsBase): # to ensure low resolution timestamps compare as expected dt = 2e-3 nbs = [ - copy.deepcopy(self.nb) for i in range(5) + copy.deepcopy(self.nb) for i in range(10) ] - for i, nb in enumerate(nbs): + for row in self.notary.db.execute("SELECT * FROM nbsignatures"): + print(row) + self.notary.cache_size = 8 + for i, nb in enumerate(nbs[:8]): nb.metadata.dirty = i self.notary.sign(nb) - for i, nb in enumerate(nbs): + for i, nb in enumerate(nbs[:8]): time.sleep(dt) self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i) - self.notary.db_size_limit = 2 - self.notary.cull_db() - - # expect all but last two signatures to be culled - self.assertEqual( - [self.notary.check_signature(nb) for nb in nbs], - [False] * (len(nbs) - 2) + [True] * 2 - ) - - # sign them all again - for nb in nbs: - time.sleep(dt) - self.notary.sign(nb) - - # checking front two marks them as newest for next cull instead of oldest - time.sleep(dt) - self.notary.check_signature(nbs[0]) - self.notary.check_signature(nbs[1]) - self.notary.cull_db() - - self.assertEqual( - [self.notary.check_signature(nb) for nb in nbs], - [True] * 2 + [False] * (len(nbs) - 2) - ) - + # signing the 9th triggers culling of first 3 + # (75% of 8 = 6, 9 - 6 = 3 culled) + self.notary.sign(nbs[8]) + self.assertFalse(self.notary.check_signature(nbs[0])) + self.assertFalse(self.notary.check_signature(nbs[1])) + self.assertFalse(self.notary.check_signature(nbs[2])) + self.assertTrue(self.notary.check_signature(nbs[3])) + # checking nb3 should keep it from being culled: + self.notary.sign(nbs[0]) + self.notary.sign(nbs[1]) + self.notary.sign(nbs[2]) + self.assertTrue(self.notary.check_signature(nbs[3])) + self.assertFalse(self.notary.check_signature(nbs[4])) def test_check_signature(self): nb = self.nb