guarded_eval.py
738 lines
| 23.6 KiB
| text/x-python
|
PythonLexer
krassowski
|
r27915 | from typing import ( | ||
Any, | ||||
Callable, | ||||
krassowski
|
r27920 | Dict, | ||
krassowski
|
r27915 | Set, | ||
krassowski
|
r27926 | Sequence, | ||
krassowski
|
r27915 | Tuple, | ||
NamedTuple, | ||||
Type, | ||||
Literal, | ||||
Union, | ||||
TYPE_CHECKING, | ||||
) | ||||
krassowski
|
r27920 | import ast | ||
krassowski
|
r27915 | import builtins | ||
krassowski
|
r27906 | import collections | ||
krassowski
|
r27920 | import operator | ||
krassowski
|
r27906 | import sys | ||
from functools import cached_property | ||||
from dataclasses import dataclass, field | ||||
krassowski
|
r27912 | from IPython.utils.docs import GENERATING_DOCUMENTATION | ||
krassowski
|
r27918 | from IPython.utils.decorators import undoc | ||
krassowski
|
r27912 | |||
# `Protocol` is only needed by static type checkers and by the documentation
# build; at runtime a plain `object` base is used instead so that
# `typing_extensions` is not required.
if not (TYPE_CHECKING or GENERATING_DOCUMENTATION):
    # do not require on runtime
    Protocol = object  # requires Python >=3.8
else:
    from typing_extensions import Protocol
krassowski
|
r27906 | |||
krassowski
|
@undoc
class HasGetItem(Protocol):
    """Structural type of objects that define ``__getitem__``."""

    def __getitem__(self, key) -> None:
        """Signature stub only; carries no implementation."""
krassowski
|
r27906 | |||
krassowski
|
@undoc
class InstancesHaveGetItem(Protocol):
    """Structural type of callables (e.g. classes) producing `HasGetItem` objects."""

    def __call__(self, *args, **kwargs) -> HasGetItem:
        """Signature stub only; carries no implementation."""
krassowski
|
r27906 | |||
krassowski
|
@undoc
class HasGetAttr(Protocol):
    """Structural type of objects that explicitly define ``__getattr__``."""

    def __getattr__(self, key) -> None:
        """Signature stub only; carries no implementation."""
krassowski
|
r27906 | |||
krassowski
|
@undoc
class DoesNotHaveGetAttr(Protocol):
    """Structural type of objects that do not define ``__getattr__``."""
krassowski
|
r27913 | |||
krassowski
|
# Most objects do not implement `__getattr__` explicitly, hence the type
# admits both the objects which define it and those which do not.
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr]
krassowski
|
def _unbind_method(func: Callable) -> Union[Callable, None]:
    """Get unbound method for given bound method.

    The owner is read from ``func.__self__`` and the method is looked up by
    ``func.__name__`` on ``type(owner)``.

    Returns None if cannot get unbound method, or method is already unbound.
    This includes the cases where:

    - ``func`` has no ``__self__`` or no (non-empty) ``__name__``,
    - the name is present in the owner's ``__dict__`` (the attribute is
      overridden on the owner itself rather than coming from its type),
    - the owner's type has no attribute of that name (e.g. a classmethod
      inherited by a class, where the type of the owner is the metaclass).
    """
    owner = getattr(func, "__self__", None)
    name = getattr(func, "__name__", None)
    if owner is None or not name:
        return None

    instance_dict_overrides = getattr(owner, "__dict__", None)
    if instance_dict_overrides and name in instance_dict_overrides:
        return None

    # Default to None rather than raising `AttributeError` when the type of
    # the owner does not expose the name.
    return getattr(type(owner), name, None)
