diff --git a/IPython/nbformat/sign.py b/IPython/nbformat/sign.py
index fb7b426..39154f4 100644
--- a/IPython/nbformat/sign.py
+++ b/IPython/nbformat/sign.py
@@ -1,18 +1,27 @@
-"""Functions for signing notebooks"""
+"""Utilities for signing notebooks"""
 
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
 
 import base64
 from contextlib import contextmanager
+from datetime import datetime
 import hashlib
 from hmac import HMAC
 import io
 import os
 
+try:
+    import sqlite3
+except ImportError:
+    try:
+        from pysqlite2 import dbapi2 as sqlite3
+    except ImportError:
+        sqlite3 = None
+
 from IPython.utils.io import atomic_writing
-from IPython.utils.py3compat import string_types, unicode_type, cast_bytes
-from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool
+from IPython.utils.py3compat import unicode_type, cast_bytes
+from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
 from IPython.config import LoggingConfigurable, MultipleInstanceError
 from IPython.core.application import BaseIPythonApplication, base_flags
 
@@ -93,6 +102,45 @@ class NotebookNotary(LoggingConfigurable):
             app.initialize(argv=[])
         return app.profile_dir
 
+    db_file = Unicode(config=True)
+    def _db_file_default(self):
+        """Default to a file in the profile's security dir, else in-memory."""
+        if self.profile_dir is None:
+            return ':memory:'
+        return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
+
+    # 64k entries ~ 12MB
+    db_size_limit = Integer(65535, config=True)
+    db = Any()
+    def _db_default(self):
+        """Open the signature database, or return None without SQLite."""
+        if sqlite3 is None:
+            self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
+            return
+        kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
+        db = sqlite3.connect(self.db_file, **kwargs)
+        self.init_db(db)
+        return db
+
+    def init_db(self, db):
+        """Create the nbsignatures table and its lookup index if missing."""
+        db.execute("""
+        CREATE TABLE IF NOT EXISTS nbsignatures
+        (
+            id integer PRIMARY KEY AUTOINCREMENT,
+            algorithm text,
+            signature text,
+            path text,
+            last_seen timestamp
+        )""")
+        # The index must be UNIQUE: `INSERT OR IGNORE` in store_signature only
+        # skips a row when it violates a constraint, so without uniqueness
+        # every call to sign() would add a duplicate row.
+        db.execute("""
+        CREATE UNIQUE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
+        """)
+        db.commit()
+
     algorithm = Enum(algorithms, default_value='sha256', config=True,
         help="""The hashing algorithm used to sign notebooks."""
     )
@@ -168,28 +216,78 @@ class NotebookNotary(LoggingConfigurable):
         """
         if nb.nbformat < 3:
             return False
-        stored_signature = nb['metadata'].get('signature', None)
-        if not stored_signature \
-            or not isinstance(stored_signature, string_types) \
-            or ':' not in stored_signature:
+        if self.db is None:
             return False
-        stored_algo, sig = stored_signature.split(':', 1)
-        if self.algorithm != stored_algo:
+        signature = self.compute_signature(nb)
+        r = self.db.execute("""SELECT id FROM nbsignatures WHERE
+            algorithm = ? AND
+            signature = ?;
+            """, (self.algorithm, signature)).fetchone()
+        if r is None:
             return False
-        my_signature = self.compute_signature(nb)
-        return my_signature == sig
+        self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
+            algorithm = ? AND
+            signature = ?;
+            """,
+            (datetime.utcnow(), self.algorithm, signature),
+        )
+        self.db.commit()
+        return True
 
     def sign(self, nb):
-        """Sign a notebook, indicating that its output is trusted
-
-        stores 'algo:hmac-hexdigest' in notebook.metadata.signature
+        """Sign a notebook, indicating that its output is trusted on this machine
 
