##// END OF EJS Templates
clear signature db on reset
Min RK -
Show More
@@ -1,424 +1,427 b''
1 1 """Utilities for signing notebooks"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import base64
7 7 from contextlib import contextmanager
8 8 from datetime import datetime
9 9 import hashlib
10 10 from hmac import HMAC
11 11 import io
12 12 import os
13 13
14 14 try:
15 15 import sqlite3
16 16 except ImportError:
17 17 try:
18 18 from pysqlite2 import dbapi2 as sqlite3
19 19 except ImportError:
20 20 sqlite3 = None
21 21
22 22 from IPython.utils.io import atomic_writing
23 23 from IPython.utils.py3compat import unicode_type, cast_bytes
24 24 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
25 25 from IPython.config import LoggingConfigurable, MultipleInstanceError
26 26 from IPython.core.application import BaseIPythonApplication, base_flags
27 27
28 28 from . import read, write, NO_CONVERT
29 29
30 30 try:
31 31 # Python 3
32 32 algorithms = hashlib.algorithms_guaranteed
33 33 except AttributeError:
34 34 algorithms = hashlib.algorithms
35 35
36 36
37 37 def yield_everything(obj):
38 38 """Yield every item in a container as bytes
39 39
40 40 Allows any JSONable object to be passed to an HMAC digester
41 41 without having to serialize the whole thing.
42 42 """
43 43 if isinstance(obj, dict):
44 44 for key in sorted(obj):
45 45 value = obj[key]
46 46 yield cast_bytes(key)
47 47 for b in yield_everything(value):
48 48 yield b
49 49 elif isinstance(obj, (list, tuple)):
50 50 for element in obj:
51 51 for b in yield_everything(element):
52 52 yield b
53 53 elif isinstance(obj, unicode_type):
54 54 yield obj.encode('utf8')
55 55 else:
56 56 yield unicode_type(obj).encode('utf8')
57 57
58 58 def yield_code_cells(nb):
59 59 """Iterator that yields all cells in a notebook
60 60
61 61 nbformat version independent
62 62 """
63 63 if nb.nbformat >= 4:
64 64 for cell in nb['cells']:
65 65 if cell['cell_type'] == 'code':
66 66 yield cell
67 67 elif nb.nbformat == 3:
68 68 for ws in nb['worksheets']:
69 69 for cell in ws['cells']:
70 70 if cell['cell_type'] == 'code':
71 71 yield cell
72 72
73 73 @contextmanager
74 74 def signature_removed(nb):
75 75 """Context manager for operating on a notebook with its signature removed
76 76
77 77 Used for excluding the previous signature when computing a notebook's signature.
78 78 """
79 79 save_signature = nb['metadata'].pop('signature', None)
80 80 try:
81 81 yield
82 82 finally:
83 83 if save_signature is not None:
84 84 nb['metadata']['signature'] = save_signature
85 85
86 86
87 87 class NotebookNotary(LoggingConfigurable):
88 88 """A class for computing and verifying notebook signatures."""
89 89
90 90 profile_dir = Instance("IPython.core.profiledir.ProfileDir")
91 91 def _profile_dir_default(self):
92 92 from IPython.core.application import BaseIPythonApplication
93 93 app = None
94 94 try:
95 95 if BaseIPythonApplication.initialized():
96 96 app = BaseIPythonApplication.instance()
97 97 except MultipleInstanceError:
98 98 pass
99 99 if app is None:
100 100 # create an app, without the global instance
101 101 app = BaseIPythonApplication()
102 102 app.initialize(argv=[])
103 103 return app.profile_dir
104 104
105 105 db_file = Unicode(config=True,
106 106 help="""The sqlite file in which to store notebook signatures.
107 107 By default, this will be in your IPython profile.
108 108 You can set it to ':memory:' to disable sqlite writing to the filesystem.
109 109 """)
110 110 def _db_file_default(self):
111 111 if self.profile_dir is None:
112 112 return ':memory:'
113 113 return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
114 114
115 115 # 64k entries ~ 12MB
116 116 cache_size = Integer(65535, config=True,
117 117 help="""The number of notebook signatures to cache.
118 118 When the number of signatures exceeds this value,
119 119 the oldest 25% of signatures will be culled.
120 120 """
121 121 )
122 122 db = Any()
123 123 def _db_default(self):
124 124 if sqlite3 is None:
125 125 self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
126 126 return
127 127 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
128 128 db = sqlite3.connect(self.db_file, **kwargs)
129 129 self.init_db(db)
130 130 return db
131 131
132 132 def init_db(self, db):
133 133 db.execute("""
134 134 CREATE TABLE IF NOT EXISTS nbsignatures
135 135 (
136 136 id integer PRIMARY KEY AUTOINCREMENT,
137 137 algorithm text,
138 138 signature text,
139 139 path text,
140 140 last_seen timestamp
141 141 )""")
142 142 db.execute("""
143 143 CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
144 144 """)
145 145 db.commit()
146 146
147 147 algorithm = Enum(algorithms, default_value='sha256', config=True,
148 148 help="""The hashing algorithm used to sign notebooks."""
149 149 )
150 150 def _algorithm_changed(self, name, old, new):
151 151 self.digestmod = getattr(hashlib, self.algorithm)
152 152
153 153 digestmod = Any()
154 154 def _digestmod_default(self):
155 155 return getattr(hashlib, self.algorithm)
156 156
157 157 secret_file = Unicode(config=True,
158 158 help="""The file where the secret key is stored."""
159 159 )
160 160 def _secret_file_default(self):
161 161 if self.profile_dir is None:
162 162 return ''
163 163 return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
164 164
165 165 secret = Bytes(config=True,
166 166 help="""The secret key with which notebooks are signed."""
167 167 )
168 168 def _secret_default(self):
169 169 # note : this assumes an Application is running
170 170 if os.path.exists(self.secret_file):
171 171 with io.open(self.secret_file, 'rb') as f:
172 172 return f.read()
173 173 else:
174 174 secret = base64.encodestring(os.urandom(1024))
175 175 self._write_secret_file(secret)
176 176 return secret
177 177
178 178 def _write_secret_file(self, secret):
179 179 """write my secret to my secret_file"""
180 180 self.log.info("Writing notebook-signing key to %s", self.secret_file)
181 181 with io.open(self.secret_file, 'wb') as f:
182 182 f.write(secret)
183 183 try:
184 184 os.chmod(self.secret_file, 0o600)
185 185 except OSError:
186 186 self.log.warn(
187 187 "Could not set permissions on %s",
188 188 self.secret_file
189 189 )
190 190 return secret
191 191
192 192 def compute_signature(self, nb):
193 193 """Compute a notebook's signature
194 194
195 195 by hashing the entire contents of the notebook via HMAC digest.
196 196 """
197 197 hmac = HMAC(self.secret, digestmod=self.digestmod)
198 198 # don't include the previous hash in the content to hash
199 199 with signature_removed(nb):
200 200 # sign the whole thing
201 201 for b in yield_everything(nb):
202 202 hmac.update(b)
203 203
204 204 return hmac.hexdigest()
205 205
206 206 def check_signature(self, nb):
207 207 """Check a notebook's stored signature
208 208
209 209 If a signature is stored in the notebook's metadata,
210 210 a new signature is computed and compared with the stored value.
211 211
212 212 Returns True if the signature is found and matches, False otherwise.
213 213
214 214 The following conditions must all be met for a notebook to be trusted:
215 215 - a signature is stored in the form 'scheme:hexdigest'
216 216 - the stored scheme matches the requested scheme
217 217 - the requested scheme is available from hashlib
218 218 - the computed hash from notebook_signature matches the stored hash
219 219 """
220 220 if nb.nbformat < 3:
221 221 return False
222 222 if self.db is None:
223 223 return False
224 224 signature = self.compute_signature(nb)
225 225 r = self.db.execute("""SELECT id FROM nbsignatures WHERE
226 226 algorithm = ? AND
227 227 signature = ?;
228 228 """, (self.algorithm, signature)).fetchone()
229 229 if r is None:
230 230 return False
231 231 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
232 232 algorithm = ? AND
233 233 signature = ?;
234 234 """,
235 235 (datetime.utcnow(), self.algorithm, signature),
236 236 )
237 237 self.db.commit()
238 238 return True
239 239
240 240 def sign(self, nb):
241 241 """Sign a notebook, indicating that its output is trusted on this machine
242 242
243 243 Stores hash algorithm and hmac digest in a local database of trusted notebooks.
244 244 """
245 245 if nb.nbformat < 3:
246 246 return
247 247 signature = self.compute_signature(nb)
248 248 self.store_signature(signature, nb)
249 249
250 250 def store_signature(self, signature, nb):
251 251 if self.db is None:
252 252 return
253 253 self.db.execute("""INSERT OR IGNORE INTO nbsignatures
254 254 (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
255 255 (self.algorithm, signature, datetime.utcnow())
256 256 )
257 257 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
258 258 algorithm = ? AND
259 259 signature = ?;
260 260 """,
261 261 (datetime.utcnow(), self.algorithm, signature),
262 262 )
263 263 self.db.commit()
264 264 n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
265 265 if n > self.cache_size:
266 266 self.cull_db()
267 267
268 268 def unsign(self, nb):
269 269 """Ensure that a notebook is untrusted
270 270
271 271 by removing its signature from the trusted database, if present.
272 272 """
273 273 signature = self.compute_signature(nb)
274 274 self.db.execute("""DELETE FROM nbsignatures WHERE
275 275 algorithm = ? AND
276 276 signature = ?;
277 277 """,
278 278 (self.algorithm, signature)
279 279 )
280 280 self.db.commit()
281 281
282 282 def cull_db(self):
283 283 """Cull oldest 25% of the trusted signatures when the size limit is reached"""
284 284 self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
285 285 SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
286 286 );
287 287 """, (max(int(0.75 * self.cache_size), 1),))
288 288
289 289 def mark_cells(self, nb, trusted):
290 290 """Mark cells as trusted if the notebook's signature can be verified
291 291
292 292 Sets ``cell.metadata.trusted = True | False`` on all code cells,
293 293 depending on whether the stored signature can be verified.
294 294
295 295 This function is the inverse of check_cells
296 296 """
297 297 if nb.nbformat < 3:
298 298 return
299 299
300 300 for cell in yield_code_cells(nb):
301 301 cell['metadata']['trusted'] = trusted
302 302
303 303 def _check_cell(self, cell, nbformat_version):
304 304 """Do we trust an individual cell?
305 305
306 306 Return True if:
307 307
308 308 - cell is explicitly trusted
309 309 - cell has no potentially unsafe rich output
310 310
311 311 If a cell has no output, or only simple print statements,
312 312 it will always be trusted.
313 313 """
314 314 # explicitly trusted
315 315 if cell['metadata'].pop("trusted", False):
316 316 return True
317 317
318 318 # explicitly safe output
319 319 if nbformat_version >= 4:
320 320 unsafe_output_types = ['execute_result', 'display_data']
321 321 safe_keys = {"output_type", "execution_count", "metadata"}
322 322 else: # v3
323 323 unsafe_output_types = ['pyout', 'display_data']
324 324 safe_keys = {"output_type", "prompt_number", "metadata"}
325 325
326 326 for output in cell['outputs']:
327 327 output_type = output['output_type']
328 328 if output_type in unsafe_output_types:
329 329 # if there are any data keys not in the safe whitelist
330 330 output_keys = set(output)
331 331 if output_keys.difference(safe_keys):
332 332 return False
333 333
334 334 return True
335 335
336 336 def check_cells(self, nb):
337 337 """Return whether all code cells are trusted
338 338
339 339 If there are no code cells, return True.
340 340
341 341 This function is the inverse of mark_cells.
342 342 """
343 343 if nb.nbformat < 3:
344 344 return False
345 345 trusted = True
346 346 for cell in yield_code_cells(nb):
347 347 # only distrust a cell if it actually has some output to distrust
348 348 if not self._check_cell(cell, nb.nbformat):
349 349 trusted = False
350 350
351 351 return trusted
352 352
353 353
354 354 trust_flags = {
355 355 'reset' : (
356 356 {'TrustNotebookApp' : { 'reset' : True}},
357 """Generate a new key for notebook signature.
357 """Delete the trusted notebook cache.
358 358 All previously signed notebooks will become untrusted.
359 359 """
360 360 ),
361 361 }
362 362 trust_flags.update(base_flags)
363 363 trust_flags.pop('init')
364 364
365 365
366 366 class TrustNotebookApp(BaseIPythonApplication):
367 367
368 368 description="""Sign one or more IPython notebooks with your key,
369 369 to trust their dynamic (HTML, Javascript) output.
370 370
371 371 Trusting a notebook only applies to the current IPython profile.
372 372 To trust a notebook for use with a profile other than default,
373 373 add `--profile [profile name]`.
374 374
375 375 Otherwise, you will have to re-execute the notebook to see output.
376 376 """
377 377
378 378 examples = """
379 379 ipython trust mynotebook.ipynb and_this_one.ipynb
380 380 ipython trust --profile myprofile mynotebook.ipynb
381 381 """
382 382
383 383 flags = trust_flags
384 384
385 385 reset = Bool(False, config=True,
386 help="""If True, generate a new key for notebook signature.
386 help="""If True, delete the trusted signature cache.
387 387 After reset, all previously signed notebooks will become untrusted.
388 388 """
389 389 )
390 390
391 391 notary = Instance(NotebookNotary)
392 392 def _notary_default(self):
393 393 return NotebookNotary(parent=self, profile_dir=self.profile_dir)
394 394
395 395 def sign_notebook(self, notebook_path):
396 396 if not os.path.exists(notebook_path):
397 397 self.log.error("Notebook missing: %s" % notebook_path)
398 398 self.exit(1)
399 399 with io.open(notebook_path, encoding='utf8') as f:
400 400 nb = read(f, NO_CONVERT)
401 401 if self.notary.check_signature(nb):
402 402 print("Notebook already signed: %s" % notebook_path)
403 403 else:
404 404 print("Signing notebook: %s" % notebook_path)
405 405 self.notary.sign(nb)
406 406 with atomic_writing(notebook_path) as f:
407 407 write(nb, f, NO_CONVERT)
408 408
409 409 def generate_new_key(self):
410 410 """Generate a new notebook signature key"""
411 411 print("Generating new notebook key: %s" % self.notary.secret_file)
412 412 self.notary._write_secret_file(os.urandom(1024))
413 413
414 414 def start(self):
415 415 if self.reset:
416 if os.path.exists(self.notary.db_file):
417 print("Removing trusted signature cache: %s" % self.notary.db_file)
418 os.remove(self.notary.db_file)
416 419 self.generate_new_key()
417 420 return
418 421 if not self.extra_args:
419 422 self.log.critical("Specify at least one notebook to sign.")
420 423 self.exit(1)
421 424
422 425 for notebook_path in self.extra_args:
423 426 self.sign_notebook(notebook_path)
424 427
@@ -1,191 +1,191 b''
1 1 """Test Notebook signing"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import copy
7 7 import time
8 8
9 9 from .base import TestsBase
10 10
11 11 from IPython.nbformat import read, sign
12 12 from IPython.core.getipython import get_ipython
13 13
14 14
15 15 class TestNotary(TestsBase):
16 16
17 17 def setUp(self):
18 18 self.notary = sign.NotebookNotary(
19 db_file=':memory:',
19 20 secret=b'secret',
20 21 profile_dir=get_ipython().profile_dir,
21 db_file=':memory:'
22 22 )
23 23 with self.fopen(u'test3.ipynb', u'r') as f:
24 24 self.nb = read(f, as_version=4)
25 25 with self.fopen(u'test3.ipynb', u'r') as f:
26 26 self.nb3 = read(f, as_version=3)
27 27
28 28 def test_algorithms(self):
29 29 last_sig = ''
30 30 for algo in sign.algorithms:
31 31 self.notary.algorithm = algo
32 32 sig = self.notary.compute_signature(self.nb)
33 33 self.assertNotEqual(last_sig, sig)
34 34 last_sig = sig
35 35
36 36 def test_sign_same(self):
37 37 """Multiple signatures of the same notebook are the same"""
38 38 sig1 = self.notary.compute_signature(self.nb)
39 39 sig2 = self.notary.compute_signature(self.nb)
40 40 self.assertEqual(sig1, sig2)
41 41
42 42 def test_change_secret(self):
43 43 """Changing the secret changes the signature"""
44 44 sig1 = self.notary.compute_signature(self.nb)
45 45 self.notary.secret = b'different'
46 46 sig2 = self.notary.compute_signature(self.nb)
47 47 self.assertNotEqual(sig1, sig2)
48 48
49 49 def test_sign(self):
50 50 self.assertFalse(self.notary.check_signature(self.nb))
51 51 self.notary.sign(self.nb)
52 52 self.assertTrue(self.notary.check_signature(self.nb))
53 53
54 54 def test_unsign(self):
55 55 self.notary.sign(self.nb)
56 56 self.assertTrue(self.notary.check_signature(self.nb))
57 57 self.notary.unsign(self.nb)
58 58 self.assertFalse(self.notary.check_signature(self.nb))
59 59 self.notary.unsign(self.nb)
60 60 self.assertFalse(self.notary.check_signature(self.nb))
61 61
62 62 def test_cull_db(self):
63 63 # this test has various sleeps of 2ms
64 64 # to ensure low resolution timestamps compare as expected
65 65 dt = 2e-3
66 66 nbs = [
67 67 copy.deepcopy(self.nb) for i in range(10)
68 68 ]
69 69 for row in self.notary.db.execute("SELECT * FROM nbsignatures"):
70 70 print(row)
71 71 self.notary.cache_size = 8
72 72 for i, nb in enumerate(nbs[:8]):
73 73 nb.metadata.dirty = i
74 74 self.notary.sign(nb)
75 75
76 76 for i, nb in enumerate(nbs[:8]):
77 77 time.sleep(dt)
78 78 self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)
79 79
80 80 # signing the 9th triggers culling of first 3
81 81 # (75% of 8 = 6, 9 - 6 = 3 culled)
82 82 self.notary.sign(nbs[8])
83 83 self.assertFalse(self.notary.check_signature(nbs[0]))
84 84 self.assertFalse(self.notary.check_signature(nbs[1]))
85 85 self.assertFalse(self.notary.check_signature(nbs[2]))
86 86 self.assertTrue(self.notary.check_signature(nbs[3]))
87 87 # checking nb3 should keep it from being culled:
88 88 self.notary.sign(nbs[0])
89 89 self.notary.sign(nbs[1])
90 90 self.notary.sign(nbs[2])
91 91 self.assertTrue(self.notary.check_signature(nbs[3]))
92 92 self.assertFalse(self.notary.check_signature(nbs[4]))
93 93
94 94 def test_check_signature(self):
95 95 nb = self.nb
96 96 md = nb.metadata
97 97 notary = self.notary
98 98 check_signature = notary.check_signature
99 99 # no signature:
100 100 md.pop('signature', None)
101 101 self.assertFalse(check_signature(nb))
102 102 # hash only, no algo
103 103 md.signature = notary.compute_signature(nb)
104 104 self.assertFalse(check_signature(nb))
105 105 # proper signature, algo mismatch
106 106 notary.algorithm = 'sha224'
107 107 notary.sign(nb)
108 108 notary.algorithm = 'sha256'
109 109 self.assertFalse(check_signature(nb))
110 110 # check correctly signed notebook
111 111 notary.sign(nb)
112 112 self.assertTrue(check_signature(nb))
113 113
114 114 def test_mark_cells_untrusted(self):
115 115 cells = self.nb.cells
116 116 self.notary.mark_cells(self.nb, False)
117 117 for cell in cells:
118 118 self.assertNotIn('trusted', cell)
119 119 if cell.cell_type == 'code':
120 120 self.assertIn('trusted', cell.metadata)
121 121 self.assertFalse(cell.metadata.trusted)
122 122 else:
123 123 self.assertNotIn('trusted', cell.metadata)
124 124
125 125 def test_mark_cells_trusted(self):
126 126 cells = self.nb.cells
127 127 self.notary.mark_cells(self.nb, True)
128 128 for cell in cells:
129 129 self.assertNotIn('trusted', cell)
130 130 if cell.cell_type == 'code':
131 131 self.assertIn('trusted', cell.metadata)
132 132 self.assertTrue(cell.metadata.trusted)
133 133 else:
134 134 self.assertNotIn('trusted', cell.metadata)
135 135
136 136 def test_check_cells(self):
137 137 nb = self.nb
138 138 self.notary.mark_cells(nb, True)
139 139 self.assertTrue(self.notary.check_cells(nb))
140 140 for cell in nb.cells:
141 141 self.assertNotIn('trusted', cell)
142 142 self.notary.mark_cells(nb, False)
143 143 self.assertFalse(self.notary.check_cells(nb))
144 144 for cell in nb.cells:
145 145 self.assertNotIn('trusted', cell)
146 146
147 147 def test_trust_no_output(self):
148 148 nb = self.nb
149 149 self.notary.mark_cells(nb, False)
150 150 for cell in nb.cells:
151 151 if cell.cell_type == 'code':
152 152 cell.outputs = []
153 153 self.assertTrue(self.notary.check_cells(nb))
154 154
155 155 def test_mark_cells_untrusted_v3(self):
156 156 nb = self.nb3
157 157 cells = nb.worksheets[0].cells
158 158 self.notary.mark_cells(nb, False)
159 159 for cell in cells:
160 160 self.assertNotIn('trusted', cell)
161 161 if cell.cell_type == 'code':
162 162 self.assertIn('trusted', cell.metadata)
163 163 self.assertFalse(cell.metadata.trusted)
164 164 else:
165 165 self.assertNotIn('trusted', cell.metadata)
166 166
167 167 def test_mark_cells_trusted_v3(self):
168 168 nb = self.nb3
169 169 cells = nb.worksheets[0].cells
170 170 self.notary.mark_cells(nb, True)
171 171 for cell in cells:
172 172 self.assertNotIn('trusted', cell)
173 173 if cell.cell_type == 'code':
174 174 self.assertIn('trusted', cell.metadata)
175 175 self.assertTrue(cell.metadata.trusted)
176 176 else:
177 177 self.assertNotIn('trusted', cell.metadata)
178 178
179 179 def test_check_cells_v3(self):
180 180 nb = self.nb3
181 181 cells = nb.worksheets[0].cells
182 182 self.notary.mark_cells(nb, True)
183 183 self.assertTrue(self.notary.check_cells(nb))
184 184 for cell in cells:
185 185 self.assertNotIn('trusted', cell)
186 186 self.notary.mark_cells(nb, False)
187 187 self.assertFalse(self.notary.check_cells(nb))
188 188 for cell in cells:
189 189 self.assertNotIn('trusted', cell)
190 190
191 191
General Comments 0
You need to be logged in to leave comments. Login now