krassowski
|
r27906 | |||
krassowski
|
@undoc
@dataclass
class EvaluationPolicy:
    """Definition of evaluation policy.

    Each ``allow_*`` flag switches a whole category of operations on or off;
    ``allowed_calls`` additionally lists individual callables which may be
    called even when ``allow_any_calls`` is False.
    """

    allow_locals_access: bool = False
    allow_globals_access: bool = False
    allow_item_access: bool = False
    allow_attr_access: bool = False
    allow_builtins_access: bool = False
    allow_all_operations: bool = False
    allow_any_calls: bool = False
    allowed_calls: Set[Callable] = field(default_factory=set)

    def can_get_item(self, value, item):
        """Return whether subscripting ``value`` with ``item`` is allowed."""
        return self.allow_item_access

    def can_get_attr(self, value, attr):
        """Return whether accessing attribute ``attr`` of ``value`` is allowed."""
        return self.allow_attr_access

    def can_operate(self, dunders: Tuple[str, ...], a, b=None):
        """Return whether the operation given by ``dunders`` is allowed on ``a`` (and ``b``)."""
        if self.allow_all_operations:
            return True
        return False

    def can_call(self, func):
        """Return whether calling ``func`` is allowed.

        A call is allowed if any call is allowed, if ``func`` itself is in
        ``allowed_calls``, or if the unbound counterpart of a bound method
        (as resolved by ``_unbind_method``) is in ``allowed_calls``.
        """
        if self.allow_any_calls:
            return True

        if self._is_allow_listed(func):
            return True

        owner_method = _unbind_method(func)

        if owner_method and self._is_allow_listed(owner_method):
            return True

        return False

    def _is_allow_listed(self, candidate) -> bool:
        """Check membership in ``allowed_calls``, treating unhashable objects as absent."""
        try:
            return candidate in self.allowed_calls
        except TypeError:
            # An unhashable object can never be a member of a set, so it is
            # by definition not allow-listed.
            return False
krassowski
|
r27913 | |||
krassowski
|
def _get_external(module_name: str, access_path: Sequence[str]):
    """Get value from external module given a dotted access path.

    The module is looked up in `sys.modules` (it is never imported here) and
    each element of ``access_path`` is then resolved with `getattr` in turn.

    Raises:
    * `KeyError` if the module is not found in `sys.modules`, and
    * `AttributeError` if the access path does not match an exported object
    """
    target = sys.modules[module_name]
    for step in access_path:
        target = getattr(target, step)
    return target
krassowski
|
def _has_original_dunder_external(
    value,
    module_name: str,
    access_path: Sequence[str],
    method_name: str,
):
    """Check whether ``value`` uses the dunder method of an external type unmodified.

    The external type is resolved from ``module_name`` and ``access_path``
    with ``_get_external``; modules which are not already imported are never
    imported here.

    Returns True if the type of ``value`` is exactly the external type, or if
    ``value`` is an instance of it and ``method_name`` resolves to the same
    method on both types. Returns False otherwise, including when the module
    is not imported or the access path cannot be resolved.
    """
    if module_name not in sys.modules:
        # LBYL as it is faster
        return False
    try:
        member_type = _get_external(module_name, access_path)
        value_type = type(value)
        # Identity (not `==`) so that a metaclass overriding `__eq__` cannot
        # make an unrelated type pass as the allow-listed one.
        if value_type is member_type:
            return True
        if method_name == "__getattribute__":
            # we have to short-circuit here due to an unresolved issue in
            # `isinstance` implementation: https://bugs.python.org/issue32683
            return False
        if isinstance(value, member_type):
            method = getattr(value_type, method_name, None)
            member_method = getattr(member_type, method_name, None)
            if member_method == method:
                return True
    except (AttributeError, KeyError):
        return False
    return False
