##// END OF EJS Templates
don't store signatures in notebooks...
Min RK -
Show More
@@ -1,18 +1,27 b''
1 """Functions for signing notebooks"""
1 """Utilities for signing notebooks"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 from contextlib import contextmanager
8 from datetime import datetime
8 9 import hashlib
9 10 from hmac import HMAC
10 11 import io
11 12 import os
12 13
14 try:
15 import sqlite3
16 except ImportError:
17 try:
18 from pysqlite2 import dbapi2 as sqlite3
19 except ImportError:
20 sqlite3 = None
21
13 22 from IPython.utils.io import atomic_writing
14 from IPython.utils.py3compat import string_types, unicode_type, cast_bytes
15 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool
23 from IPython.utils.py3compat import unicode_type, cast_bytes
24 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
16 25 from IPython.config import LoggingConfigurable, MultipleInstanceError
17 26 from IPython.core.application import BaseIPythonApplication, base_flags
18 27
@@ -93,6 +102,39 b' class NotebookNotary(LoggingConfigurable):'
93 102 app.initialize(argv=[])
94 103 return app.profile_dir
95 104
105 db_file = Unicode(config=True)
106 def _db_file_default(self):
107 if self.profile_dir is None:
108 return ':memory:'
109 return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
110
111 # 64k entries ~ 12MB
112 db_size_limit = Integer(65535, config=True)
113 db = Any()
114 def _db_default(self):
115 if sqlite3 is None:
116 self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
117 return
118 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
119 db = sqlite3.connect(self.db_file, **kwargs)
120 self.init_db(db)
121 return db
122
123 def init_db(self, db):
124 db.execute("""
125 CREATE TABLE IF NOT EXISTS nbsignatures
126 (
127 id integer PRIMARY KEY AUTOINCREMENT,
128 algorithm text,
129 signature text,
130 path text,
131 last_seen timestamp
132 )""")
133 db.execute("""
134 CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
135 """)
136 db.commit()
137
96 138 algorithm = Enum(algorithms, default_value='sha256', config=True,
97 139 help="""The hashing algorithm used to sign notebooks."""
98 140 )
@@ -168,28 +210,68 b' class NotebookNotary(LoggingConfigurable):'
168 210 """
169 211 if nb.nbformat < 3:
170 212 return False
171 stored_signature = nb['metadata'].get('signature', None)
172 if not stored_signature \
173 or not isinstance(stored_signature, string_types) \
174 or ':' not in stored_signature:
213 if self.db is None:
175 214 return False
176 stored_algo, sig = stored_signature.split(':', 1)
177 if self.algorithm != stored_algo:
215 signature = self.compute_signature(nb)
216 r = self.db.execute("""SELECT id FROM nbsignatures WHERE
217 algorithm = ? AND
218 signature = ?;
219 """, (self.algorithm, signature)).fetchone()
220 if r is None:
178 221 return False
179 my_signature = self.compute_signature(nb)
180 return my_signature == sig
222 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
223 algorithm = ? AND
224 signature = ?;
225 """,
226 (datetime.utcnow(), self.algorithm, signature),
227 )
228 self.db.commit()
229 return True
181 230
182 231 def sign(self, nb):
183 """Sign a notebook, indicating that its output is trusted
184
185 stores 'algo:hmac-hexdigest' in notebook.metadata.signature
232 """Sign a notebook, indicating that its output is trusted on this machine
186 233
187 e.g. 'sha256:deadbeef123...'
234 Stores hash algorithm and hmac digest in a local database of trusted notebooks.
188 235 """
189 236 if nb.nbformat < 3:
190 237 return
191 238 signature = self.compute_signature(nb)
192 nb['metadata']['signature'] = "%s:%s" % (self.algorithm, signature)
239 self.store_signature(signature, nb)
240
241 def store_signature(self, signature, nb):
242 if self.db is None:
243 return
244 self.db.execute("""INSERT OR IGNORE INTO nbsignatures
245 (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
246 (self.algorithm, signature, datetime.utcnow())
247 )
248 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
249 algorithm = ? AND
250 signature = ?;
251 """,
252 (datetime.utcnow(), self.algorithm, signature),
253 )
254 self.db.commit()
255
256 def unsign(self, nb):
257 """Ensure that a notebook is untrusted
258
259 by removing its signature from the trusted database, if present.
260 """
261 signature = self.compute_signature(nb)
262 self.db.execute("""DELETE FROM nbsignatures WHERE
263 algorithm = ? AND
264 signature = ?;
265 """,
266 (self.algorithm, signature)
267 )
268 self.db.commit()
269
270 def cull_db(self):
271 self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
272 SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
273 );
274 """, (self.db_size_limit,))
193 275
194 276 def mark_cells(self, nb, trusted):
195 277 """Mark cells as trusted if the notebook's signature can be verified
@@ -222,11 +304,9 b' class NotebookNotary(LoggingConfigurable):'
222 304
223 305 # explicitly safe output
224 306 if nbformat_version >= 4:
225 safe = {'text/plain', 'image/png', 'image/jpeg'}
226 307 unsafe_output_types = ['execute_result', 'display_data']
227 308 safe_keys = {"output_type", "execution_count", "metadata"}
228 309 else: # v3
229 safe = {'text', 'png', 'jpeg'}
230 310 unsafe_output_types = ['pyout', 'display_data']
231 311 safe_keys = {"output_type", "prompt_number", "metadata"}
232 312
@@ -3,6 +3,9 b''
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 import copy
7 import time
8
6 9 from .base import TestsBase
7 10
8 11 from IPython.nbformat import read, sign
@@ -14,7 +17,8 b' class TestNotary(TestsBase):'
14 17 def setUp(self):
15 18 self.notary = sign.NotebookNotary(
16 19 secret=b'secret',
17 profile_dir=get_ipython().profile_dir
20 profile_dir=get_ipython().profile_dir,
21 db_url=':memory:'
18 22 )
19 23 with self.fopen(u'test3.ipynb', u'r') as f:
20 24 self.nb = read(f, as_version=4)
@@ -25,10 +29,7 b' class TestNotary(TestsBase):'
25 29 last_sig = ''
26 30 for algo in sign.algorithms:
27 31 self.notary.algorithm = algo
28 self.notary.sign(self.nb)
29 sig = self.nb.metadata.signature
30 print(sig)
31 self.assertEqual(sig[:len(self.notary.algorithm)+1], '%s:' % self.notary.algorithm)
32 sig = self.notary.compute_signature(self.nb)
32 33 self.assertNotEqual(last_sig, sig)
33 34 last_sig = sig
34 35
@@ -46,9 +47,58 b' class TestNotary(TestsBase):'
46 47 self.assertNotEqual(sig1, sig2)
47 48
48 49 def test_sign(self):
50 self.assertFalse(self.notary.check_signature(self.nb))
51 self.notary.sign(self.nb)
52 self.assertTrue(self.notary.check_signature(self.nb))
53
54 def test_unsign(self):
49 55 self.notary.sign(self.nb)
50 sig = self.nb.metadata.signature
51 self.assertEqual(sig[:len(self.notary.algorithm)+1], '%s:' % self.notary.algorithm)
56 self.assertTrue(self.notary.check_signature(self.nb))
57 self.notary.unsign(self.nb)
58 self.assertFalse(self.notary.check_signature(self.nb))
59 self.notary.unsign(self.nb)
60 self.assertFalse(self.notary.check_signature(self.nb))
61
62 def test_cull_db(self):
63 # this test has various sleeps of 2ms
64 # to ensure low resolution timestamps compare as expected
65 dt = 2e-3
66 nbs = [
67 copy.deepcopy(self.nb) for i in range(5)
68 ]
69 for i, nb in enumerate(nbs):
70 nb.metadata.dirty = i
71 self.notary.sign(nb)
72
73 for i, nb in enumerate(nbs):
74 time.sleep(dt)
75 self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)
76
77 self.notary.db_size_limit = 2
78 self.notary.cull_db()
79
80 # expect all but last two signatures to be culled
81 self.assertEqual(
82 [self.notary.check_signature(nb) for nb in nbs],
83 [False] * (len(nbs) - 2) + [True] * 2
84 )
85
86 # sign them all again
87 for nb in nbs:
88 time.sleep(dt)
89 self.notary.sign(nb)
90
91 # checking front two marks them as newest for next cull instead of oldest
92 time.sleep(dt)
93 self.notary.check_signature(nbs[0])
94 self.notary.check_signature(nbs[1])
95 self.notary.cull_db()
96
97 self.assertEqual(
98 [self.notary.check_signature(nb) for nb in nbs],
99 [True] * 2 + [False] * (len(nbs) - 2)
100 )
101
52 102
53 103 def test_check_signature(self):
54 104 nb = self.nb
@@ -56,6 +56,7 b' def upgrade(nb, from_version=3, from_minor=0):'
56 56 cells.append(upgrade_cell(cell))
57 57 # upgrade metadata
58 58 nb.metadata.pop('name', '')
59 nb.metadata.pop('signature', '')
59 60 # Validate the converted notebook before returning it
60 61 _warn_if_invalid(nb, nbformat)
61 62 return nb
@@ -55,10 +55,6 b''
55 55 }
56 56 }
57 57 },
58 "signature": {
59 "description": "Hash of the notebook.",
60 "type": "string"
61 },
62 58 "orig_nbformat": {
63 59 "description": "Original notebook format (major number) before converting the notebook between versions. This should never be written to a file.",
64 60 "type": "integer",
@@ -64,6 +64,7 b' def strip_transient(nb):'
64 64 """
65 65 nb.metadata.pop('orig_nbformat', None)
66 66 nb.metadata.pop('orig_nbformat_minor', None)
67 nb.metadata.pop('signature', None)
67 68 for cell in nb.cells:
68 69 cell.metadata.pop('trusted', None)
69 70 return nb
General Comments 0
You need to be logged in to leave comments. Login now