##// END OF EJS Templates
handle message arriving when sockets are closed...
handle message arriving when sockets are closed check both the incoming and outgoing streams before proceeding to send messages

File last commit:

r19629:6075fc10
r20427:6631fad7
Show More
sign.py
427 lines | 14.2 KiB | text/x-python | PythonLexer
"""Utilities for signing notebooks"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
from datetime import datetime
import hashlib
from hmac import HMAC
import io
import os
try:
import sqlite3
except ImportError:
try:
from pysqlite2 import dbapi2 as sqlite3
except ImportError:
sqlite3 = None
from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import unicode_type, cast_bytes
from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool, Integer
from IPython.config import LoggingConfigurable, MultipleInstanceError
from IPython.core.application import BaseIPythonApplication, base_flags
from . import read, write, NO_CONVERT
try:
# Python 3
algorithms = hashlib.algorithms_guaranteed
except AttributeError:
algorithms = hashlib.algorithms
def yield_everything(obj):
"""Yield every item in a container as bytes
Allows any JSONable object to be passed to an HMAC digester
without having to serialize the whole thing.
"""
if isinstance(obj, dict):
for key in sorted(obj):
value = obj[key]
yield cast_bytes(key)
for b in yield_everything(value):
yield b
elif isinstance(obj, (list, tuple)):
for element in obj:
for b in yield_everything(element):
yield b
elif isinstance(obj, unicode_type):
yield obj.encode('utf8')
else:
yield unicode_type(obj).encode('utf8')
def yield_code_cells(nb):
"""Iterator that yields all cells in a notebook
nbformat version independent
"""
if nb.nbformat >= 4:
for cell in nb['cells']:
if cell['cell_type'] == 'code':
yield cell
elif nb.nbformat == 3:
for ws in nb['worksheets']:
for cell in ws['cells']:
if cell['cell_type'] == 'code':
yield cell
@contextmanager
def signature_removed(nb):
"""Context manager for operating on a notebook with its signature removed
Used for excluding the previous signature when computing a notebook's signature.
"""
save_signature = nb['metadata'].pop('signature', None)
try:
yield
finally:
if save_signature is not None:
nb['metadata']['signature'] = save_signature
class NotebookNotary(LoggingConfigurable):
"""A class for computing and verifying notebook signatures."""
profile_dir = Instance("IPython.core.profiledir.ProfileDir")
def _profile_dir_default(self):
from IPython.core.application import BaseIPythonApplication
app = None
try:
if BaseIPythonApplication.initialized():
app = BaseIPythonApplication.instance()
except MultipleInstanceError:
pass
if app is None:
# create an app, without the global instance
app = BaseIPythonApplication()
app.initialize(argv=[])
return app.profile_dir
db_file = Unicode(config=True,
help="""The sqlite file in which to store notebook signatures.
By default, this will be in your IPython profile.
You can set it to ':memory:' to disable sqlite writing to the filesystem.
""")
def _db_file_default(self):
if self.profile_dir is None:
return ':memory:'
return os.path.join(self.profile_dir.security_dir, u'nbsignatures.db')
# 64k entries ~ 12MB
cache_size = Integer(65535, config=True,
help="""The number of notebook signatures to cache.
When the number of signatures exceeds this value,
the oldest 25% of signatures will be culled.
"""
)
db = Any()
def _db_default(self):
if sqlite3 is None:
self.log.warn("Missing SQLite3, all notebooks will be untrusted!")
return
kwargs = dict(detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES)
db = sqlite3.connect(self.db_file, **kwargs)
self.init_db(db)
return db
def init_db(self, db):
db.execute("""
CREATE TABLE IF NOT EXISTS nbsignatures
(
id integer PRIMARY KEY AUTOINCREMENT,
algorithm text,
signature text,
path text,
last_seen timestamp
)""")
db.execute("""
CREATE INDEX IF NOT EXISTS algosig ON nbsignatures(algorithm, signature)
""")
db.commit()
algorithm = Enum(algorithms, default_value='sha256', config=True,
help="""The hashing algorithm used to sign notebooks."""
)
def _algorithm_changed(self, name, old, new):
self.digestmod = getattr(hashlib, self.algorithm)
digestmod = Any()
def _digestmod_default(self):
return getattr(hashlib, self.algorithm)
secret_file = Unicode(config=True,
help="""The file where the secret key is stored."""
)
def _secret_file_default(self):
if self.profile_dir is None:
return ''
return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
secret = Bytes(config=True,
help="""The secret key with which notebooks are signed."""
)
def _secret_default(self):
# note : this assumes an Application is running
if os.path.exists(self.secret_file):
with io.open(self.secret_file, 'rb') as f:
return f.read()
else:
secret = base64.encodestring(os.urandom(1024))
self._write_secret_file(secret)
return secret
def _write_secret_file(self, secret):
"""write my secret to my secret_file"""
self.log.info("Writing notebook-signing key to %s", self.secret_file)
with io.open(self.secret_file, 'wb') as f:
f.write(secret)
try:
os.chmod(self.secret_file, 0o600)
except OSError:
self.log.warn(
"Could not set permissions on %s",
self.secret_file
)
return secret
def compute_signature(self, nb):
"""Compute a notebook's signature
by hashing the entire contents of the notebook via HMAC digest.
"""
hmac = HMAC(self.secret, digestmod=self.digestmod)
# don't include the previous hash in the content to hash
with signature_removed(nb):
# sign the whole thing
for b in yield_everything(nb):
hmac.update(b)
return hmac.hexdigest()
def check_signature(self, nb):
"""Check a notebook's stored signature
If a signature is stored in the notebook's metadata,
a new signature is computed and compared with the stored value.
Returns True if the signature is found and matches, False otherwise.
The following conditions must all be met for a notebook to be trusted:
- a signature is stored in the form 'scheme:hexdigest'
- the stored scheme matches the requested scheme
- the requested scheme is available from hashlib
- the computed hash from notebook_signature matches the stored hash
"""
if nb.nbformat < 3:
return False
if self.db is None:
return False
signature = self.compute_signature(nb)
r = self.db.execute("""SELECT id FROM nbsignatures WHERE
algorithm = ? AND
signature = ?;
""", (self.algorithm, signature)).fetchone()
if r is None:
return False
self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
algorithm = ? AND
signature = ?;
""",
(datetime.utcnow(), self.algorithm, signature),
)
self.db.commit()
return True
def sign(self, nb):
"""Sign a notebook, indicating that its output is trusted on this machine
Stores hash algorithm and hmac digest in a local database of trusted notebooks.
"""
if nb.nbformat < 3:
return
signature = self.compute_signature(nb)
self.store_signature(signature, nb)
def store_signature(self, signature, nb):
if self.db is None:
return
self.db.execute("""INSERT OR IGNORE INTO nbsignatures
(algorithm, signature, last_seen) VALUES (?, ?, ?)""",
(self.algorithm, signature, datetime.utcnow())
)
self.db.execute("""UPDATE nbsignatures SET last_seen = ? WHERE
algorithm = ? AND
signature = ?;
""",
(datetime.utcnow(), self.algorithm, signature),
)
self.db.commit()
n, = self.db.execute("SELECT Count(*) FROM nbsignatures").fetchone()
if n > self.cache_size:
self.cull_db()
def unsign(self, nb):
"""Ensure that a notebook is untrusted
by removing its signature from the trusted database, if present.
"""
signature = self.compute_signature(nb)
self.db.execute("""DELETE FROM nbsignatures WHERE
algorithm = ? AND
signature = ?;
""",
(self.algorithm, signature)
)
self.db.commit()
def cull_db(self):
"""Cull oldest 25% of the trusted signatures when the size limit is reached"""
self.db.execute("""DELETE FROM nbsignatures WHERE id IN (
SELECT id FROM nbsignatures ORDER BY last_seen DESC LIMIT -1 OFFSET ?
);
""", (max(int(0.75 * self.cache_size), 1),))
def mark_cells(self, nb, trusted):
"""Mark cells as trusted if the notebook's signature can be verified
Sets ``cell.metadata.trusted = True | False`` on all code cells,
depending on whether the stored signature can be verified.
This function is the inverse of check_cells
"""
if nb.nbformat < 3:
return
for cell in yield_code_cells(nb):
cell['metadata']['trusted'] = trusted
def _check_cell(self, cell, nbformat_version):
"""Do we trust an individual cell?
Return True if:
- cell is explicitly trusted
- cell has no potentially unsafe rich output
If a cell has no output, or only simple print statements,
it will always be trusted.
"""
# explicitly trusted
if cell['metadata'].pop("trusted", False):
return True
# explicitly safe output
if nbformat_version >= 4:
unsafe_output_types = ['execute_result', 'display_data']
safe_keys = {"output_type", "execution_count", "metadata"}
else: # v3
unsafe_output_types = ['pyout', 'display_data']
safe_keys = {"output_type", "prompt_number", "metadata"}
for output in cell['outputs']:
output_type = output['output_type']
if output_type in unsafe_output_types:
# if there are any data keys not in the safe whitelist
output_keys = set(output)
if output_keys.difference(safe_keys):
return False
return True
def check_cells(self, nb):
"""Return whether all code cells are trusted
If there are no code cells, return True.
This function is the inverse of mark_cells.
"""
if nb.nbformat < 3:
return False
trusted = True
for cell in yield_code_cells(nb):
# only distrust a cell if it actually has some output to distrust
if not self._check_cell(cell, nb.nbformat):
trusted = False
return trusted
trust_flags = {
'reset' : (
{'TrustNotebookApp' : { 'reset' : True}},
"""Delete the trusted notebook cache.
All previously signed notebooks will become untrusted.
"""
),
}
trust_flags.update(base_flags)
trust_flags.pop('init')
class TrustNotebookApp(BaseIPythonApplication):
description="""Sign one or more IPython notebooks with your key,
to trust their dynamic (HTML, Javascript) output.
Trusting a notebook only applies to the current IPython profile.
To trust a notebook for use with a profile other than default,
add `--profile [profile name]`.
Otherwise, you will have to re-execute the notebook to see output.
"""
examples = """
ipython trust mynotebook.ipynb and_this_one.ipynb
ipython trust --profile myprofile mynotebook.ipynb
"""
flags = trust_flags
reset = Bool(False, config=True,
help="""If True, delete the trusted signature cache.
After reset, all previously signed notebooks will become untrusted.
"""
)
notary = Instance(NotebookNotary)
def _notary_default(self):
return NotebookNotary(parent=self, profile_dir=self.profile_dir)
def sign_notebook(self, notebook_path):
if not os.path.exists(notebook_path):
self.log.error("Notebook missing: %s" % notebook_path)
self.exit(1)
with io.open(notebook_path, encoding='utf8') as f:
nb = read(f, NO_CONVERT)
if self.notary.check_signature(nb):
print("Notebook already signed: %s" % notebook_path)
else:
print("Signing notebook: %s" % notebook_path)
self.notary.sign(nb)
with atomic_writing(notebook_path) as f:
write(nb, f, NO_CONVERT)
def generate_new_key(self):
"""Generate a new notebook signature key"""
print("Generating new notebook key: %s" % self.notary.secret_file)
self.notary._write_secret_file(os.urandom(1024))
def start(self):
if self.reset:
if os.path.exists(self.notary.db_file):
print("Removing trusted signature cache: %s" % self.notary.db_file)
os.remove(self.notary.db_file)
self.generate_new_key()
return
if not self.extra_args:
self.log.critical("Specify at least one notebook to sign.")
self.exit(1)
for notebook_path in self.extra_args:
self.sign_notebook(notebook_path)