##// END OF EJS Templates
Merge pull request #5098 from minrk/parallel-debug...
Merge pull request #5098 from minrk/parallel-debug mostly debugging changes for IPython.parallel

File last commit:

r15271:3a65e52e
r15313:9d27bffe merge
Show More
sign.py
310 lines | 10.6 KiB | text/x-python | PythonLexer
"""Functions for signing notebooks"""
#-----------------------------------------------------------------------------
# Copyright (C) 2014, The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import base64
from contextlib import contextmanager
import hashlib
from hmac import HMAC
import io
import os
from IPython.utils.py3compat import string_types, unicode_type, cast_bytes
from IPython.utils.traitlets import Instance, Bytes, Enum, Any, Unicode, Bool
from IPython.config import LoggingConfigurable, MultipleInstanceError
from IPython.core.application import BaseIPythonApplication, base_flags
from .current import read, write
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
try:
# Python 3
algorithms = hashlib.algorithms_guaranteed
except AttributeError:
algorithms = hashlib.algorithms
def yield_everything(obj):
"""Yield every item in a container as bytes
Allows any JSONable object to be passed to an HMAC digester
without having to serialize the whole thing.
"""
if isinstance(obj, dict):
for key in sorted(obj):
value = obj[key]
yield cast_bytes(key)
for b in yield_everything(value):
yield b
elif isinstance(obj, (list, tuple)):
for element in obj:
for b in yield_everything(element):
yield b
elif isinstance(obj, unicode_type):
yield obj.encode('utf8')
else:
yield unicode_type(obj).encode('utf8')
@contextmanager
def signature_removed(nb):
"""Context manager for operating on a notebook with its signature removed
Used for excluding the previous signature when computing a notebook's signature.
"""
save_signature = nb['metadata'].pop('signature', None)
try:
yield
finally:
if save_signature is not None:
nb['metadata']['signature'] = save_signature
class NotebookNotary(LoggingConfigurable):
"""A class for computing and verifying notebook signatures."""
profile_dir = Instance("IPython.core.profiledir.ProfileDir")
def _profile_dir_default(self):
from IPython.core.application import BaseIPythonApplication
app = None
try:
if BaseIPythonApplication.initialized():
app = BaseIPythonApplication.instance()
except MultipleInstanceError:
pass
if app is None:
# create an app, without the global instance
app = BaseIPythonApplication()
app.initialize(argv=[])
return app.profile_dir
algorithm = Enum(algorithms, default_value='sha256', config=True,
help="""The hashing algorithm used to sign notebooks."""
)
def _algorithm_changed(self, name, old, new):
self.digestmod = getattr(hashlib, self.algorithm)
digestmod = Any()
def _digestmod_default(self):
return getattr(hashlib, self.algorithm)
secret_file = Unicode()
def _secret_file_default(self):
if self.profile_dir is None:
return ''
return os.path.join(self.profile_dir.security_dir, 'notebook_secret')
secret = Bytes(config=True,
help="""The secret key with which notebooks are signed."""
)
def _secret_default(self):
# note : this assumes an Application is running
if os.path.exists(self.secret_file):
with io.open(self.secret_file, 'rb') as f:
return f.read()
else:
secret = base64.encodestring(os.urandom(1024))
self._write_secret_file(secret)
return secret
def _write_secret_file(self, secret):
"""write my secret to my secret_file"""
self.log.info("Writing notebook-signing key to %s", self.secret_file)
with io.open(self.secret_file, 'wb') as f:
f.write(secret)
try:
os.chmod(self.secret_file, 0o600)
except OSError:
self.log.warn(
"Could not set permissions on %s",
self.secret_file
)
return secret
def compute_signature(self, nb):
"""Compute a notebook's signature
by hashing the entire contents of the notebook via HMAC digest.
"""
hmac = HMAC(self.secret, digestmod=self.digestmod)
# don't include the previous hash in the content to hash
with signature_removed(nb):
# sign the whole thing
for b in yield_everything(nb):
hmac.update(b)
return hmac.hexdigest()
def check_signature(self, nb):
"""Check a notebook's stored signature
If a signature is stored in the notebook's metadata,
a new signature is computed and compared with the stored value.
Returns True if the signature is found and matches, False otherwise.
The following conditions must all be met for a notebook to be trusted:
- a signature is stored in the form 'scheme:hexdigest'
- the stored scheme matches the requested scheme
- the requested scheme is available from hashlib
- the computed hash from notebook_signature matches the stored hash
"""
stored_signature = nb['metadata'].get('signature', None)
if not stored_signature \
or not isinstance(stored_signature, string_types) \
or ':' not in stored_signature:
return False
stored_algo, sig = stored_signature.split(':', 1)
if self.algorithm != stored_algo:
return False
my_signature = self.compute_signature(nb)
return my_signature == sig
def sign(self, nb):
"""Sign a notebook, indicating that its output is trusted
stores 'algo:hmac-hexdigest' in notebook.metadata.signature
e.g. 'sha256:deadbeef123...'
"""
signature = self.compute_signature(nb)
nb['metadata']['signature'] = "%s:%s" % (self.algorithm, signature)
def mark_cells(self, nb, trusted):
"""Mark cells as trusted if the notebook's signature can be verified
Sets ``cell.trusted = True | False`` on all code cells,
depending on whether the stored signature can be verified.
This function is the inverse of check_cells
"""
if not nb['worksheets']:
# nothing to mark if there are no cells
return
for cell in nb['worksheets'][0]['cells']:
if cell['cell_type'] == 'code':
cell['trusted'] = trusted
def _check_cell(self, cell):
"""Do we trust an individual cell?
Return True if:
- cell is explicitly trusted
- cell has no potentially unsafe rich output
If a cell has no output, or only simple print statements,
it will always be trusted.
"""
# explicitly trusted
if cell.pop("trusted", False):
return True
# explicitly safe output
safe = {
'text/plain', 'image/png', 'image/jpeg',
'text', 'png', 'jpg', # v3-style short keys
}
for output in cell['outputs']:
output_type = output['output_type']
if output_type in ('pyout', 'display_data'):
# if there are any data keys not in the safe whitelist
output_keys = set(output).difference({"output_type", "prompt_number", "metadata"})
if output_keys.difference(safe):
return False
return True
def check_cells(self, nb):
"""Return whether all code cells are trusted
If there are no code cells, return True.
This function is the inverse of mark_cells.
"""
if not nb['worksheets']:
return True
trusted = True
for cell in nb['worksheets'][0]['cells']:
if cell['cell_type'] != 'code':
continue
# only distrust a cell if it actually has some output to distrust
if not self._check_cell(cell):
trusted = False
return trusted
trust_flags = {
'reset' : (
{'TrustNotebookApp' : { 'reset' : True}},
"""Generate a new key for notebook signature.
All previously signed notebooks will become untrusted.
"""
),
}
trust_flags.update(base_flags)
trust_flags.pop('init')
class TrustNotebookApp(BaseIPythonApplication):
description="""Sign one or more IPython notebooks with your key,
to trust their dynamic (HTML, Javascript) output.
Otherwise, you will have to re-execute the notebook to see output.
"""
examples = """ipython trust mynotebook.ipynb and_this_one.ipynb"""
flags = trust_flags
reset = Bool(False, config=True,
help="""If True, generate a new key for notebook signature.
After reset, all previously signed notebooks will become untrusted.
"""
)
notary = Instance(NotebookNotary)
def _notary_default(self):
return NotebookNotary(parent=self, profile_dir=self.profile_dir)
def sign_notebook(self, notebook_path):
if not os.path.exists(notebook_path):
self.log.error("Notebook missing: %s" % notebook_path)
self.exit(1)
with io.open(notebook_path, encoding='utf8') as f:
nb = read(f, 'json')
if self.notary.check_signature(nb):
print("Notebook already signed: %s" % notebook_path)
else:
print("Signing notebook: %s" % notebook_path)
self.notary.sign(nb)
with io.open(notebook_path, 'w', encoding='utf8') as f:
write(nb, f, 'json')
def generate_new_key(self):
"""Generate a new notebook signature key"""
print("Generating new notebook key: %s" % self.notary.secret_file)
self.notary._write_secret_file(os.urandom(1024))
def start(self):
if self.reset:
self.generate_new_key()
return
if not self.extra_args:
self.log.critical("Specify at least one notebook to sign.")
self.exit(1)
for notebook_path in self.extra_args:
self.sign_notebook(notebook_path)