krassowski
|
def _has_original_dunder(
    value, allowed_types, allowed_methods, allowed_external, method_name
):
    """Check whether ``value`` uses an allow-listed implementation of a dunder.

    Returns True when the exact type of ``value`` is in ``allowed_types``,
    when the method found on its type is in ``allowed_methods``, or when any
    entry of ``allowed_external`` matches. Returns None when the type of
    ``value`` does not define ``method_name`` at all, and False otherwise.
    """
    # note: Python ignores `__getattr__`/`__getitem__` on instances,
    # we only need to check at class level
    cls = type(value)

    # strict type check passes → no need to check method
    if cls in allowed_types:
        return True

    dunder = getattr(cls, method_name, None)
    if dunder is None:
        return None
    if dunder in allowed_methods:
        return True

    return any(
        _has_original_dunder_external(value, module_name, access_path, method_name)
        for module_name, *access_path in allowed_external
    )
krassowski
|
@undoc
@dataclass
class SelectivePolicy(EvaluationPolicy):
    """Evaluation policy which allows operations on allow-listed types only.

    For item access, attribute access and operators there is a set of
    allowed types (``allowed_*``) and a set of externally defined types given
    as ``(module_name, *access_path)`` tuples (``allowed_*_external``), which
    are only resolved if the module is already present in `sys.modules`.
    """

    allowed_getitem: Set[InstancesHaveGetItem] = field(default_factory=set)
    allowed_getitem_external: Set[Tuple[str, ...]] = field(default_factory=set)

    allowed_getattr: Set[MayHaveGetattr] = field(default_factory=set)
    allowed_getattr_external: Set[Tuple[str, ...]] = field(default_factory=set)

    allowed_operations: Set = field(default_factory=set)
    allowed_operations_external: Set[Tuple[str, ...]] = field(default_factory=set)

    # Per-dunder cache of the methods of `allowed_operations`; filled lazily
    # by `_operator_dunder_methods`.
    _operation_methods_cache: Dict[str, Set[Callable]] = field(
        default_factory=dict, init=False
    )

    def can_get_attr(self, value, attr):
        """Allow attribute access if neither attribute hook nor the property was modified.

        Both ``__getattribute__`` and ``__getattr__`` of ``value`` must be
        the original ones of an allow-listed type (a missing ``__getattr__``
        is fine). If ``attr`` is a property on the type of ``value`` it must
        additionally belong to an allowed type, or be equal to the property
        of the same name on one of the allowed external types.
        """
        has_original_attribute = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattribute_methods,
            allowed_external=self.allowed_getattr_external,
            method_name="__getattribute__",
        )
        has_original_attr = _has_original_dunder(
            value,
            allowed_types=self.allowed_getattr,
            allowed_methods=self._getattr_methods,
            allowed_external=self.allowed_getattr_external,
            method_name="__getattr__",
        )

        accept = False

        # Many objects do not have `__getattr__`, this is fine.
        if has_original_attr is None and has_original_attribute:
            accept = True
        else:
            # Accept objects without modifications to `__getattr__` and `__getattribute__`
            accept = has_original_attr and has_original_attribute

        if accept:
            # We still need to check for overridden properties.
            value_class = type(value)
            if not hasattr(value_class, attr):
                return True

            class_attr_val = getattr(value_class, attr)
            is_property = isinstance(class_attr_val, property)

            if not is_property:
                return True

            # Properties in allowed types are ok (although we do not include any
            # properties in our default allow list currently).
            if type(value) in self.allowed_getattr:
                return True  # pragma: no cover

            # Properties in subclasses of allowed types may be ok if not changed.
            # Every external type has to be consulted: the set is unordered,
            # so stopping at the first entry would make the outcome depend on
            # iteration order.
            for module_name, *access_path in self.allowed_getattr_external:
                try:
                    external_class = _get_external(module_name, access_path)
                    external_class_attr_val = getattr(external_class, attr)
                except (KeyError, AttributeError):
                    # module not imported or no such attribute on this type
                    continue
                if class_attr_val == external_class_attr_val:
                    return True

        return False

    def can_get_item(self, value, item):
        """Allow accessing `__getitem__` of allow-listed instances unless it was modified."""
        return _has_original_dunder(
            value,
            allowed_types=self.allowed_getitem,
            allowed_methods=self._getitem_methods,
            allowed_external=self.allowed_getitem_external,
            method_name="__getitem__",
        )

    def can_operate(self, dunders: Tuple[str, ...], a, b=None):
        """Allow the operation if every dunder is original on every operand.

        ``b`` is only taken into account when it is not None.
        """
        objects = [a]
        if b is not None:
            objects.append(b)
        return all(
            [
                _has_original_dunder(
                    obj,
                    allowed_types=self.allowed_operations,
                    allowed_methods=self._operator_dunder_methods(dunder),
                    allowed_external=self.allowed_operations_external,
                    method_name=dunder,
                )
                for dunder in dunders
                for obj in objects
            ]
        )

    def _operator_dunder_methods(self, dunder: str) -> Set[Callable]:
        """Return (and cache) the ``dunder`` methods of all ``allowed_operations`` types."""
        if dunder not in self._operation_methods_cache:
            self._operation_methods_cache[dunder] = self._safe_get_methods(
                self.allowed_operations, dunder
            )
        return self._operation_methods_cache[dunder]

    @cached_property
    def _getitem_methods(self) -> Set[Callable]:
        return self._safe_get_methods(self.allowed_getitem, "__getitem__")

    @cached_property
    def _getattr_methods(self) -> Set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattr__")

    @cached_property
    def _getattribute_methods(self) -> Set[Callable]:
        return self._safe_get_methods(self.allowed_getattr, "__getattribute__")

    def _safe_get_methods(self, classes, name) -> Set[Callable]:
        """Collect attribute ``name`` from each of ``classes``, skipping missing/falsy ones."""
        return {
            method
            for class_ in classes
            for method in [getattr(class_, name, None)]
            if method
        }
