##// END OF EJS Templates
automatically cull oldest 25% of signatures...
Min RK -
Show More
@@ -1,411 +1,424 b''
1 1 """Utilities for signing notebooks"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 from contextlib import contextmanager
8 8 from datetime import datetime
9 9 import hashlib
10 10 from hmac import HMAC
11 11 import io
12 12 import os
13 13
14 14 try:
15 15 import sqlite3
16 16 except ImportError:
17 17 try:
18 18 from pysqlite2 import dbapi2 as sqlite3
19 19 except ImportError:
20 20 sqlite3 = None
21 21
22 22 from IPython.utils.io import atomic_writing
23 23 from IPython.utils.py3compat import unicode_type, cast_bytes
24 24 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
25 25 from IPython.config import LoggingConfigurable, MultipleInstanceError
26 26 from IPython.core.application import BaseIPythonApplication, base_flags
27 27
28 28 from . import read, write, NO_CONVERT
29 29
30 30 try:
31 31 # Python 3
32 32 algorithms = hashlib.algorithms_guaranteed
33 33 except AttributeError:
34 34 algorithms = hashlib.algorithms
35 35
36 36
def yield_everything(obj):
    """Flatten a JSON-like object into a stream of byte strings

    Lets an HMAC digester consume any JSONable object piece by piece,
    without having to serialize the whole thing first.

    - dicts are walked in sorted-key order, each key (passed through
      ``cast_bytes``) being followed by the bytes of its value
    - lists and tuples are walked in order
    - unicode strings are encoded as UTF-8
    - anything else is converted with ``unicode_type`` and encoded as UTF-8
    """
    if isinstance(obj, dict):
        for name in sorted(obj):
            item = obj[name]
            yield cast_bytes(name)
            for chunk in yield_everything(item):
                yield chunk
        return

    if isinstance(obj, (list, tuple)):
        for item in obj:
            for chunk in yield_everything(item):
                yield chunk
        return

    # scalar leaf: make sure it is text, then encode it
    text = obj if isinstance(obj, unicode_type) else unicode_type(obj)
    yield text.encode('utf8')
57 57
def yield_code_cells(nb):
    """Iterator that yields the code cells of a notebook

    Cells of any other type are skipped.

    nbformat version independent: for nbformat >= 4 the cells come from
    ``nb['cells']``, for nbformat 3 from the ``cells`` of every entry in
    ``nb['worksheets']``. Any other version yields nothing.
    """
    version = nb.nbformat
    if version >= 4:
        cell_groups = [nb['cells']]
    elif version == 3:
        # walked lazily, one worksheet at a time
        cell_groups = (sheet['cells'] for sheet in nb['worksheets'])
    else:
        cell_groups = ()

    for group in cell_groups:
        for candidate in group:
            if candidate['cell_type'] == 'code':
                yield candidate
72 72
@contextmanager
def signature_removed(nb):
    """Context manager that temporarily strips a notebook's stored signature

    On entry, ``nb['metadata']['signature']`` is popped (if present); on
    exit it is put back, even if the body raised. Used so that a previous
    signature is not part of the content that gets hashed when computing
    a notebook's signature.
    """
    stashed = nb['metadata'].pop('signature', None)
    try:
        yield
    finally:
        # nothing to put back when there was no signature (or it was None)
        if stashed is not None:
            nb['metadata']['signature'] = stashed
85 85
86 86
87 87 class NotebookNotary(LoggingConfigurable):
88 88 """A class for computing and verifying notebook signatures."""
89 89
90 90 profile_dir = Instance("IPython.core.profiledir.ProfileDir")
91 91 def _profile_dir_default(self):
92 92 from IPython.core.application import BaseIPythonApplication
93 93 app = None
94 94 try:
95 95 if BaseIPythonApplication.initialized():
96 96 app = BaseIPythonApplication.instance()
97 97 except MultipleInstanceError:
98 98 pass
99 99 if app is None:
100 100 # create an app, without the global instance
101 101 app = BaseIPythonApplication()
102 102 app.initialize(argv=[])
103 103 return app.profile_dir
104 104
105 db_file = Unicode(config=True)
105 db_file = Unicode(config=True,
106 help="""The sqlite file in which to store notebook signatures.
107 By default, this will be in your IPython profile.
108 You can set it to ':memory:' to disable sqlite writing to the filesystem.
109 """)
106 110 def _db_file_default(self):
107 111 if self.profile_dir is None:
108 112 return ':memory:'
109 113 return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
110 114
111 115 # 64k entries ~ 12MB
112 db_size_limit = Integer(65535, config=True)
116 cache_size = Integer(65535, config=True,
117 help="""The number of notebook signatures to cache.
118 When the number of signatures exceeds this value,
119 the oldest 25% of signatures will be culled.
120 """
121 )
113 122 db = Any()
114 123 def _db_default(self):
115 124 if sqlite3 is None:
116 125 self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
117 126 return
118 127 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
119 128 db = sqlite3.connect(self.db_file, **kwargs)
120 129 self.init_db(db)
121 130 return db
122 131
123 132 def init_db(self, db):
124 133 db.execute("""
125 134 CREATE TABLE IF NOT EXISTS nbsignatures
126 135 (
127 136 id integer PRIMARY KEY AUTOINCREMENT,
128 137 algorithm text,
129 138 signature text,
130 139 path text,
131 140 last_seen timestamp
132 141 )""")
133 142 db.execute("""
134 143 CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
135 144 """)
136 145 db.commit()
137 146
138 147 algorithm = Enum(algorithms, default_value='sha256', config=True,
139 148 help="""The hashing algorithm used to sign notebooks."""
140 149 )
141 150 def _algorithm_changed(self, name, old, new):
142 151 self.digestmod = getattr(hashlib, self.algorithm)
143 152
144 153 digestmod = Any()
145 154 def _digestmod_default(self):
146 155 return getattr(hashlib, self.algorithm)
147 156
148 157 secret_file = Unicode(config=True,
149 158 help="""The file where the secret key is stored."""
150 159 )
151 160 def _secret_file_default(self):
152 161 if self.profile_dir is None:
153 162 return ''
154 163 return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
155 164
156 165 secret = Bytes(config=True,
157 166 help="""The secret key with which notebooks are signed."""
158 167 )
def _secret_default(self):
    """Return the notebook-signing secret, creating it on first use

    If ``self.secret_file`` exists, its raw bytes are returned unchanged.
    Otherwise 1024 random bytes are base64-encoded, handed to
    ``self._write_secret_file`` and returned.
    """
    # note : this assumes an Application is running
    if os.path.exists(self.secret_file):
        with io.open(self.secret_file, 'rb') as f:
            return f.read()

    # base64.encodestring was removed in Python 3.9; encodebytes is the
    # same encoder under its Python 3 name. Fall back to the old name
    # only where encodebytes does not exist (Python 2).
    encode = getattr(base64, 'encodebytes', None) or base64.encodestring
    secret = encode(os.urandom(1024))
    self._write_secret_file(secret)
    return secret
168 177
169 178 def _write_secret_file(self, secret):
170 179 """write my secret to my secret_file"""
171 180 self.log.info("Writing notebook-signing key to %s", self.secret_file)
172 181 with io.open(self.secret_file, 'wb') as f:
173 182 f.write(secret)
174 183 try:
175 184 os.chmod(self.secret_file, 0o600)
176 185 except OSError:
177 186 self.log.warn(
178 187 "Could not set permissions on %s",
179 188 self.secret_file
180 189 )
181 190 return secret
182 191
183 192 def compute_signature(self, nb):
184 193 """Compute a notebook's signature
185 194
186 195 by hashing the entire contents of the notebook via HMAC digest.
187 196 """
188 197 hmac = HMAC(self.secret, digestmod=self.digestmod)
189 198 # don't include the previous hash in the content to hash
190 199 with signature_removed(nb):
191 200 # sign the whole thing
192 201 for b in yield_everything(nb):
193 202 hmac.update(b)
194 203
195 204 return hmac.hexdigest()
196 205
197 206 def check_signature(self, nb):
198 207 """Check a notebook's stored signature
199 208
200 209 If a signature is stored in the notebook's metadata,
201 210 a new signature is computed and compared with the stored value.
202 211
203 212 Returns True if the signature is found and matches, False otherwise.
204 213
205 214 The following conditions must all be met for a notebook to be trusted:
206 215 - a signature is stored in the form 'scheme:hexdigest'
207 216 - the stored scheme matches the requested scheme
208 217 - the requested scheme is available from hashlib
209 218 - the computed hash from notebook_signature matches the stored hash
210 219 """
211 220 if nb.nbformat < 3:
212 221 return False
213 222 if self.db is None:
214 223 return False
215 224 signature = self.compute_signature(nb)
216 225 r = self.db.execute("""SELECT id FROM nbsignatures WHERE
217 226 algorithm = ? AND
218 227 signature = ?;
219 228 """, (self.algorithm, signature)).fetchone()
220 229 if r is None:
221 230 return False
222 231 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
223 232 algorithm = ? AND
224 233 signature = ?;
225 234 """,
226 235 (datetime.utcnow(), self.algorithm, signature),
227 236 )
228 237 self.db.commit()
229 238 return True
230 239
231 240 def sign(self, nb):
232 241 """Sign a notebook, indicating that its output is trusted on this machine
233 242
234 243 Stores hash algorithm and hmac digest in a local database of trusted notebooks.
235 244 """
236 245 if nb.nbformat < 3:
237 246 return
238 247 signature = self.compute_signature(nb)
239 248 self.store_signature(signature, nb)
240 249
def store_signature(self, signature, nb):
    """Record a signature as trusted for the current algorithm

    Does nothing when no database is available (``self.db is None``).

    If a row for ``(self.algorithm, signature)`` already exists, only its
    ``last_seen`` timestamp is refreshed; otherwise a new row is inserted.
    After committing, ``self.cull_db()`` is called if the table holds more
    than ``self.cache_size`` rows.

    ``nb`` is part of the signature for callers but is not used here.
    """
    if self.db is None:
        return

    now = datetime.utcnow()
    # The table created by init_db has no UNIQUE constraint on
    # (algorithm, signature), so "INSERT OR IGNORE" never ignores anything
    # and re-signing the same notebook would add a duplicate row each time.
    # Refresh any existing row first and insert only when nothing matched.
    refreshed = self.db.execute(
        """UPDATE nbsignatures SET last_seen = ? WHERE
        algorithm = ? AND
        signature = ?;
        """,
        (now, self.algorithm, signature),
    ).rowcount
    if not refreshed:
        self.db.execute(
            """INSERT INTO nbsignatures
            (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
            (self.algorithm, signature, now),
        )
    self.db.commit()

    n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
    if n > self.cache_size:
        self.cull_db()
255 267
def unsign(self, nb):
    """Ensure that a notebook is untrusted

    by removing its signature from the trusted database, if present.

    Does nothing when no database is available (``self.db is None``),
    matching the guards in check_signature and store_signature.
    """
    if self.db is None:
        # without a database nothing can have been trusted
        return
    signature = self.compute_signature(nb)
    self.db.execute(
        """DELETE FROM nbsignatures WHERE
        algorithm = ? AND
        signature = ?;
        """,
        (self.algorithm, signature),
    )
    self.db.commit()
269 281
def cull_db(self):
    """Cull oldest 25% of the trusted signatures when the size limit is reached

    Keeps the ``max(int(0.75 * self.cache_size), 1)`` most recently seen
    rows of ``nbsignatures`` and deletes all others, then commits.
    Does nothing when no database is available (``self.db is None``).
    """
    if self.db is None:
        return
    keep = max(int(0.75 * self.cache_size), 1)
    # "LIMIT -1 OFFSET ?" selects every row after the `keep` newest ones.
    # Ties on last_seen are broken by id so the outcome is deterministic
    # (rows inserted later win).
    self.db.execute(
        """DELETE FROM nbsignatures WHERE id IN (
            SELECT id FROM nbsignatures
            ORDER BY last_seen DESC, id DESC LIMIT -1 OFFSET ?
        );
        """,
        (keep,),
    )
    # commit here: callers such as store_signature commit *before* culling,
    # so without this the DELETE would stay in an open transaction
    self.db.commit()
275 288
def mark_cells(self, nb, trusted):
    """Record a trust verdict on every code cell of a notebook

    Sets ``cell.metadata.trusted = trusted`` (``True | False``) on all
    code cells. Notebooks with nbformat below 3 are left untouched.

    This function is the inverse of check_cells
    """
    if nb.nbformat >= 3:
        for code_cell in yield_code_cells(nb):
            code_cell['metadata']['trusted'] = trusted
289 302
def _check_cell(self, cell, nbformat_version):
    """Decide whether a single cell can be trusted

    The cell is trusted (True is returned) when either:

    - its metadata carries a truthy ``trusted`` flag, or
    - none of its rich outputs has keys beyond the known-safe ones

    A cell with no output, or with only output types that are not
    considered rich (e.g. simple print statements), is therefore always
    trusted.

    Note that the ``trusted`` flag is removed from the cell's metadata
    as a side effect of the check.
    """
    # explicit trust marker: consume it and stop here
    flagged = cell['metadata'].pop("trusted", False)
    if flagged:
        return True

    # output types that may carry rich data, and the keys that are harmless
    if nbformat_version >= 4:
        rich_types = ('execute_result', 'display_data')
        harmless_keys = {"output_type", "execution_count", "metadata"}
    else:  # v3
        rich_types = ('pyout', 'display_data')
        harmless_keys = {"output_type", "prompt_number", "metadata"}

    for out in cell['outputs']:
        if out['output_type'] not in rich_types:
            continue
        # any key outside the safe whitelist makes the cell untrusted
        if set(out) - harmless_keys:
            return False

    return True
322 335
def check_cells(self, nb):
    """Return whether all code cells are trusted

    A notebook without code cells counts as trusted (True); a notebook
    with nbformat below 3 is never trusted (False).

    This function is the inverse of mark_cells.
    """
    version = nb.nbformat
    if version < 3:
        return False

    # Build the full list before combining: _check_cell removes the
    # ``trusted`` flag from each cell it inspects, so every code cell must
    # be visited even after the first untrusted one is found.
    verdicts = [
        bool(self._check_cell(code_cell, version))
        for code_cell in yield_code_cells(nb)
    ]
    return all(verdicts)
339 352
340 353
341 354 trust_flags = {
342 355 'reset' : (
343 356 {'TrustNotebookApp' : { 'reset' : True}},
344 357 """Generate a new key for notebook signature.
345 358 All previously signed notebooks will become untrusted.
346 359 """
347 360 ),
348 361 }
349 362 trust_flags.update(base_flags)
350 363 trust_flags.pop('init')
351 364
352 365
353 366 class TrustNotebookApp(BaseIPythonApplication):
354 367
355 368 description="""Sign one or more IPython notebooks with your key,
356 369 to trust their dynamic (HTML, Javascript) output.
357 370
358 371 Trusting a notebook only applies to the current IPython profile.
359 372 To trust a notebook for use with a profile other than default,
360 373 add `--profile [profile name]`.
361 374
362 375 Otherwise, you will have to re-execute the notebook to see output.
363 376 """
364 377
365 378 examples = """
366 379 ipython trust mynotebook.ipynb and_this_one.ipynb
367 380 ipython trust --profile myprofile mynotebook.ipynb
368 381 """
369 382
370 383 flags = trust_flags
371 384
372 385 reset = Bool(False, config=True,
373 386 help="""If True, generate a new key for notebook signature.
374 387 After reset, all previously signed notebooks will become untrusted.
375 388 """
376 389 )
377 390
378 391 notary = Instance(NotebookNotary)
379 392 def _notary_default(self):
380 393 return NotebookNotary(parent=self, profile_dir=self.profile_dir)
381 394
382 395 def sign_notebook(self, notebook_path):
383 396 if not os.path.exists(notebook_path):
384 397 self.log.error("Notebook missing: %s" % notebook_path)
385 398 self.exit(1)
386 399 with io.open(notebook_path, encoding='utf8') as f:
387 400 nb = read(f, NO_CONVERT)
388 401 if self.notary.check_signature(nb):
389 402 print("Notebook already signed: %s" % notebook_path)
390 403 else:
391 404 print("Signing notebook: %s" % notebook_path)
392 405 self.notary.sign(nb)
393 406 with atomic_writing(notebook_path) as f:
394 407 write(nb, f, NO_CONVERT)
395 408
396 409 def generate_new_key(self):
397 410 """Generate a new notebook signature key"""
398 411 print("Generating new notebook key: %s" % self.notary.secret_file)
399 412 self.notary._write_secret_file(os.urandom(1024))
400 413
401 414 def start(self):
402 415 if self.reset:
403 416 self.generate_new_key()
404 417 return
405 418 if not self.extra_args:
406 419 self.log.critical("Specify at least one notebook to sign.")
407 420 self.exit(1)
408 421
409 422 for notebook_path in self.extra_args:
410 423 self.sign_notebook(notebook_path)
411 424
@@ -1,200 +1,191 b''
1 1 """Test Notebook signing"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import copy
7 7 import time
8 8
9 9 from .base import TestsBase
10 10
11 11 from IPython.nbformat import read, sign
12 12 from IPython.core.getipython import get_ipython
13 13
14 14
15 15 class TestNotary(TestsBase):
16 16
17 17 def setUp(self):
18 18 self.notary = sign.NotebookNotary(
19 19 secret=b'secret',
20 20 profile_dir=get_ipython().profile_dir,
21 db_url=':memory:'
21 db_file=':memory:'
22 22 )
23 23 with self.fopen(u'test3.ipynb', u'r') as f:
24 24 self.nb = read(f, as_version=4)
25 25 with self.fopen(u'test3.ipynb', u'r') as f:
26 26 self.nb3 = read(f, as_version=3)
27 27
28 28 def test_algorithms(self):
29 29 last_sig = ''
30 30 for algo in sign.algorithms:
31 31 self.notary.algorithm = algo
32 32 sig = self.notary.compute_signature(self.nb)
33 33 self.assertNotEqual(last_sig, sig)
34 34 last_sig = sig
35 35
36 36 def test_sign_same(self):
37 37 """Multiple signatures of the same notebook are the same"""
38 38 sig1 = self.notary.compute_signature(self.nb)
39 39 sig2 = self.notary.compute_signature(self.nb)
40 40 self.assertEqual(sig1, sig2)
41 41
42 42 def test_change_secret(self):
43 43 """Changing the secret changes the signature"""
44 44 sig1 = self.notary.compute_signature(self.nb)
45 45 self.notary.secret = b'different'
46 46 sig2 = self.notary.compute_signature(self.nb)
47 47 self.assertNotEqual(sig1, sig2)
48 48
49 49 def test_sign(self):
50 50 self.assertFalse(self.notary.check_signature(self.nb))
51 51 self.notary.sign(self.nb)
52 52 self.assertTrue(self.notary.check_signature(self.nb))
53 53
54 54 def test_unsign(self):
55 55 self.notary.sign(self.nb)
56 56 self.assertTrue(self.notary.check_signature(self.nb))
57 57 self.notary.unsign(self.nb)
58 58 self.assertFalse(self.notary.check_signature(self.nb))
59 59 self.notary.unsign(self.nb)
60 60 self.assertFalse(self.notary.check_signature(self.nb))
61 61
def test_cull_db(self):
    """Signing past ``cache_size`` culls the least recently seen signatures"""
    # this test has various sleeps of 2ms
    # to ensure low resolution timestamps compare as expected
    dt = 2e-3
    nbs = [
        copy.deepcopy(self.nb) for i in range(10)
    ]
    self.notary.cache_size = 8
    # make the first 8 notebooks distinct so that each has its own signature
    for i, nb in enumerate(nbs[:8]):
        nb.metadata.dirty = i
        self.notary.sign(nb)

    # checking refreshes last_seen, so after this loop nbs[0] is the
    # oldest entry and nbs[7] the newest
    for i, nb in enumerate(nbs[:8]):
        time.sleep(dt)
        self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)

    # signing the 9th triggers culling of first 3
    # (75% of 8 = 6, 9 - 6 = 3 culled)
    self.notary.sign(nbs[8])
    self.assertFalse(self.notary.check_signature(nbs[0]))
    self.assertFalse(self.notary.check_signature(nbs[1]))
    self.assertFalse(self.notary.check_signature(nbs[2]))
    self.assertTrue(self.notary.check_signature(nbs[3]))
    # the check of nbs[3] just above should keep it from being culled
    # when re-signing the first three overflows the cache again:
    self.notary.sign(nbs[0])
    self.notary.sign(nbs[1])
    self.notary.sign(nbs[2])
    self.assertTrue(self.notary.check_signature(nbs[3]))
    self.assertFalse(self.notary.check_signature(nbs[4]))
