##// END OF EJS Templates
clear signature db on reset
Min RK -
Show More
@@ -1,424 +1,427 b''
1 """Utilities for signing notebooks"""
1 """Utilities for signing notebooks"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import base64
6 import base64
7 from contextlib import contextmanager
7 from contextlib import contextmanager
8 from datetime import datetime
8 from datetime import datetime
9 import hashlib
9 import hashlib
10 from hmac import HMAC
10 from hmac import HMAC
11 import io
11 import io
12 import os
12 import os
13
13
14 try:
14 try:
15 import sqlite3
15 import sqlite3
16 except ImportError:
16 except ImportError:
17 try:
17 try:
18 from pysqlite2 import dbapi2 as sqlite3
18 from pysqlite2 import dbapi2 as sqlite3
19 except ImportError:
19 except ImportError:
20 sqlite3 = None
20 sqlite3 = None
21
21
22 from IPython.utils.io import atomic_writing
22 from IPython.utils.io import atomic_writing
23 from IPython.utils.py3compat import unicode_type, cast_bytes
23 from IPython.utils.py3compat import unicode_type, cast_bytes
24 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
24 from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
25 from IPython.config import LoggingConfigurable, MultipleInstanceError
25 from IPython.config import LoggingConfigurable, MultipleInstanceError
26 from IPython.core.application import BaseIPythonApplication, base_flags
26 from IPython.core.application import BaseIPythonApplication, base_flags
27
27
28 from . import read, write, NO_CONVERT
28 from . import read, write, NO_CONVERT
29
29
30 try:
30 try:
31 # Python 3
31 # Python 3
32 algorithms = hashlib.algorithms_guaranteed
32 algorithms = hashlib.algorithms_guaranteed
33 except AttributeError:
33 except AttributeError:
34 algorithms = hashlib.algorithms
34 algorithms = hashlib.algorithms
35
35
36
36
37 def yield_everything(obj):
37 def yield_everything(obj):
38 """Yield every item in a container as bytes
38 """Yield every item in a container as bytes
39
39
40 Allows any JSONable object to be passed to an HMAC digester
40 Allows any JSONable object to be passed to an HMAC digester
41 without having to serialize the whole thing.
41 without having to serialize the whole thing.
42 """
42 """
43 if isinstance(obj, dict):
43 if isinstance(obj, dict):
44 for key in sorted(obj):
44 for key in sorted(obj):
45 value = obj[key]
45 value = obj[key]
46 yield cast_bytes(key)
46 yield cast_bytes(key)
47 for b in yield_everything(value):
47 for b in yield_everything(value):
48 yield b
48 yield b
49 elif isinstance(obj, (list, tuple)):
49 elif isinstance(obj, (list, tuple)):
50 for element in obj:
50 for element in obj:
51 for b in yield_everything(element):
51 for b in yield_everything(element):
52 yield b
52 yield b
53 elif isinstance(obj, unicode_type):
53 elif isinstance(obj, unicode_type):
54 yield obj.encode('utf8')
54 yield obj.encode('utf8')
55 else:
55 else:
56 yield unicode_type(obj).encode('utf8')
56 yield unicode_type(obj).encode('utf8')
57
57
58 def yield_code_cells(nb):
58 def yield_code_cells(nb):
59 """Iterator that yields all cells in a notebook
59 """Iterator that yields all cells in a notebook
60
60
61 nbformat version independent
61 nbformat version independent
62 """
62 """
63 if nb.nbformat >= 4:
63 if nb.nbformat >= 4:
64 for cell in nb['cells']:
64 for cell in nb['cells']:
65 if cell['cell_type'] == 'code':
65 if cell['cell_type'] == 'code':
66 yield cell
66 yield cell
67 elif nb.nbformat == 3:
67 elif nb.nbformat == 3:
68 for ws in nb['worksheets']:
68 for ws in nb['worksheets']:
69 for cell in ws['cells']:
69 for cell in ws['cells']:
70 if cell['cell_type'] == 'code':
70 if cell['cell_type'] == 'code':
71 yield cell
71 yield cell
72
72
73 @contextmanager
73 @contextmanager
74 def signature_removed(nb):
74 def signature_removed(nb):
75 """Context manager for operating on a notebook with its signature removed
75 """Context manager for operating on a notebook with its signature removed
76
76
77 Used for excluding the previous signature when computing a notebook's signature.
77 Used for excluding the previous signature when computing a notebook's signature.
78 """
78 """
79 save_signature = nb['metadata'].pop('signature', None)
79 save_signature = nb['metadata'].pop('signature', None)
80 try:
80 try:
81 yield
81 yield
82 finally:
82 finally:
83 if save_signature is not None:
83 if save_signature is not None:
84 nb['metadata']['signature'] = save_signature
84 nb['metadata']['signature'] = save_signature
85
85
86
86
87 class NotebookNotary(LoggingConfigurable):
87 class NotebookNotary(LoggingConfigurable):
88 """A class for computing and verifying notebook signatures."""
88 """A class for computing and verifying notebook signatures."""
89
89
90 profile_dir = Instance("IPython.core.profiledir.ProfileDir")
90 profile_dir = Instance("IPython.core.profiledir.ProfileDir")
91 def _profile_dir_default(self):
91 def _profile_dir_default(self):
92 from IPython.core.application import BaseIPythonApplication
92 from IPython.core.application import BaseIPythonApplication
93 app = None
93 app = None
94 try:
94 try:
95 if BaseIPythonApplication.initialized():
95 if BaseIPythonApplication.initialized():
96 app = BaseIPythonApplication.instance()
96 app = BaseIPythonApplication.instance()
97 except MultipleInstanceError:
97 except MultipleInstanceError:
98 pass
98 pass
99 if app is None:
99 if app is None:
100 # create an app, without the global instance
100 # create an app, without the global instance
101 app = BaseIPythonApplication()
101 app = BaseIPythonApplication()
102 app.initialize(argv=[])
102 app.initialize(argv=[])
103 return app.profile_dir
103 return app.profile_dir
104
104
105 db_file = Unicode(config=True,
105 db_file = Unicode(config=True,
106 help="""The sqlite file in which to store notebook signatures.
106 help="""The sqlite file in which to store notebook signatures.
107 By default, this will be in your IPython profile.
107 By default, this will be in your IPython profile.
108 You can set it to ':memory:' to disable sqlite writing to the filesystem.
108 You can set it to ':memory:' to disable sqlite writing to the filesystem.
109 """)
109 """)
110 def _db_file_default(self):
110 def _db_file_default(self):
111 if self.profile_dir is None:
111 if self.profile_dir is None:
112 return ':memory:'
112 return ':memory:'
113 return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
113 return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
114
114
115 # 64k entries ~ 12MB
115 # 64k entries ~ 12MB
116 cache_size = Integer(65535, config=True,
116 cache_size = Integer(65535, config=True,
117 help="""The number of notebook signatures to cache.
117 help="""The number of notebook signatures to cache.
118 When the number of signatures exceeds this value,
118 When the number of signatures exceeds this value,
119 the oldest 25% of signatures will be culled.
119 the oldest 25% of signatures will be culled.
120 """
120 """
121 )
121 )
122 db = Any()
122 db = Any()
123 def _db_default(self):
123 def _db_default(self):
124 if sqlite3 is None:
124 if sqlite3 is None:
125 self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
125 self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
126 return
126 return
127 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
127 kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
128 db = sqlite3.connect(self.db_file, **kwargs)
128 db = sqlite3.connect(self.db_file, **kwargs)
129 self.init_db(db)
129 self.init_db(db)
130 return db
130 return db
131
131
132 def init_db(self, db):
132 def init_db(self, db):
133 db.execute("""
133 db.execute("""
134 CREATE TABLE IF NOT EXISTS nbsignatures
134 CREATE TABLE IF NOT EXISTS nbsignatures
135 (
135 (
136 id integer PRIMARY KEY AUTOINCREMENT,
136 id integer PRIMARY KEY AUTOINCREMENT,
137 algorithm text,
137 algorithm text,
138 signature text,
138 signature text,
139 path text,
139 path text,
140 last_seen timestamp
140 last_seen timestamp
141 )""")
141 )""")
142 db.execute("""
142 db.execute("""
143 CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
143 CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
144 """)
144 """)
145 db.commit()
145 db.commit()
146
146
147 algorithm = Enum(algorithms, default_value='sha256', config=True,
147 algorithm = Enum(algorithms, default_value='sha256', config=True,
148 help="""The hashing algorithm used to sign notebooks."""
148 help="""The hashing algorithm used to sign notebooks."""
149 )
149 )
150 def _algorithm_changed(self, name, old, new):
150 def _algorithm_changed(self, name, old, new):
151 self.digestmod = getattr(hashlib, self.algorithm)
151 self.digestmod = getattr(hashlib, self.algorithm)
152
152
153 digestmod = Any()
153 digestmod = Any()
154 def _digestmod_default(self):
154 def _digestmod_default(self):
155 return getattr(hashlib, self.algorithm)
155 return getattr(hashlib, self.algorithm)
156
156
157 secret_file = Unicode(config=True,
157 secret_file = Unicode(config=True,
158 help="""The file where the secret key is stored."""
158 help="""The file where the secret key is stored."""
159 )
159 )
160 def _secret_file_default(self):
160 def _secret_file_default(self):
161 if self.profile_dir is None:
161 if self.profile_dir is None:
162 return ''
162 return ''
163 return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
163 return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
164
164
165 secret = Bytes(config=True,
165 secret = Bytes(config=True,
166 help="""The secret key with which notebooks are signed."""
166 help="""The secret key with which notebooks are signed."""
167 )
167 )
168 def _secret_default(self):
168 def _secret_default(self):
169 # note : this assumes an Application is running
169 # note : this assumes an Application is running
170 if os.path.exists(self.secret_file):
170 if os.path.exists(self.secret_file):
171 with io.open(self.secret_file, 'rb') as f:
171 with io.open(self.secret_file, 'rb') as f:
172 return f.read()
172 return f.read()
173 else:
173 else:
174 secret = base64.encodestring(os.urandom(1024))
174 secret = base64.encodestring(os.urandom(1024))
175 self._write_secret_file(secret)
175 self._write_secret_file(secret)
176 return secret
176 return secret
177
177
178 def _write_secret_file(self, secret):
178 def _write_secret_file(self, secret):
179 """write my secret to my secret_file"""
179 """write my secret to my secret_file"""
180 self.log.info("Writing notebook-signing key to %s", self.secret_file)
180 self.log.info("Writing notebook-signing key to %s", self.secret_file)
181 with io.open(self.secret_file, 'wb') as f:
181 with io.open(self.secret_file, 'wb') as f:
182 f.write(secret)
182 f.write(secret)
183 try:
183 try:
184 os.chmod(self.secret_file, 0o600)
184 os.chmod(self.secret_file, 0o600)
185 except OSError:
185 except OSError:
186 self.log.warn(
186 self.log.warn(
187 "Could not set permissions on %s",
187 "Could not set permissions on %s",
188 self.secret_file
188 self.secret_file
189 )
189 )
190 return secret
190 return secret
191
191
192 def compute_signature(self, nb):
192 def compute_signature(self, nb):
193 """Compute a notebook's signature
193 """Compute a notebook's signature
194
194
195 by hashing the entire contents of the notebook via HMAC digest.
195 by hashing the entire contents of the notebook via HMAC digest.
196 """
196 """
197 hmac = HMAC(self.secret, digestmod=self.digestmod)
197 hmac = HMAC(self.secret, digestmod=self.digestmod)
198 # don't include the previous hash in the content to hash
198 # don't include the previous hash in the content to hash
199 with signature_removed(nb):
199 with signature_removed(nb):
200 # sign the whole thing
200 # sign the whole thing
201 for b in yield_everything(nb):
201 for b in yield_everything(nb):
202 hmac.update(b)
202 hmac.update(b)
203
203
204 return hmac.hexdigest()
204 return hmac.hexdigest()
205
205
206 def check_signature(self, nb):
206 def check_signature(self, nb):
207 """Check a notebook's stored signature
207 """Check a notebook's stored signature
208
208
209 If a signature is stored in the notebook's metadata,
209 If a signature is stored in the notebook's metadata,
210 a new signature is computed and compared with the stored value.
210 a new signature is computed and compared with the stored value.
211
211
212 Returns True if the signature is found and matches, False otherwise.
212 Returns True if the signature is found and matches, False otherwise.
213
213
214 The following conditions must all be met for a notebook to be trusted:
214 The following conditions must all be met for a notebook to be trusted:
215 - a signature is stored in the form 'scheme:hexdigest'
215 - a signature is stored in the form 'scheme:hexdigest'
216 - the stored scheme matches the requested scheme
216 - the stored scheme matches the requested scheme
217 - the requested scheme is available from hashlib
217 - the requested scheme is available from hashlib
218 - the computed hash from notebook_signature matches the stored hash
218 - the computed hash from notebook_signature matches the stored hash
219 """
219 """
220 if nb.nbformat < 3:
220 if nb.nbformat < 3:
221 return False
221 return False
222 if self.db is None:
222 if self.db is None:
223 return False
223 return False
224 signature = self.compute_signature(nb)
224 signature = self.compute_signature(nb)
225 r = self.db.execute("""SELECT id FROM nbsignatures WHERE
225 r = self.db.execute("""SELECT id FROM nbsignatures WHERE
226 algorithm = ? AND
226 algorithm = ? AND
227 signature = ?;
227 signature = ?;
228 """, (self.algorithm, signature)).fetchone()
228 """, (self.algorithm, signature)).fetchone()
229 if r is None:
229 if r is None:
230 return False
230 return False
231 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
231 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
232 algorithm = ? AND
232 algorithm = ? AND
233 signature = ?;
233 signature = ?;
234 """,
234 """,
235 (datetime.utcnow(), self.algorithm, signature),
235 (datetime.utcnow(), self.algorithm, signature),
236 )
236 )
237 self.db.commit()
237 self.db.commit()
238 return True
238 return True
239
239
240 def sign(self, nb):
240 def sign(self, nb):
241 """Sign a notebook, indicating that its output is trusted on this machine
241 """Sign a notebook, indicating that its output is trusted on this machine
242
242
243 Stores hash algorithm and hmac digest in a local database of trusted notebooks.
243 Stores hash algorithm and hmac digest in a local database of trusted notebooks.
244 """
244 """
245 if nb.nbformat < 3:
245 if nb.nbformat < 3:
246 return
246 return
247 signature = self.compute_signature(nb)
247 signature = self.compute_signature(nb)
248 self.store_signature(signature, nb)
248 self.store_signature(signature, nb)
249
249
250 def store_signature(self, signature, nb):
250 def store_signature(self, signature, nb):
251 if self.db is None:
251 if self.db is None:
252 return
252 return
253 self.db.execute("""INSERT OR IGNORE INTO nbsignatures
253 self.db.execute("""INSERT OR IGNORE INTO nbsignatures
254 (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
254 (algorithm, signature, last_seen) VALUES (?, ?, ?)""",
255 (self.algorithm, signature, datetime.utcnow())
255 (self.algorithm, signature, datetime.utcnow())
256 )
256 )
257 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
257 self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
258 algorithm = ? AND
258 algorithm = ? AND
259 signature = ?;
259 signature = ?;
260 """,
260 """,
261 (datetime.utcnow(), self.algorithm, signature),
261 (datetime.utcnow(), self.algorithm, signature),
262 )
262 )
263 self.db.commit()
263 self.db.commit()
264 n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
264 n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
265 if n > self.cache_size:
265 if n > self.cache_size:
266 self.cull_db()
266 self.cull_db()
267
267
268 def unsign(self, nb):
268 def unsign(self, nb):
269 """Ensure that a notebook is untrusted
269 """Ensure that a notebook is untrusted
270
270
271 by removing its signature from the trusted database, if present.
271 by removing its signature from the trusted database, if present.
272 """
272 """
273 signature = self.compute_signature(nb)
273 signature = self.compute_signature(nb)
274 self.db.execute("""DELETE FROM nbsignatures WHERE
274 self.db.execute("""DELETE FROM nbsignatures WHERE
275 algorithm = ? AND
275 algorithm = ? AND
276 signature = ?;
276 signature = ?;
277 """,
277 """,
278 (self.algorithm, signature)
278 (self.algorithm, signature)
279 )
279 )
280 self.db.commit()
280 self.db.commit()
281
281
282 def cull_db(self):
282 def cull_db(self):
283 """Cull oldest 25% of the trusted signatures when the size limit is reached"""
283 """Cull oldest 25% of the trusted signatures when the size limit is reached"""
284 self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
284 self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
285 SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
285 SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
286 );
286 );
287 """, (max(int(0.75 * self.cache_size), 1),))
287 """, (max(int(0.75 * self.cache_size), 1),))
288
288
289 def mark_cells(self, nb, trusted):
289 def mark_cells(self, nb, trusted):
290 """Mark cells as trusted if the notebook's signature can be verified
290 """Mark cells as trusted if the notebook's signature can be verified
291
291
292 Sets ``cell.metadata.trusted = True | False`` on all code cells,
292 Sets ``cell.metadata.trusted = True | False`` on all code cells,
293 depending on whether the stored signature can be verified.
293 depending on whether the stored signature can be verified.
294
294
295 This function is the inverse of check_cells
295 This function is the inverse of check_cells
296 """
296 """
297 if nb.nbformat < 3:
297 if nb.nbformat < 3:
298 return
298 return
299
299
300 for cell in yield_code_cells(nb):
300 for cell in yield_code_cells(nb):
301 cell['metadata']['trusted'] = trusted
301 cell['metadata']['trusted'] = trusted
302
302
303 def _check_cell(self, cell, nbformat_version):
303 def _check_cell(self, cell, nbformat_version):
304 """Do we trust an individual cell?
304 """Do we trust an individual cell?
305
305
306 Return True if:
306 Return True if:
307
307
308 - cell is explicitly trusted
308 - cell is explicitly trusted
309 - cell has no potentially unsafe rich output
309 - cell has no potentially unsafe rich output
310
310
311 If a cell has no output, or only simple print statements,
311 If a cell has no output, or only simple print statements,
312 it will always be trusted.
312 it will always be trusted.
313 """
313 """
314 # explicitly trusted
314 # explicitly trusted
315 if cell['metadata'].pop("trusted", False):
315 if cell['metadata'].pop("trusted", False):
316 return True
316 return True
317
317
318 # explicitly safe output
318 # explicitly safe output
319 if nbformat_version >= 4:
319 if nbformat_version >= 4:
320 unsafe_output_types = ['execute_result', 'display_data']
320 unsafe_output_types = ['execute_result', 'display_data']
321 safe_keys = {"output_type", "execution_count", "metadata"}
321 safe_keys = {"output_type", "execution_count", "metadata"}
322 else: # v3
322 else: # v3
323 unsafe_output_types = ['pyout', 'display_data']
323 unsafe_output_types = ['pyout', 'display_data']
324 safe_keys = {"output_type", "prompt_number", "metadata"}
324 safe_keys = {"output_type", "prompt_number", "metadata"}
325
325
326 for output in cell['outputs']:
326 for output in cell['outputs']:
327 output_type = output['output_type']
327 output_type = output['output_type']
328 if output_type in unsafe_output_types:
328 if output_type in unsafe_output_types:
329 # if there are any data keys not in the safe whitelist
329 # if there are any data keys not in the safe whitelist
330 output_keys = set(output)
330 output_keys = set(output)
331 if output_keys.difference(safe_keys):
331 if output_keys.difference(safe_keys):
332 return False
332 return False
333
333
334 return True
334 return True
335
335
336 def check_cells(self, nb):
336 def check_cells(self, nb):
337 """Return whether all code cells are trusted
337 """Return whether all code cells are trusted
338
338
339 If there are no code cells, return True.
339 If there are no code cells, return True.
340
340
341 This function is the inverse of mark_cells.
341 This function is the inverse of mark_cells.
342 """
342 """
343 if nb.nbformat < 3:
343 if nb.nbformat < 3:
344 return False
344 return False
345 trusted = True
345 trusted = True
346 for cell in yield_code_cells(nb):
346 for cell in yield_code_cells(nb):
347 # only distrust a cell if it actually has some output to distrust
347 # only distrust a cell if it actually has some output to distrust
348 if not self._check_cell(cell, nb.nbformat):
348 if not self._check_cell(cell, nb.nbformat):
349 trusted = False
349 trusted = False
350
350
351 return trusted
351 return trusted
352
352
353
353
354 trust_flags = {
354 trust_flags = {
355 'reset' : (
355 'reset' : (
356 {'TrustNotebookApp' : { 'reset' : True}},
356 {'TrustNotebookApp' : { 'reset' : True}},
357 """Generate a new key for notebook signature.
357 """Delete the trusted notebook cache.
358 All previously signed notebooks will become untrusted.
358 All previously signed notebooks will become untrusted.
359 """
359 """
360 ),
360 ),
361 }
361 }
362 trust_flags.update(base_flags)
362 trust_flags.update(base_flags)
363 trust_flags.pop('init')
363 trust_flags.pop('init')
364
364
365
365
366 class TrustNotebookApp(BaseIPythonApplication):
366 class TrustNotebookApp(BaseIPythonApplication):
367
367
368 description="""Sign one or more IPython notebooks with your key,
368 description="""Sign one or more IPython notebooks with your key,
369 to trust their dynamic (HTML, Javascript) output.
369 to trust their dynamic (HTML, Javascript) output.
370
370
371 Trusting a notebook only applies to the current IPython profile.
371 Trusting a notebook only applies to the current IPython profile.
372 To trust a notebook for use with a profile other than default,
372 To trust a notebook for use with a profile other than default,
373 add `--profile [profile name]`.
373 add `--profile [profile name]`.
374
374
375 Otherwise, you will have to re-execute the notebook to see output.
375 Otherwise, you will have to re-execute the notebook to see output.
376 """
376 """
377
377
378 examples = """
378 examples = """
379 ipython trust mynotebook.ipynb and_this_one.ipynb
379 ipython trust mynotebook.ipynb and_this_one.ipynb
380 ipython trust --profile myprofile mynotebook.ipynb
380 ipython trust --profile myprofile mynotebook.ipynb
381 """
381 """
382
382
383 flags = trust_flags
383 flags = trust_flags
384
384
385 reset = Bool(False, config=True,
385 reset = Bool(False, config=True,
386 help="""If True, generate a new key for notebook signature.
386 help="""If True, delete the trusted signature cache.
387 After reset, all previously signed notebooks will become untrusted.
387 After reset, all previously signed notebooks will become untrusted.
388 """
388 """
389 )
389 )
390
390
391 notary = Instance(NotebookNotary)
391 notary = Instance(NotebookNotary)
392 def _notary_default(self):
392 def _notary_default(self):
393 return NotebookNotary(parent=self, profile_dir=self.profile_dir)
393 return NotebookNotary(parent=self, profile_dir=self.profile_dir)
394
394
395 def sign_notebook(self, notebook_path):
395 def sign_notebook(self, notebook_path):
396 if not os.path.exists(notebook_path):
396 if not os.path.exists(notebook_path):
397 self.log.error("Notebook missing: %s" % notebook_path)
397 self.log.error("Notebook missing: %s" % notebook_path)
398 self.exit(1)
398 self.exit(1)
399 with io.open(notebook_path, encoding='utf8') as f:
399 with io.open(notebook_path, encoding='utf8') as f:
400 nb = read(f, NO_CONVERT)
400 nb = read(f, NO_CONVERT)
401 if self.notary.check_signature(nb):
401 if self.notary.check_signature(nb):
402 print("Notebook already signed: %s" % notebook_path)
402 print("Notebook already signed: %s" % notebook_path)
403 else:
403 else:
404 print("Signing notebook: %s" % notebook_path)
404 print("Signing notebook: %s" % notebook_path)
405 self.notary.sign(nb)
405 self.notary.sign(nb)
406 with atomic_writing(notebook_path) as f:
406 with atomic_writing(notebook_path) as f:
407 write(nb, f, NO_CONVERT)
407 write(nb, f, NO_CONVERT)
408
408
409 def generate_new_key(self):
409 def generate_new_key(self):
410 """Generate a new notebook signature key"""
410 """Generate a new notebook signature key"""
411 print("Generating new notebook key: %s" % self.notary.secret_file)
411 print("Generating new notebook key: %s" % self.notary.secret_file)
412 self.notary._write_secret_file(os.urandom(1024))
412 self.notary._write_secret_file(os.urandom(1024))
413
413
414 def start(self):
414 def start(self):
415 if self.reset:
415 if self.reset:
416 if os.path.exists(self.notary.db_file):
417 print("Removing trusted signature cache: %s" % self.notary.db_file)
418 os.remove(self.notary.db_file)
416 self.generate_new_key()
419 self.generate_new_key()
417 return
420 return
418 if not self.extra_args:
421 if not self.extra_args:
419 self.log.critical("Specify at least one notebook to sign.")
422 self.log.critical("Specify at least one notebook to sign.")
420 self.exit(1)
423 self.exit(1)
421
424
422 for notebook_path in self.extra_args:
425 for notebook_path in self.extra_args:
423 self.sign_notebook(notebook_path)
426 self.sign_notebook(notebook_path)
424
427
@@ -1,191 +1,191 b''
1 """Test Notebook signing"""
1 """Test Notebook signing"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import copy
6 import copy
7 import time
7 import time
8
8
9 from .base import TestsBase
9 from .base import TestsBase
10
10
11 from IPython.nbformat import read, sign
11 from IPython.nbformat import read, sign
12 from IPython.core.getipython import get_ipython
12 from IPython.core.getipython import get_ipython
13
13
14
14
15 class TestNotary(TestsBase):
15 class TestNotary(TestsBase):
16
16
17 def setUp(self):
17 def setUp(self):
18 self.notary = sign.NotebookNotary(
18 self.notary = sign.NotebookNotary(
19 db_file=':memory:',
19 secret=b'secret',
20 secret=b'secret',
20 profile_dir=get_ipython().profile_dir,
21 profile_dir=get_ipython().profile_dir,
21 db_file=':memory:'
22 )
22 )
23 with self.fopen(u'test3.ipynb', u'r') as f:
23 with self.fopen(u'test3.ipynb', u'r') as f:
24 self.nb = read(f, as_version=4)
24 self.nb = read(f, as_version=4)
25 with self.fopen(u'test3.ipynb', u'r') as f:
25 with self.fopen(u'test3.ipynb', u'r') as f:
26 self.nb3 = read(f, as_version=3)
26 self.nb3 = read(f, as_version=3)
27
27
28 def test_algorithms(self):
28 def test_algorithms(self):
29 last_sig = ''
29 last_sig = ''
30 for algo in sign.algorithms:
30 for algo in sign.algorithms:
31 self.notary.algorithm = algo
31 self.notary.algorithm = algo
32 sig = self.notary.compute_signature(self.nb)
32 sig = self.notary.compute_signature(self.nb)
33 self.assertNotEqual(last_sig, sig)
33 self.assertNotEqual(last_sig, sig)
34 last_sig = sig
34 last_sig = sig
35
35
36 def test_sign_same(self):
36 def test_sign_same(self):
37 """Multiple signatures of the same notebook are the same"""
37 """Multiple signatures of the same notebook are the same"""
38 sig1 = self.notary.compute_signature(self.nb)
38 sig1 = self.notary.compute_signature(self.nb)
39 sig2 = self.notary.compute_signature(self.nb)
39 sig2 = self.notary.compute_signature(self.nb)
40 self.assertEqual(sig1, sig2)
40 self.assertEqual(sig1, sig2)
41
41
42 def test_change_secret(self):
42 def test_change_secret(self):
43 """Changing the secret changes the signature"""
43 """Changing the secret changes the signature"""
44 sig1 = self.notary.compute_signature(self.nb)
44 sig1 = self.notary.compute_signature(self.nb)
45 self.notary.secret = b'different'
45 self.notary.secret = b'different'
46 sig2 = self.notary.compute_signature(self.nb)
46 sig2 = self.notary.compute_signature(self.nb)
47 self.assertNotEqual(sig1, sig2)
47 self.assertNotEqual(sig1, sig2)
48
48
49 def test_sign(self):
49 def test_sign(self):
50 self.assertFalse(self.notary.check_signature(self.nb))
50 self.assertFalse(self.notary.check_signature(self.nb))
51 self.notary.sign(self.nb)
51 self.notary.sign(self.nb)
52 self.assertTrue(self.notary.check_signature(self.nb))
52 self.assertTrue(self.notary.check_signature(self.nb))
53
53
54 def test_unsign(self):
54 def test_unsign(self):
55 self.notary.sign(self.nb)
55 self.notary.sign(self.nb)
56 self.assertTrue(self.notary.check_signature(self.nb))
56 self.assertTrue(self.notary.check_signature(self.nb))
57 self.notary.unsign(self.nb)
57 self.notary.unsign(self.nb)
58 self.assertFalse(self.notary.check_signature(self.nb))
58 self.assertFalse(self.notary.check_signature(self.nb))
59 self.notary.unsign(self.nb)
59 self.notary.unsign(self.nb)
60 self.assertFalse(self.notary.check_signature(self.nb))
60 self.assertFalse(self.notary.check_signature(self.nb))
61
61
62 def test_cull_db(self):
62 def test_cull_db(self):
63 # this test has various sleeps of 2ms
63 # this test has various sleeps of 2ms
64 # to ensure low resolution timestamps compare as expected
64 # to ensure low resolution timestamps compare as expected
65 dt = 2e-3
65 dt = 2e-3
66 nbs = [
66 nbs = [
67 copy.deepcopy(self.nb) for i in range(10)
67 copy.deepcopy(self.nb) for i in range(10)
68 ]
68 ]
69 for row in self.notary.db.execute("SELECT * FROM nbsignatures"):
69 for row in self.notary.db.execute("SELECT * FROM nbsignatures"):
70 print(row)
70 print(row)
71 self.notary.cache_size = 8
71 self.notary.cache_size = 8
72 for i, nb in enumerate(nbs[:8]):
72 for i, nb in enumerate(nbs[:8]):
73 nb.metadata.dirty = i
73 nb.metadata.dirty = i
74 self.notary.sign(nb)
74 self.notary.sign(nb)
75
75
76 for i, nb in enumerate(nbs[:8]):
76 for i, nb in enumerate(nbs[:8]):
77 time.sleep(dt)
77 time.sleep(dt)
78 self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)
78 self.assertTrue(self.notary.check_signature(nb), 'nb %i is trusted' % i)
79
79
80 # signing the 9th triggers culling of first 3
80 # signing the 9th triggers culling of first 3
81 # (75% of 8 = 6, 9 - 6 = 3 culled)
81 # (75% of 8 = 6, 9 - 6 = 3 culled)
82 self.notary.sign(nbs[8])
82 self.notary.sign(nbs[8])
83 self.assertFalse(self.notary.check_signature(nbs[0]))
83 self.assertFalse(self.notary.check_signature(nbs[0]))
84 self.assertFalse(self.notary.check_signature(nbs[1]))
84 self.assertFalse(self.notary.check_signature(nbs[1]))
85 self.assertFalse(self.notary.check_signature(nbs[2]))
85 self.assertFalse(self.notary.check_signature(nbs[2]))
86 self.assertTrue(self.notary.check_signature(nbs[3]))
86 self.assertTrue(self.notary.check_signature(nbs[3]))
87 # checking nb3 should keep it from being culled:
87 # checking nb3 should keep it from being culled:
88 self.notary.sign(nbs[0])
88 self.notary.sign(nbs[0])
89 self.notary.sign(nbs[1])
89 self.notary.sign(nbs[1])
90 self.notary.sign(nbs[2])
90 self.notary.sign(nbs[2])
91 self.assertTrue(self.notary.check_signature(nbs[3]))
91 self.assertTrue(self.notary.check_signature(nbs[3]))
92 self.assertFalse(self.notary.check_signature(nbs[4]))
92 self.assertFalse(self.notary.check_signature(nbs[4]))
93
93
94 def test_check_signature(self):
94 def test_check_signature(self):
95 nb = self.nb
95 nb = self.nb
96 md = nb.metadata
96 md = nb.metadata
97 notary = self.notary
97 notary = self.notary
98 check_signature = notary.check_signature
98 check_signature = notary.check_signature
99 # no signature:
99 # no signature:
100 md.pop('signature', None)
100 md.pop('signature', None)
101 self.assertFalse(check_signature(nb))
101 self.assertFalse(check_signature(nb))
102 # hash only, no algo
102 # hash only, no algo
103 md.signature = notary.compute_signature(nb)
103 md.signature = notary.compute_signature(nb)
104 self.assertFalse(check_signature(nb))
104 self.assertFalse(check_signature(nb))
105 # proper signature, algo mismatch
105 # proper signature, algo mismatch
106 notary.algorithm = 'sha224'
106 notary.algorithm = 'sha224'
107 notary.sign(nb)
107 notary.sign(nb)
108 notary.algorithm = 'sha256'
108 notary.algorithm = 'sha256'
109 self.assertFalse(check_signature(nb))
109 self.assertFalse(check_signature(nb))
110 # check correctly signed notebook
110 # check correctly signed notebook
111 notary.sign(nb)
111 notary.sign(nb)
112 self.assertTrue(check_signature(nb))
112 self.assertTrue(check_signature(nb))
113
113
114 def test_mark_cells_untrusted(self):
114 def test_mark_cells_untrusted(self):
115 cells = self.nb.cells
115 cells = self.nb.cells
116 self.notary.mark_cells(self.nb, False)
116 self.notary.mark_cells(self.nb, False)
117 for cell in cells:
117 for cell in cells:
118 self.assertNotIn('trusted', cell)
118 self.assertNotIn('trusted', cell)
119 if cell.cell_type == 'code':
119 if cell.cell_type == 'code':
120 self.assertIn('trusted', cell.metadata)
120 self.assertIn('trusted', cell.metadata)
121 self.assertFalse(cell.metadata.trusted)
121 self.assertFalse(cell.metadata.trusted)
122 else:
122 else:
123 self.assertNotIn('trusted', cell.metadata)
123 self.assertNotIn('trusted', cell.metadata)
124
124
125 def test_mark_cells_trusted(self):
125 def test_mark_cells_trusted(self):
126 cells = self.nb.cells
126 cells = self.nb.cells
127 self.notary.mark_cells(self.nb, True)
127 self.notary.mark_cells(self.nb, True)
128 for cell in cells:
128 for cell in cells:
129 self.assertNotIn('trusted', cell)
129 self.assertNotIn('trusted', cell)
130 if cell.cell_type == 'code':
130 if cell.cell_type == 'code':
131 self.assertIn('trusted', cell.metadata)
131 self.assertIn('trusted', cell.metadata)
132 self.assertTrue(cell.metadata.trusted)
132 self.assertTrue(cell.metadata.trusted)
133 else:
133 else:
134 self.assertNotIn('trusted', cell.metadata)
134 self.assertNotIn('trusted', cell.metadata)
135
135
136 def test_check_cells(self):
136 def test_check_cells(self):
137 nb = self.nb
137 nb = self.nb
138 self.notary.mark_cells(nb, True)
138 self.notary.mark_cells(nb, True)
139 self.assertTrue(self.notary.check_cells(nb))
139 self.assertTrue(self.notary.check_cells(nb))
140 for cell in nb.cells:
140 for cell in nb.cells:
141 self.assertNotIn('trusted', cell)
141 self.assertNotIn('trusted', cell)
142 self.notary.mark_cells(nb, False)
142 self.notary.mark_cells(nb, False)
143 self.assertFalse(self.notary.check_cells(nb))
143 self.assertFalse(self.notary.check_cells(nb))
144 for cell in nb.cells:
144 for cell in nb.cells:
145 self.assertNotIn('trusted', cell)
145 self.assertNotIn('trusted', cell)
146
146
147 def test_trust_no_output(self):
147 def test_trust_no_output(self):
148 nb = self.nb
148 nb = self.nb
149 self.notary.mark_cells(nb, False)
149 self.notary.mark_cells(nb, False)
150 for cell in nb.cells:
150 for cell in nb.cells:
151 if cell.cell_type == 'code':
151 if cell.cell_type == 'code':
152 cell.outputs = []
152 cell.outputs = []
153 self.assertTrue(self.notary.check_cells(nb))
153 self.assertTrue(self.notary.check_cells(nb))
154
154
155 def test_mark_cells_untrusted_v3(self):
155 def test_mark_cells_untrusted_v3(self):
156 nb = self.nb3
156 nb = self.nb3
157 cells = nb.worksheets[0].cells
157 cells = nb.worksheets[0].cells
158 self.notary.mark_cells(nb, False)
158 self.notary.mark_cells(nb, False)
159 for cell in cells:
159 for cell in cells:
160 self.assertNotIn('trusted', cell)
160 self.assertNotIn('trusted', cell)
161 if cell.cell_type == 'code':
161 if cell.cell_type == 'code':
162 self.assertIn('trusted', cell.metadata)
162 self.assertIn('trusted', cell.metadata)
163 self.assertFalse(cell.metadata.trusted)
163 self.assertFalse(cell.metadata.trusted)
164 else:
164 else:
165 self.assertNotIn('trusted', cell.metadata)
165 self.assertNotIn('trusted', cell.metadata)
166
166
167 def test_mark_cells_trusted_v3(self):
167 def test_mark_cells_trusted_v3(self):
168 nb = self.nb3
168 nb = self.nb3
169 cells = nb.worksheets[0].cells
169 cells = nb.worksheets[0].cells
170 self.notary.mark_cells(nb, True)
170 self.notary.mark_cells(nb, True)
171 for cell in cells:
171 for cell in cells:
172 self.assertNotIn('trusted', cell)
172 self.assertNotIn('trusted', cell)
173 if cell.cell_type == 'code':
173 if cell.cell_type == 'code':
174 self.assertIn('trusted', cell.metadata)
174 self.assertIn('trusted', cell.metadata)
175 self.assertTrue(cell.metadata.trusted)
175 self.assertTrue(cell.metadata.trusted)
176 else:
176 else:
177 self.assertNotIn('trusted', cell.metadata)
177 self.assertNotIn('trusted', cell.metadata)
178
178
179 def test_check_cells_v3(self):
179 def test_check_cells_v3(self):
180 nb = self.nb3
180 nb = self.nb3
181 cells = nb.worksheets[0].cells
181 cells = nb.worksheets[0].cells
182 self.notary.mark_cells(nb, True)
182 self.notary.mark_cells(nb, True)
183 self.assertTrue(self.notary.check_cells(nb))
183 self.assertTrue(self.notary.check_cells(nb))
184 for cell in cells:
184 for cell in cells:
185 self.assertNotIn('trusted', cell)
185 self.assertNotIn('trusted', cell)
186 self.notary.mark_cells(nb, False)
186 self.notary.mark_cells(nb, False)
187 self.assertFalse(self.notary.check_cells(nb))
187 self.assertFalse(self.notary.check_cells(nb))
188 for cell in cells:
188 for cell in cells:
189 self.assertNotIn('trusted', cell)
189 self.assertNotIn('trusted', cell)
190
190
191
191
General Comments 0
You need to be logged in to leave comments. Login now