##// END OF EJS Templates
kernelselector lint
kernelselector lint

File last commit:

r19629:6075fc10
r19887:1de72ac5
Show More
sign.py
427 lines | 14.2 KiB | text/x-python | PythonLexer
"""Utilities for signing notebooks"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
from datetime import datetime
import hashlib
from hmac import HMAC
import io
import os
try:
import sqlite3
except ImportError:
try:
from pysqlite2 import dbapi2 as sqlite3
except ImportError:
sqlite3 = None
from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import unicode_type, cast_bytes
from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
from IPython.config import LoggingConfigurable, MultipleInstanceError
from IPython.core.application import BaseIPythonApplication, base_flags
from . import read, write, NO_CONVERT
try:
# Python 3
algorithms = hashlib.algorithms_guaranteed
except AttributeError:
algorithms = hashlib.algorithms
def yield_everything(obj):
"""Yield every item in a container as bytes
Allows any JSONable object to be passed to an HMAC digester
without having to serialize the whole thing.
"""
if isinstance(obj, dict):
for key in sorted(obj):
value = obj[key]
yield cast_bytes(key)
for b in yield_everything(value):
yield b
elif isinstance(obj, (list, tuple)):
for element in obj:
for b in yield_everything(element):
yield b
elif isinstance(obj, unicode_type):
yield obj.encode('utf8')
else:
yield unicode_type(obj).encode('utf8')
def yield_code_cells(nb):
"""Iterator that yields all cells in a notebook
nbformat version independent
"""
if nb.nbformat >= 4:
for cell in nb['cells']:
if cell['cell_type'] == 'code':
yield cell
elif nb.nbformat == 3:
for ws in nb['worksheets']:
for cell in ws['cells']:
if cell['cell_type'] == 'code':
yield cell
@contextmanager
def signature_removed(nb):
"""Context manager for operating on a notebook with its signature removed
Used for excluding the previous signature when computing a notebook's signature.
"""
save_signature = nb['metadata'].pop('signature', None)
try:
yield
finally:
if save_signature is not None:
nb['metadata']['signature'] = save_signature
class NotebookNotary(LoggingConfigurable):
"""A class for computing and verifying notebook signatures."""
profile_dir = Instance("IPython.core.profiledir.ProfileDir")
def _profile_dir_default(self):
from IPython.core.application import BaseIPythonApplication
app = None
try:
if BaseIPythonApplication.initialized():
app = BaseIPythonApplication.instance()
except MultipleInstanceError:
pass
if app is None:
# create an app, without the global instance
app = BaseIPythonApplication()
app.initialize(argv=[])
return app.profile_dir
db_file = Unicode(config=True,
help="""The sqlite file in which to store notebook signatures.
By default, this will be in your IPython profile.
You can set it to ':memory:' to disable sqlite writing to the filesystem.
""")
def _db_file_default(self):
if self.profile_dir is None:
return ':memory:'
return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
# 64k entries ~ 12MB
cache_size = Integer(65535, config=True,
help="""The number of notebook signatures to cache.
When the number of signatures exceeds this value,
the oldest 25% of signatures will be culled.
"""
)
db = Any()
def _db_default(self):
if sqlite3 is None:
self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
return
kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
db = sqlite3.connect(self.db_file, **kwargs)
self.init_db(db)
return db
def init_db(self, db):
db.execute("""
CREATE TABLE IF NOT EXISTS nbsignatures
(
id integer PRIMARY KEY AUTOINCREMENT,
algorithm text,
signature text,
path text,
last_seen timestamp
)""")
db.execute("""
CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
""")
db.commit()
algorithm = Enum(algorithms, default_value='sha256', config=True,
help="""The hashing algorithm used to sign notebooks."""
)
def _algorithm_changed(self, name, old, new):
self.digestmod = getattr(hashlib, self.algorithm)
digestmod = Any()
def _digestmod_default(self):
return getattr(hashlib, self.algorithm)
secret_file = Unicode(config=True,
help="""The file where the secret key is stored."""
)
def _secret_file_default(self):
if self.profile_dir is None:
return ''
return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
secret = Bytes(config=True,
help="""The secret key with which notebooks are signed."""
)
def _secret_default(self):
# note : this assumes an Application is running
if os.path.exists(self.secret_file):
with io.open(self.secret_file, 'rb') as f:
return f.read()
else:
secret = base64.encodestring(os.urandom(1024))
self._write_secret_file(secret)
return secret
def _write_secret_file(self, secret):
"""write my secret to my secret_file"""
self.log.info("Writing notebook-signing key to %s", self.secret_file)
with io.open(self.secret_file, 'wb') as f:
f.write(secret)
try:
os.chmod(self.secret_file, 0o600)
except OSError:
self.log.warn(
"Could not set permissions on %s",
self.secret_file
)
return secret
def compute_signature(self, nb):
"""Compute a notebook's signature
by hashing the entire contents of the notebook via HMAC digest.
"""
hmac = HMAC(self.secret, digestmod=self.digestmod)
# don't include the previous hash in the content to hash
with signature_removed(nb):
# sign the whole thing
for b in yield_everything(nb):
hmac.update(b)
return hmac.hexdigest()
def check_signature(self, nb):
"""Check a notebook's stored signature
If a signature is stored in the notebook's metadata,
a new signature is computed and compared with the stored value.
Returns True if the signature is found and matches, False otherwise.
The following conditions must all be met for a notebook to be trusted:
- a signature is stored in the form 'scheme:hexdigest'
- the stored scheme matches the requested scheme
- the requested scheme is available from hashlib
- the computed hash from notebook_signature matches the stored hash
"""
if nb.nbformat < 3:
return False
if self.db is None:
return False
signature = self.compute_signature(nb)
r = self.db.execute("""SELECT id FROM nbsignatures WHERE
algorithm = ? AND
signature = ?;
""", (self.algorithm, signature)).fetchone()
if r is None:
return False
self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
algorithm = ? AND
signature = ?;
""",
(datetime.utcnow(), self.algorithm, signature),
)
self.db.commit()
return True
def sign(self, nb):
"""Sign a notebook, indicating that its output is trusted on this machine
Stores hash algorithm and hmac digest in a local database of trusted notebooks.
"""
if nb.nbformat < 3:
return
signature = self.compute_signature(nb)
self.store_signature(signature, nb)
def store_signature(self, signature, nb):
if self.db is None:
return
self.db.execute("""INSERT OR IGNORE INTO nbsignatures
(algorithm, signature, last_seen) VALUES (?, ?, ?)""",
(self.algorithm, signature, datetime.utcnow())
)
self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
algorithm = ? AND
signature = ?;
""",
(datetime.utcnow(), self.algorithm, signature),
)
self.db.commit()
n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
if n > self.cache_size:
self.cull_db()
def unsign(self, nb):
"""Ensure that a notebook is untrusted
by removing its signature from the trusted database, if present.
"""
signature = self.compute_signature(nb)
self.db.execute("""DELETE FROM nbsignatures WHERE
algorithm = ? AND
signature = ?;
""",
(self.algorithm, signature)
)
self.db.commit()
def cull_db(self):
"""Cull oldest 25% of the trusted signatures when the size limit is reached"""
self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
);
""", (max(int(0.75 * self.cache_size), 1),))
def mark_cells(self, nb, trusted):
"""Mark cells as trusted if the notebook's signature can be verified
Sets ``cell.metadata.trusted = True | False`` on all code cells,
depending on whether the stored signature can be verified.
This function is the inverse of check_cells
"""
if nb.nbformat < 3:
return
for cell in yield_code_cells(nb):
cell['metadata']['trusted'] = trusted
def _check_cell(self, cell, nbformat_version):
"""Do we trust an individual cell?
Return True if:
- cell is explicitly trusted
- cell has no potentially unsafe rich output
If a cell has no output, or only simple print statements,
it will always be trusted.
"""
# explicitly trusted
if cell['metadata'].pop("trusted", False):
return True
# explicitly safe output
if nbformat_version >= 4:
unsafe_output_types = ['execute_result', 'display_data']
safe_keys = {"output_type", "execution_count", "metadata"}
else: # v3
unsafe_output_types = ['pyout', 'display_data']
safe_keys = {"output_type", "prompt_number", "metadata"}
for output in cell['outputs']:
output_type = output['output_type']
if output_type in unsafe_output_types:
# if there are any data keys not in the safe whitelist
output_keys = set(output)
if output_keys.difference(safe_keys):
return False
return True
def check_cells(self, nb):
"""Return whether all code cells are trusted
If there are no code cells, return True.
This function is the inverse of mark_cells.
"""
if nb.nbformat < 3:
return False
trusted = True
for cell in yield_code_cells(nb):
# only distrust a cell if it actually has some output to distrust
if not self._check_cell(cell, nb.nbformat):
trusted = False
return trusted
trust_flags = {
'reset' : (
{'TrustNotebookApp' : { 'reset' : True}},
"""Delete the trusted notebook cache.
All previously signed notebooks will become untrusted.
"""
),
}
trust_flags.update(base_flags)
trust_flags.pop('init')
class TrustNotebookApp(BaseIPythonApplication):
description="""Sign one or more IPython notebooks with your key,
to trust their dynamic (HTML, Javascript) output.
Trusting a notebook only applies to the current IPython profile.
To trust a notebook for use with a profile other than default,
add `--profile [profile name]`.
Otherwise, you will have to re-execute the notebook to see output.
"""
examples = """
ipython trust mynotebook.ipynb and_this_one.ipynb
ipython trust --profile myprofile mynotebook.ipynb
"""
flags = trust_flags
reset = Bool(False, config=True,
help="""If True, delete the trusted signature cache.
After reset, all previously signed notebooks will become untrusted.
"""
)
notary = Instance(NotebookNotary)
def _notary_default(self):
return NotebookNotary(parent=self, profile_dir=self.profile_dir)
def sign_notebook(self, notebook_path):
if not os.path.exists(notebook_path):
self.log.error("Notebook missing: %s" % notebook_path)
self.exit(1)
with io.open(notebook_path, encoding='utf8') as f:
nb = read(f, NO_CONVERT)
if self.notary.check_signature(nb):
print("Notebook already signed: %s" % notebook_path)
else:
print("Signing notebook: %s" % notebook_path)
self.notary.sign(nb)
with atomic_writing(notebook_path) as f:
write(nb, f, NO_CONVERT)
def generate_new_key(self):
"""Generate a new notebook signature key"""
print("Generating new notebook key: %s" % self.notary.secret_file)
self.notary._write_secret_file(os.urandom(1024))
def start(self):
if self.reset:
if os.path.exists(self.notary.db_file):
print("Removing trusted signature cache: %s" % self.notary.db_file)
os.remove(self.notary.db_file)
self.generate_new_key()
return
if not self.extra_args:
self.log.critical("Specify at least one notebook to sign.")
self.exit(1)
for notebook_path in self.extra_args:
self.sign_notebook(notebook_path)