102 93
103 94 def test_check_signature(self):
104 95 nb = self.nb
105 96 md = nb.metadata
106 97 notary = self.notary
107 98 check_signature = notary.check_signature
108 99 # no signature:
109 100 md.pop('signature', None)
110 101 self.assertFalse(check_signature(nb))
111 102 # hash only, no algo
112 103 md.signature = notary.compute_signature(nb)
113 104 self.assertFalse(check_signature(nb))
114 105 # proper signature, algo mismatch
115 106 notary.algorithm = 'sha224'
116 107 notary.sign(nb)
117 108 notary.algorithm = 'sha256'
118 109 self.assertFalse(check_signature(nb))
119 110 # check correctly signed notebook
120 111 notary.sign(nb)
121 112 self.assertTrue(check_signature(nb))
122 113
123 114 def test_mark_cells_untrusted(self):
124 115 cells = self.nb.cells
125 116 self.notary.mark_cells(self.nb, False)
126 117 for cell in cells:
127 118 self.assertNotIn('trusted', cell)
128 119 if cell.cell_type == 'code':
129 120 self.assertIn('trusted', cell.metadata)
130 121 self.assertFalse(cell.metadata.trusted)
131 122 else:
132 123 self.assertNotIn('trusted', cell.metadata)
133 124
134 125 def test_mark_cells_trusted(self):
135 126 cells = self.nb.cells
136 127 self.notary.mark_cells(self.nb, True)
137 128 for cell in cells:
138 129 self.assertNotIn('trusted', cell)
139 130 if cell.cell_type == 'code':
140 131 self.assertIn('trusted', cell.metadata)
141 132 self.assertTrue(cell.metadata.trusted)
142 133 else:
143 134 self.assertNotIn('trusted', cell.metadata)
144 135
145 136 def test_check_cells(self):
146 137 nb = self.nb
147 138 self.notary.mark_cells(nb, True)
148 139 self.assertTrue(self.notary.check_cells(nb))
149 140 for cell in nb.cells:
150 141 self.assertNotIn('trusted', cell)
151 142 self.notary.mark_cells(nb, False)
152 143 self.assertFalse(self.notary.check_cells(nb))
153 144 for cell in nb.cells:
154 145 self.assertNotIn('trusted', cell)
155 146
156 147 def test_trust_no_output(self):
157 148 nb = self.nb
158 149 self.notary.mark_cells(nb, False)
159 150 for cell in nb.cells:
160 151 if cell.cell_type == 'code':
161 152 cell.outputs = []
162 153 self.assertTrue(self.notary.check_cells(nb))
163 154
164 155 def test_mark_cells_untrusted_v3(self):
165 156 nb = self.nb3
166 157 cells = nb.worksheets[0].cells
167 158 self.notary.mark_cells(nb, False)
168 159 for cell in cells:
169 160 self.assertNotIn('trusted', cell)
170 161 if cell.cell_type == 'code':
171 162 self.assertIn('trusted', cell.metadata)
172 163 self.assertFalse(cell.metadata.trusted)
173 164 else:
174 165 self.assertNotIn('trusted', cell.metadata)
175 166
176 167 def test_mark_cells_trusted_v3(self):
177 168 nb = self.nb3
178 169 cells = nb.worksheets[0].cells
179 170 self.notary.mark_cells(nb, True)
180 171 for cell in cells:
181 172 self.assertNotIn('trusted', cell)
182 173 if cell.cell_type == 'code':
183 174 self.assertIn('trusted', cell.metadata)
184 175 self.assertTrue(cell.metadata.trusted)
185 176 else:
186 177 self.assertNotIn('trusted', cell.metadata)
187 178
188 179 def test_check_cells_v3(self):
189 180 nb = self.nb3
190 181 cells = nb.worksheets[0].cells
191 182 self.notary.mark_cells(nb, True)
192 183 self.assertTrue(self.notary.check_cells(nb))
193 184 for cell in cells:
194 185 self.assertNotIn('trusted', cell)
195 186 self.notary.mark_cells(nb, False)
196 187 self.assertFalse(self.notary.check_cells(nb))
197 188 for cell in cells:
198 189 self.assertNotIn('trusted', cell)
199 190
200 191
General Comments 0
You need to be logged in to leave comments. Login now