krassowski
|
class _DummyNamedTuple(NamedTuple):
    """Used internally to retrieve methods of named tuple instance."""
krassowski
|
r27906 | |||
class EvaluationContext(NamedTuple):
    #: Local namespace
    locals: dict

    #: Global namespace
    globals: dict

    #: Evaluation policy identifier
    evaluation: Literal[
        "forbidden", "minimal", "limited", "unsafe", "dangerous"
    ] = "forbidden"

    #: Whether the evaluation of code takes place inside of a subscript.
    #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``.
    in_subscript: bool = False
krassowski
|
class _IdentitySubscript:
    """Returns the key itself when item is requested via subscript."""

    def __getitem__(self, key):
        # hand the subscript key back unchanged
        return key
krassowski
|
r27913 | |||
krassowski
|
# `guarded_eval` stores IDENTITY_SUBSCRIPT in the locals under the name
# SUBSCRIPT_MARKER when it evaluates the inside of a subscript.
IDENTITY_SUBSCRIPT = _IdentitySubscript()
SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__"
krassowski
|
r27906 | |||
krassowski
|
class GuardRejection(Exception):
    """Exception raised when guard rejects evaluation attempt."""
krassowski
|
def guarded_eval(code: str, context: EvaluationContext):
    """Evaluate provided code in the evaluation context.

    If evaluation policy given by context is set to ``forbidden``
    no evaluation will be performed; if it is set to ``dangerous``
    standard :func:`eval` will be used; finally, for any other,
    policy :func:`eval_node` will be called on parsed AST.
    """
    if context.evaluation == "forbidden":
        raise GuardRejection("Forbidden mode")

    # note: not using `ast.literal_eval` as it does not implement
    # getitem at all, for example it fails on simple `[0][1]`

    if context.in_subscript:
        # syntactic sugar for slices (:) is only available in subscripts
        # so we need to trick the ast parser into thinking that we have
        # a subscript, but we need to be able to later recognise that we did
        # it so we can ignore the actual __getitem__ operation
        if not code:
            return ()
        # work on a copy so that the caller's namespace is left untouched
        subscript_locals = context.locals.copy()
        subscript_locals[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT
        code = SUBSCRIPT_MARKER + "[" + code + "]"
        context_fields = context._asdict()
        context_fields["locals"] = subscript_locals
        context = EvaluationContext(**context_fields)

    if context.evaluation == "dangerous":
        return eval(code, context.globals, context.locals)

    return eval_node(ast.parse(code, mode="eval"), context)
krassowski
|
r27913 | |||
krassowski
|
# Each table maps an AST operator node type to the dunder method names which
# are checked by the policy; the first name is the one used for evaluation.

BINARY_OP_DUNDERS: Dict[Type[ast.operator], Tuple[str]] = {
    # arithmetic
    ast.Add: ("__add__",),
    ast.Sub: ("__sub__",),
    ast.Mult: ("__mul__",),
    ast.Div: ("__truediv__",),
    ast.FloorDiv: ("__floordiv__",),
    ast.Mod: ("__mod__",),
    ast.Pow: ("__pow__",),
    # bitwise
    ast.LShift: ("__lshift__",),
    ast.RShift: ("__rshift__",),
    ast.BitOr: ("__or__",),
    ast.BitXor: ("__xor__",),
    ast.BitAnd: ("__and__",),
    # matrix multiplication
    ast.MatMult: ("__matmul__",),
}

COMP_OP_DUNDERS: Dict[Type[ast.cmpop], Tuple[str, ...]] = {
    ast.Eq: ("__eq__",),
    ast.NotEq: ("__ne__", "__eq__"),
    ast.Lt: ("__lt__", "__gt__"),
    ast.LtE: ("__le__", "__ge__"),
    ast.Gt: ("__gt__", "__lt__"),
    ast.GtE: ("__ge__", "__le__"),
    ast.In: ("__contains__",),
    # Note: ast.Is, ast.IsNot, ast.NotIn are handled specially
}

UNARY_OP_DUNDERS: Dict[Type[ast.unaryop], Tuple[str, ...]] = {
    ast.USub: ("__neg__",),
    ast.UAdd: ("__pos__",),
    # we have to check both __inv__ and __invert__!
    ast.Invert: ("__invert__", "__inv__"),
    ast.Not: ("__not__",),
}
def _find_dunder(node_op, dunders) -> Union[Tuple[str, ...], None]:
    """Return the dunder names registered for the type of ``node_op``.

    ``dunders`` maps AST operator types to tuples of dunder names. If several
    types match, the last one in iteration order wins; None is returned when
    nothing matches.
    """
    matches = [
        names for op_type, names in dunders.items() if isinstance(node_op, op_type)
    ]
    return matches[-1] if matches else None
krassowski
|
def _apply_binary_dunder(policy, context, dunder: str, left, right):
    """Apply binary ``dunder`` to ``left`` and ``right``, honouring `NotImplemented`.

    The forward method of ``left`` is tried first; when it is missing or
    returns `NotImplemented` the reflected method of ``right`` is used, but
    only if the policy allows that reflected operation on ``right``.
    """
    forward = getattr(left, dunder, None)
    result = forward(right) if forward is not None else NotImplemented
    if result is NotImplemented:
        reflected = "__r" + dunder[2:]
        if not policy.can_operate((reflected,), right):
            raise GuardRejection(
                f"Operation (`{reflected}`) for",
                type(right),
                f"not allowed in {context.evaluation} mode",
            )
        backward = getattr(right, reflected, None)
        if backward is not None:
            result = backward(left)
    if result is NotImplemented:
        # neither operand implements the operation for the other one
        raise TypeError(
            f"unsupported operand type(s) for {dunder}: "
            f"{type(left).__name__!r} and {type(right).__name__!r}"
        )
    return result


def eval_node(node: Union[ast.AST, None], context: EvaluationContext):
    """Evaluate AST node in provided context.

    Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments.

    Does not evaluate actions that always have side effects:

    - class definitions (``class sth: ...``)
    - function definitions (``def sth: ...``)
    - variable assignments (``x = 1``)
    - augmented assignments (``x += 1``)
    - deletions (``del x``)

    Does not evaluate operations which do not return values:

    - assertions (``assert x``)
    - pass (``pass``)
    - imports (``import x``)
    - control flow:

        - conditionals (``if x:``) except for ternary IfExp (``a if x else b``)
        - loops (``for`` and ``while``)
        - exception handling

    The purpose of this function is to guard against unwanted side-effects;
    it does not give guarantees on protection from malicious code execution.

    Raises `GuardRejection` when the policy selected by
    ``context.evaluation`` does not allow an operation, `NameError` when a
    name cannot be resolved, and `ValueError` for unsupported nodes.
    """
    policy = EVALUATION_POLICIES[context.evaluation]
    if node is None:
        return None
    if isinstance(node, ast.Expression):
        return eval_node(node.body, context)
    if isinstance(node, ast.BinOp):
        left = eval_node(node.left, context)
        right = eval_node(node.right, context)
        dunders = _find_dunder(node.op, BINARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, left, right):
                # Calling `left.__op__(right)` alone would leak
                # `NotImplemented` for mixed operands such as `1 + 1.0`.
                return _apply_binary_dunder(policy, context, dunders[0], left, right)
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(left),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Compare):
        left = eval_node(node.left, context)
        all_true = True
        for op, right in zip(node.ops, node.comparators):
            right = eval_node(right, context)
            # `negate` applies to the current operator only; it must not
            # carry over to the next link of a chained comparison.
            negate = False
            dunder = None
            dunders = _find_dunder(op, COMP_OP_DUNDERS)
            if not dunders:
                if isinstance(op, ast.NotIn):
                    dunders = COMP_OP_DUNDERS[ast.In]
                    negate = True
                if isinstance(op, ast.Is):
                    dunder = "is_"
                if isinstance(op, ast.IsNot):
                    dunder = "is_"
                    negate = True
            if not dunder and dunders:
                dunder = dunders[0]
            if dunder:
                # for containment the container (right operand) comes first
                a, b = (right, left) if dunder == "__contains__" else (left, right)
                if dunder == "is_" or dunders and policy.can_operate(dunders, a, b):
                    result = getattr(operator, dunder)(a, b)
                    if negate:
                        result = not result
                    if not result:
                        all_true = False
                    left = right
                else:
                    raise GuardRejection(
                        f"Comparison (`{dunder}`) for",
                        type(left),
                        f"not allowed in {context.evaluation} mode",
                    )
            else:
                raise ValueError(
                    f"Comparison `{dunder}` not supported"
                )  # pragma: no cover
        return all_true
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.Index):
        # deprecated since Python 3.9
        return eval_node(node.value, context)  # pragma: no cover
    if isinstance(node, ast.Tuple):
        return tuple(eval_node(e, context) for e in node.elts)
    if isinstance(node, ast.List):
        return [eval_node(e, context) for e in node.elts]
    if isinstance(node, ast.Set):
        return {eval_node(e, context) for e in node.elts}
    if isinstance(node, ast.Dict):
        return dict(
            zip(
                [eval_node(k, context) for k in node.keys],
                [eval_node(v, context) for v in node.values],
            )
        )
    if isinstance(node, ast.Slice):
        return slice(
            eval_node(node.lower, context),
            eval_node(node.upper, context),
            eval_node(node.step, context),
        )
    if isinstance(node, ast.ExtSlice):
        # deprecated since Python 3.9
        return tuple([eval_node(dim, context) for dim in node.dims])  # pragma: no cover
    if isinstance(node, ast.UnaryOp):
        value = eval_node(node.operand, context)
        dunders = _find_dunder(node.op, UNARY_OP_DUNDERS)
        if dunders:
            if policy.can_operate(dunders, value):
                if isinstance(node.op, ast.Not):
                    # `__not__` only exists in the `operator` module, objects
                    # have no such method, so negate the truth value directly.
                    return not value
                return getattr(value, dunders[0])()
            else:
                raise GuardRejection(
                    f"Operation (`{dunders}`) for",
                    type(value),
                    f"not allowed in {context.evaluation} mode",
                )
    if isinstance(node, ast.Subscript):
        value = eval_node(node.value, context)
        slice_ = eval_node(node.slice, context)
        if policy.can_get_item(value, slice_):
            return value[slice_]
        raise GuardRejection(
            "Subscript access (`__getitem__`) for",
            type(value),  # not joined to avoid calling `repr`
            f" not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.Name):
        if policy.allow_locals_access and node.id in context.locals:
            return context.locals[node.id]
        if policy.allow_globals_access and node.id in context.globals:
            return context.globals[node.id]
        if policy.allow_builtins_access and hasattr(builtins, node.id):
            # note: do not use __builtins__, it is implementation detail of cPython
            return getattr(builtins, node.id)
        if not policy.allow_globals_access and not policy.allow_locals_access:
            raise GuardRejection(
                f"Namespace access not allowed in {context.evaluation} mode"
            )
        else:
            raise NameError(f"{node.id} not found in locals, globals, nor builtins")
    if isinstance(node, ast.Attribute):
        value = eval_node(node.value, context)
        if policy.can_get_attr(value, node.attr):
            return getattr(value, node.attr)
        raise GuardRejection(
            "Attribute access (`__getattr__`) for",
            type(value),  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    if isinstance(node, ast.IfExp):
        test = eval_node(node.test, context)
        if test:
            return eval_node(node.body, context)
        else:
            return eval_node(node.orelse, context)
    if isinstance(node, ast.Call):
        func = eval_node(node.func, context)
        # calls with keyword arguments are not supported and get rejected
        if policy.can_call(func) and not node.keywords:
            args = [eval_node(arg, context) for arg in node.args]
            return func(*args)
        raise GuardRejection(
            "Call for",
            func,  # not joined to avoid calling `repr`
            f"not allowed in {context.evaluation} mode",
        )
    raise ValueError("Unhandled node", ast.dump(node))
krassowski
|
r27906 | |||
# External types whose `__getitem__` is allowed, each given as a tuple of
# module name followed by the attribute path to the type.
SUPPORTED_EXTERNAL_GETITEM = {
    # pandas
    ("pandas", "core", "indexing", "_iLocIndexer"),
    ("pandas", "core", "indexing", "_LocIndexer"),
    ("pandas", "DataFrame"),
    ("pandas", "Series"),
    # numpy
    ("numpy", "ndarray"),
    ("numpy", "void"),
}
krassowski
|
r27921 | |||
krassowski
|
# Types on which item access (`__getitem__`) is allowed.
BUILTIN_GETITEM: Set[InstancesHaveGetItem] = {
    # built-in containers
    dict,
    str,  # type: ignore[arg-type]
    bytes,  # type: ignore[arg-type]
    list,
    tuple,
    # containers from `collections`
    collections.defaultdict,
    collections.deque,
    collections.OrderedDict,
    collections.ChainMap,
    collections.UserDict,
    collections.UserList,
    collections.UserString,  # type: ignore[arg-type]
    # helper classes defined in this module
    _DummyNamedTuple,
    _IdentitySubscript,
}
def _list_methods(cls, source=None):
    """For use on immutable objects or with methods returning a copy

    Returns the attributes of ``cls`` named in ``source``, or all attributes
    listed by ``dir(cls)`` when ``source`` is empty or not given.
    """
    names = source or dir(cls)
    return [getattr(cls, name) for name in names]
krassowski
|
r27906 | |||
krassowski
|
# Names of methods which are allow-listed for dict-like and list-like types.
dict_non_mutating_methods = ("copy", "keys", "values", "items")
list_non_mutating_methods = ("copy", "index", "count")
# Attribute names shared by `set` and `frozenset`.
set_non_mutating_methods = set(dir(set)).intersection(dir(frozenset))

# Types which are not exposed by name in `builtins`.
dict_keys: Type[collections.abc.KeysView] = type({}.keys())
method_descriptor: Any = type(list.copy)

NUMERICS = {int, float, complex}
krassowski
|
# Callables which may be called under the "limited" policy: the listed types
# themselves plus the methods collected from them with `_list_methods`.
ALLOWED_CALLS = {
    # bytes and str: all attributes
    bytes,
    *_list_methods(bytes),
    str,
    *_list_methods(str),
    # dict: selected methods only
    dict,
    *_list_methods(dict, dict_non_mutating_methods),
    dict_keys.isdisjoint,
    # list: selected methods only
    list,
    *_list_methods(list, list_non_mutating_methods),
    # set: attributes shared with frozenset only
    set,
    *_list_methods(set, set_non_mutating_methods),
    # frozenset and tuple: all attributes
    frozenset,
    *_list_methods(frozenset),
    tuple,
    *_list_methods(tuple),
    range,
    # numeric types: all attributes
    *NUMERICS,
    *[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)],
    # containers from `collections`
    collections.deque,
    *_list_methods(collections.deque, list_non_mutating_methods),
    collections.defaultdict,
    *_list_methods(collections.defaultdict, dict_non_mutating_methods),
    collections.OrderedDict,
    *_list_methods(collections.OrderedDict, dict_non_mutating_methods),
    collections.UserDict,
    *_list_methods(collections.UserDict, dict_non_mutating_methods),
    collections.UserList,
    *_list_methods(collections.UserList, list_non_mutating_methods),
    collections.UserString,
    *_list_methods(collections.UserString, dir(str)),
    collections.Counter,
    *_list_methods(collections.Counter, dict_non_mutating_methods),
    collections.Counter.elements,
    collections.Counter.most_common,
}
krassowski
|
# Types on which attribute access is allowed: everything that allows item
# access plus the types listed below.
BUILTIN_GETATTR: Set[MayHaveGetattr] = {
    *BUILTIN_GETITEM,
    set,
    frozenset,
    object,
    type,  # `type` handles a lot of generic cases, e.g. numbers as in `int.real`.
    *NUMERICS,
    dict_keys,
    method_descriptor,
}

# Operators are allowed on the same types as attribute access.
BUILTIN_OPERATIONS = {*BUILTIN_GETATTR}
krassowski
|
r27920 | |||
krassowski
|
# Policies by evaluation mode identifier. The "forbidden" and "dangerous"
# modes have no entry here: `guarded_eval` handles both of them before
# `eval_node` looks the policy up.
EVALUATION_POLICIES = {
    "minimal": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=False,
        allow_globals_access=False,
        allow_item_access=False,
        allow_attr_access=False,
        allowed_calls=set(),
        allow_any_calls=False,
        allow_all_operations=False,
    ),
    "limited": SelectivePolicy(
        allowed_getitem=BUILTIN_GETITEM,
        allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM,
        allowed_getattr=BUILTIN_GETATTR,
        allowed_getattr_external={
            # pandas Series/Frame implements custom `__getattr__`
            ("pandas", "DataFrame"),
            ("pandas", "Series"),
        },
        allowed_operations=BUILTIN_OPERATIONS,
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allowed_calls=ALLOWED_CALLS,
    ),
    "unsafe": EvaluationPolicy(
        allow_builtins_access=True,
        allow_locals_access=True,
        allow_globals_access=True,
        allow_attr_access=True,
        allow_item_access=True,
        allow_any_calls=True,
        allow_all_operations=True,
    ),
}
krassowski
|
r27918 | |||
# Names exported by `from ... import *`.
__all__ = [
    "guarded_eval",
    "eval_node",
    "GuardRejection",
    "EvaluationContext",
    "_unbind_method",
]