Source code for django_declarative_apis.machinery.attributes

#
# Copyright (c) 2019, salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root or https://opensource.org/licenses/BSD-3-Clause
#

import abc
import collections.abc
import inspect
import random
import string
import time

from django.db import models as django_models
import pydantic
import logging

from . import errors
from . import tasks

logger = logging.getLogger(__name__)


class EndpointAttribute(metaclass=abc.ABCMeta):
    _next_attribute_number = 0

    @classmethod
    def claim_attribute_number(cls):
        next_serial_number = cls._next_attribute_number
        cls._next_attribute_number += 1
        return next_serial_number

    def __init__(
        self, name=None, required=False, description=None, hidden=False, advanced=False
    ):
        super().__init__()

        self.hidden = hidden
        self.name = name  # will be populated by EndpointDefinitionMeta
        self.advanced = advanced
        self.required = required
        self.description = description
        self.attribute_number = EndpointAttribute.claim_attribute_number()

    def __get__(self, owner_instance, owner_class):
        if not owner_instance:
            # If accessed from the class, return the EndpointAttribute itself
            return self

        if not self.name:
            raise ValueError(
                "All EndpointAttribute objects must have a name before they can be accessed "
                "from an instance"
            )

        # The instance dictionary is used as a cache
        if self.name in owner_instance.__dict__:
            return owner_instance.__dict__[self.name]

        # Delegate to _get_value_for_instance and cache the result within the instance's dict
        value = self.get_instance_value(owner_instance, owner_class)
        owner_instance.__dict__[self.name] = value
        return value

    @abc.abstractmethod
    def get_instance_value(self, owner_instance, owner_class):
        raise NotImplementedError()


class RequestProperty(EndpointAttribute):
    __hidden_request_attribute_name = "_" + "".join(
        random.choice(string.printable) for _ in range(10)
    )

    @classmethod
    def bind_request_to_instance(cls, instance, request):
        # Hide the request within the instance
        setattr(instance, cls.__hidden_request_attribute_name, request)

    @classmethod
    def request_has_been_bound(cls, instance):
        return cls.__hidden_request_attribute_name in instance.__dict__

    def __init__(self, property_getter, **kwargs):
        super().__init__(**kwargs)

        self.property_getter = property_getter

        # Capture the hidden attribute name within each RequestProperty
        # instance, so lookups work through being pickled
        self.__hidden_request_attribute_name = (
            RequestProperty.__hidden_request_attribute_name
        )

    def __extract_request_from_instance(self, instance):
        return getattr(instance, self.__hidden_request_attribute_name, None)

    def get_instance_value(self, owner_instance, owner_class):
        request = self.__extract_request_from_instance(owner_instance)
        if not request:
            raise ValueError(
                "A request must be bound with the instance before accessing this property"
            )

        return self.property_getter(owner_instance, request)

    @property
    def documentation(self):
        result = {"name": self.name}
        if self.description is not None:
            result["description"] = self.description
        return result


class TypedEndpointAttributeMixin:
    def __init__(self, *args, **kwargs):
        self.field_type = kwargs.pop("type", str)
        if not any(
            issubclass(self.field_type, t) for t in RequestField.VALID_FIELD_TYPES
        ):
            raise NotImplementedError(
                "Request fields of type {0} not supported".format(
                    self.field_type.__name__
                )
            )
        super().__init__(*args, **kwargs)

    def coerce_value_to_type(self, raw_value):
        """Coerce a raw value to the expected field type.

        Args:
            raw_value: The value to coerce

        Returns:
            The coerced value of the expected type

        Raises:
            ClientErrorInvalidFieldValues: If the value cannot be coerced to the expected type
        """
        # handle tricksy quickly right off the bat
        if raw_value is None:
            return None

        try:
            if self.field_type == bool:
                if isinstance(raw_value, bool):
                    return raw_value
                if isinstance(raw_value, str):
                    return raw_value.lower() in ("true", "1", "yes", "on")
                # handle ints and floats too
                if isinstance(raw_value, (int, float)):
                    return bool(raw_value)
                raise ValueError(f"Cannot convert {raw_value} to boolean")

            if issubclass(self.field_type, pydantic.BaseModel):
                return self.field_type.parse_obj(raw_value)

            if isinstance(raw_value, collections.abc.Iterable) and not isinstance(
                raw_value, (str, dict)
            ):
                return list(self.field_type(r) for r in raw_value)

            return self.field_type(raw_value)
        except Exception as e:
            name = self.name or self.api_name
            logger.info(
                'ev=dda, loc=coerce_value_to_type, name=%s, raw="%s", error="%s"',
                name,
                raw_value,
                e,
            )
            raise errors.ClientErrorInvalidFieldValues(
                [name],
                "Could not parse {val} as type {type}".format(
                    val=raw_value,
                    type=self.field_type.__name__,
                ),
            )