-        e.g. 'sha256:deadbeef123...'
+        Stores hash algorithm and hmac digest in a local database of trusted notebooks.
         """
         if nb.nbformat < 3:
             return
         signature = self.compute_signature(nb)
-        nb['metadata']['signature'] = "%s:%s" % (self.algorithm, signature)
+        self.store_signature(signature, nb)
+
+    def store_signature(self, signature, nb):
+        """Record a signature as trusted, refreshing last_seen if already stored.
+
+        Does nothing when no signature database is available.
+        """
+        if self.db is None:
+            return
+        self.db.execute("""INSERT OR IGNORE INTO nbsignatures
+            (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
+            (self.algorithm, signature, datetime.utcnow())
+        )
+        self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
+            algorithm = ? AND
+            signature = ?;
+            """,
+            (datetime.utcnow(), self.algorithm, signature),
+        )
+        self.db.commit()
+
+    def unsign(self, nb):
+        """Ensure that a notebook is untrusted
+
+        by removing its signature from the trusted database, if present.
+        """
+        if self.db is None:
+            return
+        signature = self.compute_signature(nb)
+        self.db.execute("""DELETE FROM nbsignatures WHERE
+            algorithm = ? AND
+            signature = ?;
+            """,
+            (self.algorithm, signature)
+        )
+        self.db.commit()
+
+    def cull_db(self):
+        """Delete all but the `db_size_limit` most recently seen signatures."""
+        if self.db is None:
+            return
+        self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
+            SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
+        );
+        """, (self.db_size_limit,))
+        self.db.commit()
 
     def mark_cells(self, nb, trusted):
         """Mark cells as trusted if the notebook's signature can be verified
@@ -222,11 +320,9 @@ class NotebookNotary(LoggingConfigurable):
 
         # explicitly safe output
         if nbformat_version >= 4:
-            safe = {'text/plain', 'image/png', 'image/jpeg'}
             unsafe_output_types = ['execute_result', 'display_data']
             safe_keys = {"output_type", "execution_count", "metadata"}
         else: # v3
-            safe = {'text', 'png', 'jpeg'}
             unsafe_output_types = ['pyout', 'display_data']
             safe_keys = {"output_type", "prompt_number", "metadata"}
 
diff --git a/IPython/nbformat/tests/test_sign.py b/IPython/nbformat/tests/test_sign.py
index 2516343..005d1a0 100644
--- a/IPython/nbformat/tests/test_sign.py
+++ b/IPython/nbformat/tests/test_sign.py
@@ -3,6 +3,9 @@
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
 
+import copy
+import time
+
 from .base import TestsBase
 
 from IPython.nbformat import read, sign
@@ -14,7 +17,8 @@ class TestNotary(TestsBase):
     def setUp(self):
         self.notary = sign.NotebookNotary(
             secret=b'secret',
-            profile_dir=get_ipython().profile_dir
+            profile_dir=get_ipython().profile_dir,
+            db_file=':memory:'
         )
         with self.fopen(u'test3.ipynb', u'r') as f:
             self.nb = read(f, as_version=4)
@@ -25,10 +29,7 @@ class TestNotary(TestsBase):
         last_sig = ''
         for algo in sign.algorithms:
             self.notary.algorithm = algo
-            self.notary.sign(self.nb)
-            sig = self.nb.metadata.signature
-            print(sig)
-            self.assertEqual(sig[:len(self.notary.algorithm)+1], '%s:' % self.notary.algorithm)
+            sig = self.notary.compute_signature(self.nb)
             self.assertNotEqual(last_sig, sig)
             last_sig = sig
 
@@ -46,9 +47,58 @@ class TestNotary(TestsBase):
         self.assertNotEqual(sig1, sig2)
 
     def test_sign(self):
+        self.assertFalse(self.notary.check_signature(self.nb))
+        self.notary.sign(self.nb)
+        self.assertTrue(self.notary.check_signature(self.nb))
+
+    def test_unsign(self):
         self.notary.sign(self.nb)
-        sig = self.nb.metadata.signature
-        self.assertEqual(sig[:len(self.notary.algorithm)+1], '%s:' % self.notary.algorithm)
+        self.assertTrue(self.notary.check_signature(self.nb))
+        self.notary.unsign(self.nb)
+        self.assertFalse(self.notary.check_signature(self.nb))
+        self.notary.unsign(self.nb)
+        self.assertFalse(self.notary.check_signature(self.nb))
+
+    def test_cull_db(self):
+        # this test has various sleeps of 2ms
+        # to ensure low resolution timestamps compare as expected
+        dt = 2e-3
+        nbs = [
+            copy.deepcopy(self.nb) for i in range(5)
+        ]
+        for i, nb in enumerate(nbs):
+            nb.metadata.dirty = i
+            self.notary.sign(nb)
+
+        for i, nb in enumerate(nbs):
+            time.sleep(dt)
+            self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)
+
+        self.notary.db_size_limit = 2
+        self.notary.cull_db()
+
+        # expect all but last two signatures to be culled
+        self.assertEqual(
+            [self.notary.check_signature(nb) for nb in nbs],
+            [False] * (len(nbs) - 2) + [True] * 2
+        )
+
+        # sign them all again
+        for nb in nbs:
+            time.sleep(dt)
+            self.notary.sign(nb)
+
+        # checking front two marks them as newest for next cull instead of oldest
+        time.sleep(dt)
+        self.notary.check_signature(nbs[0])
+        self.notary.check_signature(nbs[1])
+        self.notary.cull_db()
+
+        self.assertEqual(
+            [self.notary.check_signature(nb) for nb in nbs],
+            [True] * 2 + [False] * (len(nbs) - 2)
+        )
+
 
     def test_check_signature(self):
         nb = self.nb
diff --git a/IPython/nbformat/v4/convert.py b/IPython/nbformat/v4/convert.py
index c559bdf..1e9e65e 100644
--- a/IPython/nbformat/v4/convert.py
+++ b/IPython/nbformat/v4/convert.py
@@ -56,6 +56,7 @@ def upgrade(nb, from_version=3, from_minor=0):
             cells.append(upgrade_cell(cell))
         # upgrade metadata
         nb.metadata.pop('name', '')
+        nb.metadata.pop('signature', '')
         # Validate the converted notebook before returning it
         _warn_if_invalid(nb, nbformat)
         return nb
diff --git a/IPython/nbformat/v4/nbformat.v4.schema.json b/IPython/nbformat/v4/nbformat.v4.schema.json
index 365a08a..e90b0f3 100644
--- a/IPython/nbformat/v4/nbformat.v4.schema.json
+++ b/IPython/nbformat/v4/nbformat.v4.schema.json
@@ -55,10 +55,6 @@
                         }
                     }
                 },
-                "signature": {
-                    "description": "Hash of the notebook.",
-                    "type": "string"
-                },
                 "orig_nbformat": {
                     "description": "Original notebook format (major number) before converting the notebook between versions. This should never be written to a file.",
                     "type": "integer",
diff --git a/IPython/nbformat/v4/rwbase.py b/IPython/nbformat/v4/rwbase.py
index 68b81e0..9a1ca50 100644
--- a/IPython/nbformat/v4/rwbase.py
+++ b/IPython/nbformat/v4/rwbase.py
@@ -64,6 +64,7 @@ def strip_transient(nb):
     """
     nb.metadata.pop('orig_nbformat', None)
     nb.metadata.pop('orig_nbformat_minor', None)
+    nb.metadata.pop('signature', None)
     for cell in nb.cells:
         cell.metadata.pop('trusted', None)
     return nb