guarded_eval.py
895 lines
| 28.7 KiB
| text/x-python
|
PythonLexer
krassowski
|
r28675 | from inspect import isclass, signature, Signature | ||
krassowski
|
r27915 | from typing import ( | ||
krassowski
|
r28680 | Annotated, | ||
AnyStr, | ||||
krassowski
|
r27915 | Callable, | ||
krassowski
|
r27920 | Dict, | ||
krassowski
|
r28678 | Literal, | ||
NamedTuple, | ||||
NewType, | ||||
krassowski
|
r28680 | Optional, | ||
Protocol, | ||||
krassowski
|
r27915 | Set, | ||
krassowski
|
r27926 | Sequence, | ||
krassowski
|
r27915 | Tuple, | ||
Type, | ||||
krassowski
|
r28680 | TypeGuard, | ||
krassowski
|
r27915 | Union, | ||
krassowski
|
r28678 | get_args, | ||
get_origin, | ||||
krassowski
|
r28680 | is_typeddict, | ||
krassowski
|
r28678 | ) | ||
krassowski
|
r27920 | import ast | ||
krassowski
|
r27915 | import builtins | ||
krassowski
|
r27906 | import collections | ||
krassowski
|
r27920 | import operator | ||
krassowski
|
r27906 | import sys | ||
from functools import cached_property | ||||
from dataclasses import dataclass, field | ||||
Carlos Cordoba
|
r28272 | from types import MethodDescriptorType, ModuleType | ||
krassowski
|
r27906 | |||
krassowski
|
r28678 | from IPython.utils.decorators import undoc | ||
krassowski
|
r27912 | |||
krassowski
|
r27906 | |||
krassowski
|
r28679 | if sys.version_info < (3, 11): | ||
krassowski
|
r28680 | from typing_extensions import Self, LiteralString | ||
krassowski
|
r28679 | else: | ||
krassowski
|
r28680 | from typing import Self, LiteralString | ||
krassowski
|
r28679 | |||
if sys.version_info < (3, 12): | ||||
from typing_extensions import TypeAliasType | ||||
else: | ||||
from typing import TypeAliasType | ||||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | class HasGetItem(Protocol): | ||
M Bussonnier
|
r28947 | def __getitem__(self, key) -> None: ... | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | class InstancesHaveGetItem(Protocol): | ||
M Bussonnier
|
r28947 | def __call__(self, *args, **kwargs) -> HasGetItem: ... | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | class HasGetAttr(Protocol): | ||
M Bussonnier
|
r28947 | def __getattr__(self, key) -> None: ... | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | class DoesNotHaveGetAttr(Protocol): | ||
pass | ||||
krassowski
|
r27913 | |||
krassowski
|
r27906 | # By default `__getattr__` is not explicitly implemented on most objects | ||
MayHaveGetattr = Union[HasGetAttr, DoesNotHaveGetAttr] | ||||
krassowski
|
r27918 | def _unbind_method(func: Callable) -> Union[Callable, None]: | ||
krassowski
|
r27906 | """Get unbound method for given bound method. | ||
krassowski
|
r27929 | Returns None if cannot get unbound method, or method is already unbound. | ||
""" | ||||
krassowski
|
r27913 | owner = getattr(func, "__self__", None) | ||
krassowski
|
r27906 | owner_class = type(owner) | ||
krassowski
|
r27913 | name = getattr(func, "__name__", None) | ||
instance_dict_overrides = getattr(owner, "__dict__", None) | ||||
krassowski
|
r27906 | if ( | ||
owner is not None | ||||
krassowski
|
r27913 | and name | ||
and ( | ||||
krassowski
|
r27906 | not instance_dict_overrides | ||
krassowski
|
r27913 | or (instance_dict_overrides and name not in instance_dict_overrides) | ||
krassowski
|
r27906 | ) | ||
): | ||||
return getattr(owner_class, name) | ||||
krassowski
|
r27915 | return None | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | @dataclass | ||
class EvaluationPolicy: | ||||
krassowski
|
r27918 | """Definition of evaluation policy.""" | ||
krassowski
|
r27906 | allow_locals_access: bool = False | ||
allow_globals_access: bool = False | ||||
allow_item_access: bool = False | ||||
allow_attr_access: bool = False | ||||
allow_builtins_access: bool = False | ||||
krassowski
|
r27920 | allow_all_operations: bool = False | ||
krassowski
|
r27906 | allow_any_calls: bool = False | ||
allowed_calls: Set[Callable] = field(default_factory=set) | ||||
def can_get_item(self, value, item): | ||||
return self.allow_item_access | ||||
def can_get_attr(self, value, attr): | ||||
return self.allow_attr_access | ||||
krassowski
|
r27920 | def can_operate(self, dunders: Tuple[str, ...], a, b=None): | ||
if self.allow_all_operations: | ||||
return True | ||||
krassowski
|
r27906 | def can_call(self, func): | ||
if self.allow_any_calls: | ||||
return True | ||||
if func in self.allowed_calls: | ||||
return True | ||||
krassowski
|
r27918 | owner_method = _unbind_method(func) | ||
krassowski
|
r27921 | |||
krassowski
|
r27906 | if owner_method and owner_method in self.allowed_calls: | ||
return True | ||||
krassowski
|
r27913 | |||
krassowski
|
r27926 | def _get_external(module_name: str, access_path: Sequence[str]): | ||
"""Get value from external module given a dotted access path. | ||||
Raises: | ||||
* `KeyError` if module is removed not found, and | ||||
Andrew Kreimer
|
r28889 | * `AttributeError` if access path does not match an exported object | ||
krassowski
|
r27926 | """ | ||
member_type = sys.modules[module_name] | ||||
for attr in access_path: | ||||
member_type = getattr(member_type, attr) | ||||
return member_type | ||||
krassowski
|
r27918 | def _has_original_dunder_external( | ||
krassowski
|
r27913 | value, | ||
krassowski
|
r27926 | module_name: str, | ||
access_path: Sequence[str], | ||||
method_name: str, | ||||
krassowski
|
r27913 | ): | ||
krassowski
|
r27926 | if module_name not in sys.modules: | ||
# LBYLB as it is faster | ||||
return False | ||||
krassowski
|
r27906 | try: | ||
krassowski
|
r27926 | member_type = _get_external(module_name, access_path) | ||
krassowski
|
r27906 | value_type = type(value) | ||
if type(value) == member_type: | ||||
return True | ||||
krassowski
|
r27921 | if method_name == "__getattribute__": | ||
# we have to short-circuit here due to an unresolved issue in | ||||
# `isinstance` implementation: https://bugs.python.org/issue32683 | ||||
return False | ||||
krassowski
|
r27906 | if isinstance(value, member_type): | ||
method = getattr(value_type, method_name, None) | ||||
member_method = getattr(member_type, method_name, None) | ||||
if member_method == method: | ||||
return True | ||||
except (AttributeError, KeyError): | ||||
return False | ||||
krassowski
|
r27918 | def _has_original_dunder( | ||
krassowski
|
r27913 | value, allowed_types, allowed_methods, allowed_external, method_name | ||
krassowski
|
r27906 | ): | ||
# note: Python ignores `__getattr__`/`__getitem__` on instances, | ||||
# we only need to check at class level | ||||
value_type = type(value) | ||||
# strict type check passes → no need to check method | ||||
if value_type in allowed_types: | ||||
return True | ||||
method = getattr(value_type, method_name, None) | ||||
krassowski
|
r27921 | if method is None: | ||
krassowski
|
r27906 | return None | ||
if method in allowed_methods: | ||||
return True | ||||
for module_name, *access_path in allowed_external: | ||||
krassowski
|
r27918 | if _has_original_dunder_external(value, module_name, access_path, method_name): | ||
krassowski
|
r27906 | return True | ||
return False | ||||
krassowski
|
r27918 | @undoc | ||
krassowski
|
r27906 | @dataclass | ||
class SelectivePolicy(EvaluationPolicy): | ||||
krassowski
|
r27915 | allowed_getitem: Set[InstancesHaveGetItem] = field(default_factory=set) | ||
krassowski
|
r27906 | allowed_getitem_external: Set[Tuple[str, ...]] = field(default_factory=set) | ||
krassowski
|
r27920 | |||
krassowski
|
r27906 | allowed_getattr: Set[MayHaveGetattr] = field(default_factory=set) | ||
allowed_getattr_external: Set[Tuple[str, ...]] = field(default_factory=set) | ||||
krassowski
|
r27920 | allowed_operations: Set = field(default_factory=set) | ||
allowed_operations_external: Set[Tuple[str, ...]] = field(default_factory=set) | ||||
_operation_methods_cache: Dict[str, Set[Callable]] = field( | ||||
default_factory=dict, init=False | ||||
) | ||||
krassowski
|
r27906 | def can_get_attr(self, value, attr): | ||
krassowski
|
r27918 | has_original_attribute = _has_original_dunder( | ||
krassowski
|
r27906 | value, | ||
allowed_types=self.allowed_getattr, | ||||
allowed_methods=self._getattribute_methods, | ||||
allowed_external=self.allowed_getattr_external, | ||||
krassowski
|
r27913 | method_name="__getattribute__", | ||
krassowski
|
r27906 | ) | ||
krassowski
|
r27918 | has_original_attr = _has_original_dunder( | ||
krassowski
|
r27906 | value, | ||
allowed_types=self.allowed_getattr, | ||||
allowed_methods=self._getattr_methods, | ||||
allowed_external=self.allowed_getattr_external, | ||||
krassowski
|
r27913 | method_name="__getattr__", | ||
krassowski
|
r27906 | ) | ||
krassowski
|
r27921 | |||
krassowski
|
r27926 | accept = False | ||
krassowski
|
r27929 | # Many objects do not have `__getattr__`, this is fine. | ||
krassowski
|
r27906 | if has_original_attr is None and has_original_attribute: | ||
krassowski
|
r27926 | accept = True | ||
else: | ||||
# Accept objects without modifications to `__getattr__` and `__getattribute__` | ||||
accept = has_original_attr and has_original_attribute | ||||
krassowski
|
r27906 | |||
krassowski
|
r27926 | if accept: | ||
Andrew Kreimer
|
r28889 | # We still need to check for overridden properties. | ||
krassowski
|
r27926 | |||
value_class = type(value) | ||||
if not hasattr(value_class, attr): | ||||
return True | ||||
class_attr_val = getattr(value_class, attr) | ||||
is_property = isinstance(class_attr_val, property) | ||||
if not is_property: | ||||
return True | ||||
krassowski
|
r27929 | # Properties in allowed types are ok (although we do not include any | ||
# properties in our default allow list currently). | ||||
krassowski
|
r27926 | if type(value) in self.allowed_getattr: | ||
krassowski
|
r27929 | return True # pragma: no cover | ||
krassowski
|
r27926 | |||
# Properties in subclasses of allowed types may be ok if not changed | ||||
for module_name, *access_path in self.allowed_getattr_external: | ||||
try: | ||||
external_class = _get_external(module_name, access_path) | ||||
external_class_attr_val = getattr(external_class, attr) | ||||
except (KeyError, AttributeError): | ||||
return False # pragma: no cover | ||||
return class_attr_val == external_class_attr_val | ||||
return False | ||||
krassowski
|
r27906 | |||
def can_get_item(self, value, item): | ||||
"""Allow accessing `__getiitem__` of allow-listed instances unless it was not modified.""" | ||||
krassowski
|
r27918 | return _has_original_dunder( | ||
krassowski
|
r27906 | value, | ||
allowed_types=self.allowed_getitem, | ||||
allowed_methods=self._getitem_methods, | ||||
allowed_external=self.allowed_getitem_external, | ||||
krassowski
|
r27913 | method_name="__getitem__", | ||
krassowski
|
r27906 | ) | ||
krassowski
|
r27920 | def can_operate(self, dunders: Tuple[str, ...], a, b=None): | ||
krassowski
|
r27921 | objects = [a] | ||
if b is not None: | ||||
objects.append(b) | ||||
krassowski
|
r27920 | return all( | ||
[ | ||||
_has_original_dunder( | ||||
krassowski
|
r27921 | obj, | ||
krassowski
|
r27920 | allowed_types=self.allowed_operations, | ||
krassowski
|
r27921 | allowed_methods=self._operator_dunder_methods(dunder), | ||
krassowski
|
r27920 | allowed_external=self.allowed_operations_external, | ||
method_name=dunder, | ||||
) | ||||
for dunder in dunders | ||||
krassowski
|
r27921 | for obj in objects | ||
krassowski
|
r27920 | ] | ||
) | ||||
krassowski
|
r27921 | def _operator_dunder_methods(self, dunder: str) -> Set[Callable]: | ||
krassowski
|
r27920 | if dunder not in self._operation_methods_cache: | ||
self._operation_methods_cache[dunder] = self._safe_get_methods( | ||||
self.allowed_operations, dunder | ||||
) | ||||
return self._operation_methods_cache[dunder] | ||||
krassowski
|
r27906 | @cached_property | ||
def _getitem_methods(self) -> Set[Callable]: | ||||
krassowski
|
r27913 | return self._safe_get_methods(self.allowed_getitem, "__getitem__") | ||
krassowski
|
r27906 | |||
@cached_property | ||||
def _getattr_methods(self) -> Set[Callable]: | ||||
krassowski
|
r27913 | return self._safe_get_methods(self.allowed_getattr, "__getattr__") | ||
krassowski
|
r27906 | |||
@cached_property | ||||
def _getattribute_methods(self) -> Set[Callable]: | ||||
krassowski
|
r27913 | return self._safe_get_methods(self.allowed_getattr, "__getattribute__") | ||
krassowski
|
r27906 | |||
def _safe_get_methods(self, classes, name) -> Set[Callable]: | ||||
return { | ||||
method | ||||
for class_ in classes | ||||
for method in [getattr(class_, name, None)] | ||||
if method | ||||
} | ||||
krassowski
|
r27918 | class _DummyNamedTuple(NamedTuple): | ||
krassowski
|
r27921 | """Used internally to retrieve methods of named tuple instance.""" | ||
krassowski
|
r27906 | |||
class EvaluationContext(NamedTuple): | ||||
krassowski
|
r27918 | #: Local namespace | ||
locals: dict | ||||
#: Global namespace | ||||
globals: dict | ||||
#: Evaluation policy identifier | ||||
M Bussonnier
|
r28947 | evaluation: Literal["forbidden", "minimal", "limited", "unsafe", "dangerous"] = ( | ||
"forbidden" | ||||
) | ||||
Andrew Kreimer
|
r28889 | #: Whether the evaluation of code takes place inside of a subscript. | ||
krassowski
|
r27918 | #: Useful for evaluating ``:-1, 'col'`` in ``df[:-1, 'col']``. | ||
krassowski
|
r27906 | in_subscript: bool = False | ||
krassowski
|
r27918 | class _IdentitySubscript: | ||
"""Returns the key itself when item is requested via subscript.""" | ||||
krassowski
|
r27906 | def __getitem__(self, key): | ||
return key | ||||
krassowski
|
r27913 | |||
krassowski
|
r27918 | IDENTITY_SUBSCRIPT = _IdentitySubscript() | ||
krassowski
|
r27913 | SUBSCRIPT_MARKER = "__SUBSCRIPT_SENTINEL__" | ||
krassowski
|
r28432 | UNKNOWN_SIGNATURE = Signature() | ||
krassowski
|
r28675 | NOT_EVALUATED = object() | ||
krassowski
|
r27913 | |||
krassowski
|
r27906 | |||
krassowski
|
r27918 | class GuardRejection(Exception): | ||
"""Exception raised when guard rejects evaluation attempt.""" | ||||
krassowski
|
r27906 | pass | ||
krassowski
|
r27913 | def guarded_eval(code: str, context: EvaluationContext): | ||
krassowski
|
r27918 | """Evaluate provided code in the evaluation context. | ||
If evaluation policy given by context is set to ``forbidden`` | ||||
no evaluation will be performed; if it is set to ``dangerous`` | ||||
standard :func:`eval` will be used; finally, for any other, | ||||
policy :func:`eval_node` will be called on parsed AST. | ||||
""" | ||||
locals_ = context.locals | ||||
krassowski
|
r27906 | |||
krassowski
|
r27913 | if context.evaluation == "forbidden": | ||
raise GuardRejection("Forbidden mode") | ||||
krassowski
|
r27906 | |||
# note: not using `ast.literal_eval` as it does not implement | ||||
# getitem at all, for example it fails on simple `[0][1]` | ||||
if context.in_subscript: | ||||
Andrew Kreimer
|
r28889 | # syntactic sugar for ellipsis (:) is only available in subscripts | ||
krassowski
|
r27906 | # so we need to trick the ast parser into thinking that we have | ||
# a subscript, but we need to be able to later recognise that we did | ||||
# it so we can ignore the actual __getitem__ operation | ||||
if not code: | ||||
return tuple() | ||||
locals_ = locals_.copy() | ||||
locals_[SUBSCRIPT_MARKER] = IDENTITY_SUBSCRIPT | ||||
krassowski
|
r27913 | code = SUBSCRIPT_MARKER + "[" + code + "]" | ||
krassowski
|
r27918 | context = EvaluationContext(**{**context._asdict(), **{"locals": locals_}}) | ||
krassowski
|
r27906 | |||
krassowski
|
r27913 | if context.evaluation == "dangerous": | ||
krassowski
|
r27918 | return eval(code, context.globals, context.locals) | ||
krassowski
|
r27906 | |||
krassowski
|
r27913 | expression = ast.parse(code, mode="eval") | ||
krassowski
|
r27906 | |||
return eval_node(expression, context) | ||||
krassowski
|
r27913 | |||
krassowski
|
r27920 | BINARY_OP_DUNDERS: Dict[Type[ast.operator], Tuple[str]] = { | ||
ast.Add: ("__add__",), | ||||
ast.Sub: ("__sub__",), | ||||
ast.Mult: ("__mul__",), | ||||
ast.Div: ("__truediv__",), | ||||
ast.FloorDiv: ("__floordiv__",), | ||||
ast.Mod: ("__mod__",), | ||||
ast.Pow: ("__pow__",), | ||||
ast.LShift: ("__lshift__",), | ||||
ast.RShift: ("__rshift__",), | ||||
ast.BitOr: ("__or__",), | ||||
ast.BitXor: ("__xor__",), | ||||
ast.BitAnd: ("__and__",), | ||||
ast.MatMult: ("__matmul__",), | ||||
} | ||||
COMP_OP_DUNDERS: Dict[Type[ast.cmpop], Tuple[str, ...]] = { | ||||
ast.Eq: ("__eq__",), | ||||
ast.NotEq: ("__ne__", "__eq__"), | ||||
ast.Lt: ("__lt__", "__gt__"), | ||||
ast.LtE: ("__le__", "__ge__"), | ||||
ast.Gt: ("__gt__", "__lt__"), | ||||
ast.GtE: ("__ge__", "__le__"), | ||||
ast.In: ("__contains__",), | ||||
# Note: ast.Is, ast.IsNot, ast.NotIn are handled specially | ||||
} | ||||
UNARY_OP_DUNDERS: Dict[Type[ast.unaryop], Tuple[str, ...]] = { | ||||
ast.USub: ("__neg__",), | ||||
ast.UAdd: ("__pos__",), | ||||
# we have to check both __inv__ and __invert__! | ||||
ast.Invert: ("__invert__", "__inv__"), | ||||
ast.Not: ("__not__",), | ||||
} | ||||
krassowski
|
r28680 | class ImpersonatingDuck: | ||
krassowski
|
r28431 | """A dummy class used to create objects of other classes without calling their ``__init__``""" | ||
krassowski
|
r28430 | |||
krassowski
|
r28680 | # no-op: override __class__ to impersonate | ||
class _Duck: | ||||
"""A dummy class used to create objects pretending to have given attributes""" | ||||
def __init__(self, attributes: Optional[dict] = None, items: Optional[dict] = None): | ||||
self.attributes = attributes or {} | ||||
self.items = items or {} | ||||
def __getattr__(self, attr: str): | ||||
return self.attributes[attr] | ||||
def __hasattr__(self, attr: str): | ||||
return attr in self.attributes | ||||
def __dir__(self): | ||||
return [*dir(super), *self.attributes] | ||||
def __getitem__(self, key: str): | ||||
return self.items[key] | ||||
def __hasitem__(self, key: str): | ||||
return self.items[key] | ||||
def _ipython_key_completions_(self): | ||||
return self.items.keys() | ||||
krassowski
|
r28430 | |||
krassowski
|
r27920 | def _find_dunder(node_op, dunders) -> Union[Tuple[str, ...], None]: | ||
dunder = None | ||||
for op, candidate_dunder in dunders.items(): | ||||
if isinstance(node_op, op): | ||||
dunder = candidate_dunder | ||||
return dunder | ||||
krassowski
|
r27906 | def eval_node(node: Union[ast.AST, None], context: EvaluationContext): | ||
krassowski
|
r27918 | """Evaluate AST node in provided context. | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | Applies evaluation restrictions defined in the context. Currently does not support evaluation of functions with keyword arguments. | ||
krassowski
|
r27906 | |||
krassowski
|
r27918 | Does not evaluate actions that always have side effects: | ||
krassowski
|
r27906 | |||
krassowski
|
r27912 | - class definitions (``class sth: ...``) | ||
- function definitions (``def sth: ...``) | ||||
- variable assignments (``x = 1``) | ||||
krassowski
|
r27914 | - augmented assignments (``x += 1``) | ||
krassowski
|
r27912 | - deletions (``del x``) | ||
krassowski
|
r27906 | |||
Does not evaluate operations which do not return values: | ||||
krassowski
|
r27918 | |||
krassowski
|
r27912 | - assertions (``assert x``) | ||
- pass (``pass``) | ||||
- imports (``import x``) | ||||
krassowski
|
r27918 | - control flow: | ||
- conditionals (``if x:``) except for ternary IfExp (``a if x else b``) | ||||
Matthias Bussonnier
|
r28390 | - loops (``for`` and ``while``) | ||
krassowski
|
r27918 | - exception handling | ||
krassowski
|
r27912 | |||
The purpose of this function is to guard against unwanted side-effects; | ||||
it does not give guarantees on protection from malicious code execution. | ||||
krassowski
|
r27906 | """ | ||
policy = EVALUATION_POLICIES[context.evaluation] | ||||
if node is None: | ||||
return None | ||||
if isinstance(node, ast.Expression): | ||||
return eval_node(node.body, context) | ||||
if isinstance(node, ast.BinOp): | ||||
left = eval_node(node.left, context) | ||||
right = eval_node(node.right, context) | ||||
krassowski
|
r27920 | dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) | ||
if dunders: | ||||
if policy.can_operate(dunders, left, right): | ||||
return getattr(left, dunders[0])(right) | ||||
else: | ||||
raise GuardRejection( | ||||
f"Operation (`{dunders}`) for", | ||||
type(left), | ||||
f"not allowed in {context.evaluation} mode", | ||||
) | ||||
if isinstance(node, ast.Compare): | ||||
left = eval_node(node.left, context) | ||||
all_true = True | ||||
negate = False | ||||
for op, right in zip(node.ops, node.comparators): | ||||
right = eval_node(right, context) | ||||
dunder = None | ||||
dunders = _find_dunder(op, COMP_OP_DUNDERS) | ||||
if not dunders: | ||||
if isinstance(op, ast.NotIn): | ||||
dunders = COMP_OP_DUNDERS[ast.In] | ||||
negate = True | ||||
if isinstance(op, ast.Is): | ||||
dunder = "is_" | ||||
if isinstance(op, ast.IsNot): | ||||
dunder = "is_" | ||||
negate = True | ||||
if not dunder and dunders: | ||||
dunder = dunders[0] | ||||
if dunder: | ||||
a, b = (right, left) if dunder == "__contains__" else (left, right) | ||||
if dunder == "is_" or dunders and policy.can_operate(dunders, a, b): | ||||
result = getattr(operator, dunder)(a, b) | ||||
if negate: | ||||
result = not result | ||||
if not result: | ||||
all_true = False | ||||
left = right | ||||
else: | ||||
raise GuardRejection( | ||||
f"Comparison (`{dunder}`) for", | ||||
type(left), | ||||
f"not allowed in {context.evaluation} mode", | ||||
) | ||||
else: | ||||
krassowski
|
r27921 | raise ValueError( | ||
f"Comparison `{dunder}` not supported" | ||||
) # pragma: no cover | ||||
krassowski
|
r27920 | return all_true | ||
krassowski
|
r27906 | if isinstance(node, ast.Constant): | ||
return node.value | ||||
if isinstance(node, ast.Tuple): | ||||
krassowski
|
r27913 | return tuple(eval_node(e, context) for e in node.elts) | ||
krassowski
|
r27906 | if isinstance(node, ast.List): | ||
krassowski
|
r27913 | return [eval_node(e, context) for e in node.elts] | ||
krassowski
|
r27906 | if isinstance(node, ast.Set): | ||
krassowski
|
r27913 | return {eval_node(e, context) for e in node.elts} | ||
krassowski
|
r27906 | if isinstance(node, ast.Dict): | ||
krassowski
|
r27913 | return dict( | ||
zip( | ||||
[eval_node(k, context) for k in node.keys], | ||||
[eval_node(v, context) for v in node.values], | ||||
) | ||||
) | ||||
krassowski
|
r27906 | if isinstance(node, ast.Slice): | ||
return slice( | ||||
eval_node(node.lower, context), | ||||
eval_node(node.upper, context), | ||||
krassowski
|
r27913 | eval_node(node.step, context), | ||
krassowski
|
r27906 | ) | ||
if isinstance(node, ast.UnaryOp): | ||||
value = eval_node(node.operand, context) | ||||
krassowski
|
r27920 | dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) | ||
if dunders: | ||||
if policy.can_operate(dunders, value): | ||||
return getattr(value, dunders[0])() | ||||
else: | ||||
raise GuardRejection( | ||||
f"Operation (`{dunders}`) for", | ||||
type(value), | ||||
f"not allowed in {context.evaluation} mode", | ||||
) | ||||
krassowski
|
r27906 | if isinstance(node, ast.Subscript): | ||
value = eval_node(node.value, context) | ||||
slice_ = eval_node(node.slice, context) | ||||
if policy.can_get_item(value, slice_): | ||||
return value[slice_] | ||||
raise GuardRejection( | ||||
krassowski
|
r27913 | "Subscript access (`__getitem__`) for", | ||
type(value), # not joined to avoid calling `repr` | ||||
f" not allowed in {context.evaluation} mode", | ||||
krassowski
|
r27906 | ) | ||
if isinstance(node, ast.Name): | ||||
krassowski
|
r28678 | return _eval_node_name(node.id, context) | ||
krassowski
|
r27906 | if isinstance(node, ast.Attribute): | ||
value = eval_node(node.value, context) | ||||
if policy.can_get_attr(value, node.attr): | ||||
return getattr(value, node.attr) | ||||
raise GuardRejection( | ||||
krassowski
|
r27913 | "Attribute access (`__getattr__`) for", | ||
type(value), # not joined to avoid calling `repr` | ||||
f"not allowed in {context.evaluation} mode", | ||||
krassowski
|
r27906 | ) | ||
if isinstance(node, ast.IfExp): | ||||
test = eval_node(node.test, context) | ||||
if test: | ||||
krassowski
|
r27913 | return eval_node(node.body, context) | ||
krassowski
|
r27906 | else: | ||
return eval_node(node.orelse, context) | ||||
if isinstance(node, ast.Call): | ||||
func = eval_node(node.func, context) | ||||
if policy.can_call(func) and not node.keywords: | ||||
krassowski
|
r27913 | args = [eval_node(arg, context) for arg in node.args] | ||
krassowski
|
r27906 | return func(*args) | ||
krassowski
|
r28675 | if isclass(func): | ||
# this code path gets entered when calling class e.g. `MyClass()` | ||||
# or `my_instance.__class__()` - in both cases `func` is `MyClass`. | ||||
# Should return `MyClass` if `__new__` is not overridden, | ||||
# otherwise whatever `__new__` return type is. | ||||
krassowski
|
r28678 | overridden_return_type = _eval_return_type(func.__new__, node, context) | ||
krassowski
|
r28675 | if overridden_return_type is not NOT_EVALUATED: | ||
return overridden_return_type | ||||
krassowski
|
r28678 | return _create_duck_for_heap_type(func) | ||
krassowski
|
r28675 | else: | ||
krassowski
|
r28678 | return_type = _eval_return_type(func, node, context) | ||
krassowski
|
r28675 | if return_type is not NOT_EVALUATED: | ||
return return_type | ||||
krassowski
|
r27906 | raise GuardRejection( | ||
krassowski
|
r27913 | "Call for", | ||
func, # not joined to avoid calling `repr` | ||||
f"not allowed in {context.evaluation} mode", | ||||
krassowski
|
r27906 | ) | ||
krassowski
|
r27921 | raise ValueError("Unhandled node", ast.dump(node)) | ||
krassowski
|
r27906 | |||
krassowski
|
r28678 | def _eval_return_type(func: Callable, node: ast.Call, context: EvaluationContext): | ||
krassowski
|
r28675 | """Evaluate return type of a given callable function. | ||
Returns the built-in type, a duck or NOT_EVALUATED sentinel. | ||||
""" | ||||
try: | ||||
sig = signature(func) | ||||
except ValueError: | ||||
sig = UNKNOWN_SIGNATURE | ||||
# if annotation was not stringized, or it was stringized | ||||
# but resolved by signature call we know the return type | ||||
not_empty = sig.return_annotation is not Signature.empty | ||||
krassowski
|
r28676 | if not_empty: | ||
krassowski
|
r28680 | return _resolve_annotation(sig.return_annotation, sig, func, node, context) | ||
krassowski
|
r28675 | return NOT_EVALUATED | ||
krassowski
|
r28680 | def _resolve_annotation( | ||
annotation, | ||||
sig: Signature, | ||||
func: Callable, | ||||
node: ast.Call, | ||||
context: EvaluationContext, | ||||
): | ||||
"""Resolve annotation created by user with `typing` module and custom objects.""" | ||||
annotation = ( | ||||
_eval_node_name(annotation, context) | ||||
if isinstance(annotation, str) | ||||
else annotation | ||||
) | ||||
origin = get_origin(annotation) | ||||
if annotation is Self and hasattr(func, "__self__"): | ||||
return func.__self__ | ||||
elif origin is Literal: | ||||
type_args = get_args(annotation) | ||||
if len(type_args) == 1: | ||||
return type_args[0] | ||||
elif annotation is LiteralString: | ||||
return "" | ||||
elif annotation is AnyStr: | ||||
index = None | ||||
for i, (key, value) in enumerate(sig.parameters.items()): | ||||
if value.annotation is AnyStr: | ||||
index = i | ||||
break | ||||
if index is not None and index < len(node.args): | ||||
return eval_node(node.args[index], context) | ||||
elif origin is TypeGuard: | ||||
return bool() | ||||
elif origin is Union: | ||||
attributes = [ | ||||
attr | ||||
for type_arg in get_args(annotation) | ||||
for attr in dir(_resolve_annotation(type_arg, sig, func, node, context)) | ||||
] | ||||
return _Duck(attributes=dict.fromkeys(attributes)) | ||||
elif is_typeddict(annotation): | ||||
return _Duck( | ||||
attributes=dict.fromkeys(dir(dict())), | ||||
items={ | ||||
k: _resolve_annotation(v, sig, func, node, context) | ||||
for k, v in annotation.__annotations__.items() | ||||
}, | ||||
) | ||||
elif hasattr(annotation, "_is_protocol"): | ||||
return _Duck(attributes=dict.fromkeys(dir(annotation))) | ||||
elif origin is Annotated: | ||||
type_arg = get_args(annotation)[0] | ||||
return _resolve_annotation(type_arg, sig, func, node, context) | ||||
elif isinstance(annotation, NewType): | ||||
return _eval_or_create_duck(annotation.__supertype__, node, context) | ||||
elif isinstance(annotation, TypeAliasType): | ||||
return _eval_or_create_duck(annotation.__value__, node, context) | ||||
else: | ||||
return _eval_or_create_duck(annotation, node, context) | ||||
krassowski
|
r28678 | def _eval_node_name(node_id: str, context: EvaluationContext): | ||
policy = EVALUATION_POLICIES[context.evaluation] | ||||
krassowski
|
r28676 | if policy.allow_locals_access and node_id in context.locals: | ||
return context.locals[node_id] | ||||
if policy.allow_globals_access and node_id in context.globals: | ||||
return context.globals[node_id] | ||||
if policy.allow_builtins_access and hasattr(builtins, node_id): | ||||
# note: do not use __builtins__, it is implementation detail of cPython | ||||
return getattr(builtins, node_id) | ||||
if not policy.allow_globals_access and not policy.allow_locals_access: | ||||
raise GuardRejection( | ||||
f"Namespace access not allowed in {context.evaluation} mode" | ||||
) | ||||
else: | ||||
raise NameError(f"{node_id} not found in locals, globals, nor builtins") | ||||
krassowski
|
r28678 | def _eval_or_create_duck(duck_type, node: ast.Call, context: EvaluationContext): | ||
policy = EVALUATION_POLICIES[context.evaluation] | ||||
# if allow-listed builtin is on type annotation, instantiate it | ||||
if policy.can_call(duck_type) and not node.keywords: | ||||
args = [eval_node(arg, context) for arg in node.args] | ||||
return duck_type(*args) | ||||
# if custom class is in type annotation, mock it | ||||
return _create_duck_for_heap_type(duck_type) | ||||
def _create_duck_for_heap_type(duck_type): | ||||
krassowski
|
r28675 | """Create an imitation of an object of a given type (a duck). | ||
Returns the duck or NOT_EVALUATED sentinel if duck could not be created. | ||||
""" | ||||
krassowski
|
r28680 | duck = ImpersonatingDuck() | ||
krassowski
|
r28675 | try: | ||
# this only works for heap types, not builtins | ||||
duck.__class__ = duck_type | ||||
return duck | ||||
except TypeError: | ||||
pass | ||||
return NOT_EVALUATED | ||||
krassowski
|
r27906 | SUPPORTED_EXTERNAL_GETITEM = { | ||
krassowski
|
r27913 | ("pandas", "core", "indexing", "_iLocIndexer"), | ||
("pandas", "core", "indexing", "_LocIndexer"), | ||||
("pandas", "DataFrame"), | ||||
("pandas", "Series"), | ||||
("numpy", "ndarray"), | ||||
("numpy", "void"), | ||||
krassowski
|
r27906 | } | ||
krassowski
|
r27921 | |||
krassowski
|
r27915 | BUILTIN_GETITEM: Set[InstancesHaveGetItem] = { | ||
krassowski
|
r27906 | dict, | ||
Matthias Bussonnier
|
r28091 | str, # type: ignore[arg-type] | ||
bytes, # type: ignore[arg-type] | ||||
krassowski
|
r27906 | list, | ||
tuple, | ||||
collections.defaultdict, | ||||
collections.deque, | ||||
collections.OrderedDict, | ||||
collections.ChainMap, | ||||
collections.UserDict, | ||||
collections.UserList, | ||||
Matthias Bussonnier
|
r28091 | collections.UserString, # type: ignore[arg-type] | ||
krassowski
|
r27918 | _DummyNamedTuple, | ||
_IdentitySubscript, | ||||
krassowski
|
r27906 | } | ||
def _list_methods(cls, source=None): | ||||
"""For use on immutable objects or with methods returning a copy""" | ||||
krassowski
|
r27913 | return [getattr(cls, k) for k in (source if source else dir(cls))] | ||
krassowski
|
r27906 | |||
krassowski
|
r27913 | dict_non_mutating_methods = ("copy", "keys", "values", "items") | ||
list_non_mutating_methods = ("copy", "index", "count") | ||||
krassowski
|
r27906 | set_non_mutating_methods = set(dir(set)) & set(dir(frozenset)) | ||
krassowski
|
r27915 | dict_keys: Type[collections.abc.KeysView] = type({}.keys()) | ||
krassowski
|
r27906 | |||
krassowski
|
r27921 | NUMERICS = {int, float, complex} | ||
krassowski
|
r27906 | ALLOWED_CALLS = { | ||
bytes, | ||||
*_list_methods(bytes), | ||||
dict, | ||||
*_list_methods(dict, dict_non_mutating_methods), | ||||
dict_keys.isdisjoint, | ||||
list, | ||||
*_list_methods(list, list_non_mutating_methods), | ||||
set, | ||||
*_list_methods(set, set_non_mutating_methods), | ||||
frozenset, | ||||
*_list_methods(frozenset), | ||||
range, | ||||
str, | ||||
*_list_methods(str), | ||||
tuple, | ||||
*_list_methods(tuple), | ||||
krassowski
|
r27921 | *NUMERICS, | ||
*[method for numeric_cls in NUMERICS for method in _list_methods(numeric_cls)], | ||||
krassowski
|
r27906 | collections.deque, | ||
*_list_methods(collections.deque, list_non_mutating_methods), | ||||
collections.defaultdict, | ||||
*_list_methods(collections.defaultdict, dict_non_mutating_methods), | ||||
collections.OrderedDict, | ||||
*_list_methods(collections.OrderedDict, dict_non_mutating_methods), | ||||
collections.UserDict, | ||||
*_list_methods(collections.UserDict, dict_non_mutating_methods), | ||||
collections.UserList, | ||||
*_list_methods(collections.UserList, list_non_mutating_methods), | ||||
collections.UserString, | ||||
*_list_methods(collections.UserString, dir(str)), | ||||
collections.Counter, | ||||
*_list_methods(collections.Counter, dict_non_mutating_methods), | ||||
collections.Counter.elements, | ||||
krassowski
|
r27913 | collections.Counter.most_common, | ||
krassowski
|
r27906 | } | ||
krassowski
|
r27915 | BUILTIN_GETATTR: Set[MayHaveGetattr] = { | ||
*BUILTIN_GETITEM, | ||||
set, | ||||
frozenset, | ||||
object, | ||||
type, # `type` handles a lot of generic cases, e.g. numbers as in `int.real`. | ||||
krassowski
|
r27921 | *NUMERICS, | ||
krassowski
|
r27915 | dict_keys, | ||
Carlos Cordoba
|
r28272 | MethodDescriptorType, | ||
ModuleType, | ||||
krassowski
|
r27915 | } | ||
krassowski
|
r27920 | |||
krassowski
|
r27921 | BUILTIN_OPERATIONS = {*BUILTIN_GETATTR} | ||
krassowski
|
r27920 | |||
krassowski
|
r27906 | EVALUATION_POLICIES = { | ||
krassowski
|
r27913 | "minimal": EvaluationPolicy( | ||
krassowski
|
r27906 | allow_builtins_access=True, | ||
allow_locals_access=False, | ||||
allow_globals_access=False, | ||||
allow_item_access=False, | ||||
allow_attr_access=False, | ||||
allowed_calls=set(), | ||||
krassowski
|
r27913 | allow_any_calls=False, | ||
krassowski
|
r27920 | allow_all_operations=False, | ||
krassowski
|
r27906 | ), | ||
krassowski
|
r27914 | "limited": SelectivePolicy( | ||
krassowski
|
r27906 | allowed_getitem=BUILTIN_GETITEM, | ||
allowed_getitem_external=SUPPORTED_EXTERNAL_GETITEM, | ||||
krassowski
|
r27915 | allowed_getattr=BUILTIN_GETATTR, | ||
krassowski
|
r27906 | allowed_getattr_external={ | ||
# pandas Series/Frame implements custom `__getattr__` | ||||
krassowski
|
r27913 | ("pandas", "DataFrame"), | ||
("pandas", "Series"), | ||||
krassowski
|
r27906 | }, | ||
krassowski
|
r27920 | allowed_operations=BUILTIN_OPERATIONS, | ||
krassowski
|
r27906 | allow_builtins_access=True, | ||
allow_locals_access=True, | ||||
allow_globals_access=True, | ||||
krassowski
|
r27913 | allowed_calls=ALLOWED_CALLS, | ||
krassowski
|
r27906 | ), | ||
krassowski
|
r27913 | "unsafe": EvaluationPolicy( | ||
krassowski
|
r27906 | allow_builtins_access=True, | ||
allow_locals_access=True, | ||||
allow_globals_access=True, | ||||
allow_attr_access=True, | ||||
allow_item_access=True, | ||||
krassowski
|
r27913 | allow_any_calls=True, | ||
krassowski
|
r27920 | allow_all_operations=True, | ||
krassowski
|
r27913 | ), | ||
} | ||||
krassowski
|
r27918 | |||
__all__ = [ | ||||
"guarded_eval", | ||||
"eval_node", | ||||
"GuardRejection", | ||||
"EvaluationContext", | ||||
"_unbind_method", | ||||
] | ||||