[docs] class RequestUrlField(TypedEndpointAttributeMixin, EndpointAttribute): """A specialized type of field that takes any parameter that directly appears in the URL path. :param name: Allows the name of the field in HTTP API to be different from its name defined on the EndpointDefinition. Defaults to :code:`None` :type name: optional **Example:** URL defined in :code:`urls.py` .. code-block:: python url_patterns = [ url( r"^tasks/(?P<id>{0})/$".format(r"[0-9]{1}"), handlers.TodoDetailEndpoint, ) ] :code:`url_field` is used to extract the id of a single task from the above URL for deleting that task. .. code-block:: python from django_declarative_apis.machinery import url_field class TodoDeleteSingleTaskDefinition( TodoResourceMixin, machinery.ResourceEndpointDefinition, ): resource_id = url_field(name='id') @endpoint_resource(type=Todo) def resource(self): task = Todo.objects.delete(id=self.resource_id) return django.http.HttpResponse(status=http.HTTPStatus.OK) """ def __init__(self, *args, **kwargs): self.api_name = kwargs.pop("name", None) self.value = None super().__init__(*args, **kwargs) def set_value(self, value): self.value = value def get_instance_value(self, owner_instance, owner_class): return self.coerce_value_to_type(self.value)
class RequestAdhocQuerySet(RequestUrlField): def __init__(self, *args, **kwargs): super().__init__(*args, type=dict, **kwargs) self.value = {}
[docs] class RequestField(TypedEndpointAttributeMixin, RequestProperty): """Endpoint properties are called fields. Fields can be simple types such as int, or they can be used as a decorator on a function. **Valid field types:** A subclass of :code:`int`, :code:`bool`, :code:`float`, :code:`str`, :code:`dict`, :code:`complex`, :code:`pydantic.BaseModel` **Example** .. code-block:: python from django_declarative_apis.machinery import field task = field(required=True, type=str) :param required: Determines whether the field is required for the EndpointDefinition. Defaults to :code:`False`. :type required: optional :param name: Allows the name of the field in HTTP API to be different from its name defined on the EndpointDefinition. Defaults to :code:`None`. :type name: optional :param type: Determines the type of the field. Type needs to be on of the *valid field types* listed above. Defaults to :code:`String` :type type: optional :param default: Sets the default value for the field. Defaults to :code:`None`. :type default: optional :param description: Describes the purpose of the field. Defaults to :code:`None`. :type description: optional :param multivalued: Allows a field to be specified multiple times in the request. With multivalued set to True, the EndpointHandler will receive a list of values instead of a single value. Defaults to :code:`False`. :type multivalued: optional **Example** Request: .. code-block:: GET https://example.com?foo=bar1&foo=bar2 EndpointDefinition: .. code-block:: python from django_declarative_apis.machinery import field class FooDefinition(EndpointDefinition): foo = field(multivalued=True) In the :code:`EndpointDefinition`, :code:`self.foo` would be equal to ['bar1', 'bar2'] """ VALID_FIELD_TYPES = (bool, int, float, complex, str, dict, pydantic.BaseModel) def __init__(self, *args, **kwargs): self.default_value = kwargs.pop("default", None) self.api_name = kwargs.pop("name", None) self.multivalued = kwargs.pop("multivalued", False) super().__init__(property_getter=self.get_field, **kwargs) self.post_processor = None def __call__(self, post_processor): self.post_processor = post_processor return self @property def documentation(self): result = super().documentation result["type"] = self.field_type result["multivalued"] = self.multivalued if self.api_name: result["name"] = self.api_name if self.default_value is not None: result["default_value"] = self.default_value return result
[docs] def get_without_default(self, owner_instance, request): """Get the field value from the request without applying default value. Args: owner_instance: The instance of the endpoint definition request: The Django request object Returns: The coerced value of the field, or None if not found """ if not request: return None query_dict = request.POST if request.method == "POST" else request.GET # handle errors during MIME type translation or data deserialization if not query_dict: return None field_name = self.api_name or self.name if not field_name: return None if field_name in query_dict: if not self.multivalued: raw_value = query_dict.get(field_name) else: raw_value = query_dict.getlist(field_name) typed_value = self.coerce_value_to_type(raw_value) else: typed_value = None if self.post_processor: return self.post_processor(owner_instance, typed_value) return typed_value
def get_field(self, owner_instance, request): raw_value = self.get_without_default(owner_instance, request) if raw_value is not None: return raw_value return self.default_value
class ResourceField(RequestField): pass
[docs] class RequestAttribute(RequestProperty): """Used to initialize a consumer object for an endpoint definition. **Example** .. code-block:: python from django_declarative_apis.machinery import request_attribute consumer = request_attribute() """ def __init__(self, attribute_getter=None, required=True, default=None, **kwargs): super().__init__( property_getter=self.get_request_attribute, required=required, **kwargs ) self.attribute_getter = attribute_getter self.default = default # Facilitated use as a decorator with arguments def __call__(self, attribute_getter): self.attribute_getter = attribute_getter return self def get_without_default(self, owner_instance, request): if self.attribute_getter: result = self.attribute_getter(owner_instance, request) else: result = getattr(request, self.name, None) return result def get_request_attribute(self, owner_instance, request): result = self.get_without_default(owner_instance, request) if result is not None: return result else: return self.default
[docs] class ConsumerAttribute(RequestAttribute): """Creates a requester/authenticator object for an endpoint definition. **Example** .. code-block:: python from django_declarative_apis.machinery import consumer_attribute requester = consumer_attribute() """ def __init__(self, *args, field_name=None, **kwargs): self.field_name = field_name super().__init__(*args, **kwargs) def get_without_default(self, owner_instance, request): consumer = request.consumer if self.attribute_getter: return self.attribute_getter(owner_instance, consumer) else: return getattr(consumer, self.field_name or self.name, None)
[docs] class RawRequestObjectProperty(RequestAttribute): """Creates a request object for an endpoint definition. **Example** .. code-block:: python from django_declarative_apis.machinery import RawRequestObjectProperty request = RawRequestObjectProperty() """ class SafeRequestWrapper: __hidden_request_attribute_name = "_" + "".join( random.choice(string.printable) for _ in range(10) ) __permitted_request_properties = ("build_absolute_uri", "method", "META") def __init__(self, request, additional_safe_fields=()): setattr(self, self.__hidden_request_attribute_name, request) self.additional_safe_fields = additional_safe_fields @property def body_field_names(self): return set(getattr(self, self.__hidden_request_attribute_name).POST.keys()) def __getattr__(self, name): if ( name in RawRequestObjectProperty.SafeRequestWrapper.__permitted_request_properties or name in self.additional_safe_fields ): hidden_request = getattr(self, self.__hidden_request_attribute_name) return getattr(hidden_request, name) else: raise AttributeError() def __init__(self, *args, additional_safe_fields=(), **kwargs): super().__init__(*args, **kwargs) self.additional_safe_fields = additional_safe_fields def get_without_default(self, owner_instance, request): return RawRequestObjectProperty.SafeRequestWrapper( request, additional_safe_fields=self.additional_safe_fields )
[docs] class EndpointTask(EndpointAttribute): """:code:`task` is used as a decorator on a function. It encapsulate the side-effect operations of an endpoint. For instance, if hitting an endpoint causes an operation to happen in another resource or it causes an operation to be queued and run as a background task. :code:`task` runs **synchronously**, which means it will be executed before the response is returned to the user. It can also affect the response by making changes to the :code:`EndpointDefinition.resource()`. **Example** .. code-block:: python from django_declarative_apis.machinery import task class SampleClass: # code @task def sample_function(): # your code goes here :param task_runner: A callable that dictates how the task is executed. Defaults to :code:`None`. :type task_runner: optional :param depends_on: A reference to another task that should be run before this one. Overrides :code:`priority`. Defaults to :code:`None`. It is important to note that :code:`deferrable_task` cannot be used as a :code:`depends_on` argument. :type depends_on: optional :param priority: Specifies the priority of the task. Tasks with lower priority are executed first. Defaults to :code:`0`. :type priority: optional **Example** .. code-block:: python from django_declarative_apis.machinery import task class SampleEndpointDefinition: def is_authorized(self): return True @task def set_response_filter(self): self.response._api_filter = filters.SampleFilters """ STATE_NOT_RUN = 0 STATE_RUNNING = 1 STATE_COMPLETED = 2 def __init__( self, task_runner=None, depends_on=None, # Reference to another task that should be run before this one. Overrides priority priority=0, # lower priority gets executed first **kwargs, ): super().__init__(**kwargs) self.task_runner = task_runner self.depends_on = depends_on self.task_state = EndpointTask.STATE_NOT_RUN self.priority = priority def __call__(self, task_runner): self.task_runner = task_runner return self def _run_task(self, owner_instance): self.task_runner(owner_instance) def run(self, owner_instance): assert ( self.task_state != EndpointTask.STATE_RUNNING ), "Circular task reference detected!" try: self.task_state = EndpointTask.STATE_RUNNING depends_on = self.depends_on if isinstance(depends_on, str): depends_on = getattr(owner_instance, depends_on) if depends_on and (depends_on.task_state != EndpointTask.STATE_COMPLETED): assert not isinstance( depends_on, DeferrableEndpointTask ), "DeferredEndpointTask cannot be used as depends_on arg" depends_on.run(owner_instance) self._run_task(owner_instance) finally: self.task_state = EndpointTask.STATE_COMPLETED def get_instance_value(self, owner_instance, owner_class): return self
[docs] class DeferrableEndpointTask(EndpointTask): """:code:`deferrable_task` is used as a decorator on a function. It is similar to :code:`task` in that it encapsulates side-effects, but can be automatically executed in a deferred queue outside of the request-response cycle. :code:`deferrable_task` runs **asynchronously** and because of that it is used for operations that take time and when we want to avoid delaying the response to the user. **Deferrable Task Rules**: * Deferrable task methods must always be a :code:`staticmethod`. Therefore, anything a deferrable task needs to know should be saved in the :code:`EndpointDefinition.resource()`. * The :code:`staticmethod` decorator should come after :code:`deferrable_task` decorator. .. code-block:: python from django_declarative_apis.machinery import deferrable_task class SampleClass: # code @deferrable_task @staticmethod def sample_method(arg): # your code goes here * Works only with a Django Model instance as the resource .. note:: Depending on the parameters used, a deferrable task can be run in different time intervals. In some cases, it can be made to run synchronously. :param task_runner: A callable that dictates how the task is executed. Defaults to :code:`None`. :type task_runner: optional :param delay: Sets the delay in seconds before running the task. Requires :code:`always_defer=True.` Defaults to :code:`None`. :type delay: optional :param always_defer: Runs task in deferred queue even when :code:`delay=0.` Defaults to :code:`False`. :type always_defer: optional :param task_args_factory: Stores task args and kwargs. :code:`task_args_factory` must be a **callable**. Defaults to :code:`None`. :type task_args_factory: optional :param queue: Sets the celery queue that will be used for storing the tasks. Defaults to :code:`None`. :type queue: optional :param routing_key: It is used to determine which queue the task should be routed to. Defaults to :code:`None`. :type routing_key: optional :param retries: Specifies the number of times the current task has been retried. Defaults to :code:`0` :type retries: optional :param retry_exception_filter: It is used to store retry exception information that is used in logs. Defaults to :code:`()` - empty tuple. :type retry_exception_filter: optional :param execute_unless: Execute the task unless a condition is met.It must be a **callable**. Defaults to :code:`None`. :type execute_unless: optional **Example** .. code-block:: python from django_declarative_apis.machinery import deferrable_task class SampleClass: # code @deferrable_task(execute_unless=<condition>) @staticmethod def sample_method(arg): # your code goes here """ @staticmethod def unwrap_staticmethod(method): assert isinstance(method, staticmethod), ( "Deferrable task methods MUST be staticmethods. Hint: the staticmethod " "decorator should come after the deferrable task decorator " ) # we're effectively unwrapping the staticmethod here... return method.__func__ def __init__( self, task_runner=None, delay=None, # delay in seconds before running the task. Requires deferred=True always_defer=True, # True: run task in deferred queue even when delay=0 task_args_factory=None, queue=None, routing_key=None, retries=0, retry_exception_filter=(), execute_unless=None, **kwargs, ): super().__init__(**kwargs) if task_runner: self.task_runner = DeferrableEndpointTask.unwrap_staticmethod(task_runner) else: self.task_runner = None self.delay = delay self.always_defer = always_defer self.queue = queue self.routing_key = routing_key self.retries = retries self.retry_exception_filter = retry_exception_filter if execute_unless: assert callable(execute_unless), "execute_unless MUST be callable" assert ( inspect.getfullargspec(execute_unless).args == ["self"] ), "execute_unless MUST be an instance method that takes only the 'self' argument" self.execute_unless = execute_unless assert task_args_factory is None or callable(task_args_factory) self.task_args_factory = task_args_factory def __call__(self, task_runner): self.task_runner = DeferrableEndpointTask.unwrap_staticmethod(task_runner) return self def _resolve_maybe_callable(self, owner_instance, maybe_callable): if callable(maybe_callable): return maybe_callable(owner_instance) else: return maybe_callable def _run_task(self, owner_instance): if self.execute_unless and self.execute_unless(owner_instance): return resource = owner_instance.resource assert isinstance( resource, django_models.Model ), "resource must be an instance of django.db.models.Model to run as deferred task" delay = self._resolve_maybe_callable(owner_instance, self.delay) or 0 always_defer = self._resolve_maybe_callable(owner_instance, self.always_defer) if self.task_args_factory: task_args, task_kwargs = self.task_args_factory(owner_instance) else: task_args, task_kwargs = [], {} if delay == 0 and not always_defer: self.task_runner(*([resource] + list(task_args)), **task_kwargs) else: if resource.pk is None: resource.save() resource_id = resource.pk resource_class_name = "{0}.{1}".format( resource.__module__, resource.__class__.__name__ ) endpoint_class_name = "{0}.{1}".format( owner_instance.__module__, owner_instance.__class__.__name__ ) task_runner_args = ( endpoint_class_name, self.task_runner.__name__, resource_class_name, str(resource_id), ) task_runner_kwargs = { "task_creation_time": time.time(), "scheduled_execution_delay": delay, "task_args": (task_args, task_kwargs), } tasks.schedule_future_task_runner( task_runner_args, task_runner_kwargs, retries=self.retries, retry_exception_filter=self.retry_exception_filter, delay=delay, queue=self.queue, routing_key=self.routing_key, )
class DeferrableGenericEndpointTask(DeferrableEndpointTask): # very similar to DeferrableEndpointTask, but doesn't assume that the resource is a Django model instance def __init__( self, task_args_packer=None, **kwargs, ): super().__init__(**kwargs) assert ( task_args_packer is not None ), "task_args_packer required for DeferrableGenericEndpointTask" self.task_args_packer = task_args_packer def _run_task(self, owner_instance): if self.execute_unless and self.execute_unless(owner_instance): return delay = self._resolve_maybe_callable(owner_instance, self.delay) or 0 always_defer = self._resolve_maybe_callable(owner_instance, self.always_defer) packed_args = self.task_args_packer.pack(owner_instance) if delay == 0 and not always_defer: unpacked_args, unpacked_kwargs = self.task_args_packer.unpack(packed_args) self.task_runner(*unpacked_args, **unpacked_kwargs) else: packer_name = "{0}.{1}".format( self.task_args_packer.__module__, self.task_args_packer.__name__ ) endpoint_class_name = "{0}.{1}".format( owner_instance.__module__, owner_instance.__class__.__name__ ) task_runner_args = ( endpoint_class_name, self.task_runner.__name__, packed_args, packer_name, ) task_runner_kwargs = { "task_creation_time": time.time(), "scheduled_execution_delay": delay, } tasks.schedule_generic_future_task_runner( task_runner_args, task_runner_kwargs, retries=self.retries, retry_exception_filter=self.retry_exception_filter, delay=delay, queue=self.queue, routing_key=self.routing_key, ) class RequestFieldGroup(RequestProperty): def __init__(self, *component_field_getters, **kwargs): super().__init__(property_getter=self.get_value, **kwargs) self.component_field_getters = component_field_getters self.component_field_names = [] for component_field_getter in self.component_field_getters: component_field_getter.required = False def __call__(self, *component_field_getters): self.component_field_getters = component_field_getters return self def _get_request_dict(self, request): if request.method == "GET": return request.GET elif request.method == "POST": return request.POST else: return {} def _get_missing_component_fields(self, owner_instance, request): self.component_field_names = map(lambda x: x.name, self.component_field_getters) missing_fields = [] for getter in self.component_field_getters: result = getter.get_without_default(owner_instance, request) if result is None: missing_fields.append(getter.name) return missing_fields
[docs] class RequireOneAttribute(RequestFieldGroup): """Exactly one of the given fields must be present. **Example** .. code-block:: python from django_declarative_apis.machinery import require_one sample_field_1 = field() sample_field_2 = field() sample_require_one = require_one( sample_field_1, sample_field_2, ) """ def get_value(self, owner_instance, request): missing_fields = self._get_missing_component_fields(owner_instance, request) # all but one field should be missing from the request if len(self.component_field_getters) - len(missing_fields) == 1: return True else: raise errors.ClientErrorMissingFields( self.component_field_names, extra_message="Exactly one field must be populated", )
[docs] class RequireAllAttribute(RequestFieldGroup): """All fields must be populated.""" def get_value(self, owner_instance, request): missing_fields = self._get_missing_component_fields(owner_instance, request) # no fields should be missing from the request if len(missing_fields) == 0: return True else: raise errors.ClientErrorMissingFields( self.component_field_names, extra_message="All fields must be populated" )
[docs] class RequireAllIfAnyAttribute(RequestFieldGroup): """Either all fields must be present or all fields must be missing.""" def get_value(self, owner_instance, request): missing_fields = self._get_missing_component_fields(owner_instance, request) # either all present or all missing if (len(missing_fields) == 0) or ( len(missing_fields) == len(self.component_field_getters) ): return True else: raise errors.ClientErrorMissingFields( self.component_field_names, extra_message="All fields must be populated" )
[docs] class Aggregate(EndpointAttribute): """DDA uses aggregates to perform memoization to avoid repeated calculations, querying, or any task that can be performed once and the result cached. Aggregates retrieve or create a related object based on one or more field that is in use in the EndpointDefinition. An aggregate is calculated only once and then the data is cached for future retrieval. **Aggregates are used as decorators on functions.** .. code-block:: python from django_declarative_apis.machinery import aggregate class SampleClass: # code @aggregate def sample_function(): # code :param required: Defines whether the aggregate is required or not. Defaults to :code:`False`. :type required: optional :param depends_on: Reference to another aggregate that should be run before this aggregate. Defaults to :code:`None`. :type depends_on: optional **Example:** We want to query a user only once and cache that information for future use. .. code-block:: python from django_declarative_apis.machinery import aggregate class SampleClass: user_id = url_field() @aggregate(required=True) def get_user(self): try: user = models.User.objects.get(id=self.user_id) except: raise Exception("User with matching id not found") return user """ def __init__(self, aggregation_function=None, **kwargs): self.aggregation_function = aggregation_function self.depends_on = kwargs.pop("depends_on", None) super().__init__(**kwargs) def __call__(self, aggregation_function): self.aggregation_function = aggregation_function return self def get_instance_value(self, owner_instance, owner_class): if not RequestProperty.request_has_been_bound(owner_instance): raise Exception( "Request must be bound to endpoint before accessing aggregate values" ) return self.aggregation_function(owner_instance)