Source code for django_elasticsearch_dsl_drf.filter_backends.filtering.nested

"""
Nested filtering backend.
"""

from elasticsearch_dsl.query import Q
from django.core.exceptions import ImproperlyConfigured
from django_elasticsearch_dsl import fields

from six import string_types

from ...constants import (
    ALL_LOOKUP_FILTERS_AND_QUERIES,
    LOOKUP_FILTER_TERMS,
)

from ...compat import coreapi
from ...compat import coreschema
from .common import FilteringFilterBackend

__title__ = 'django_elasticsearch_dsl_drf.filter_backends.filtering.nested'
__author__ = 'Artur Barseghyan <artur.barseghyan@gmail.com>'
__copyright__ = '2017-2019 Artur Barseghyan'
__license__ = 'GPL 2.0/LGPL 2.1'
__all__ = ('NestedFilteringFilterBackend',)


[docs]class NestedFilteringFilterBackend(FilteringFilterBackend): """Nested filter backend. Example: >>> from django_elasticsearch_dsl_drf.constants import ( >>> LOOKUP_FILTER_TERM, >>> LOOKUP_FILTER_PREFIX, >>> LOOKUP_FILTER_WILDCARD, >>> LOOKUP_QUERY_EXCLUDE, >>> LOOKUP_QUERY_ISNULL, >>> ) >>> from django_elasticsearch_dsl_drf.filter_backends import ( >>> NestedFilteringFilterBackend >>> ) >>> from django_elasticsearch_dsl_drf.viewsets import ( >>> BaseDocumentViewSet, >>> ) >>> >>> # Local article document definition >>> from .documents import ArticleDocument >>> >>> # Local article document serializer >>> from .serializers import ArticleDocumentSerializer >>> >>> class ArticleDocumentView(BaseDocumentViewSet): >>> >>> document = ArticleDocument >>> serializer_class = ArticleDocumentSerializer >>> filter_backends = [NestedFilteringFilterBackend,] >>> nested_filter_fields = { >>> 'country': { >>> 'field': 'continent.country.name.raw', >>> 'path': 'continent.country', >>> 'lookups': [ >>> LOOKUP_FILTER_TERM, >>> LOOKUP_FILTER_TERMS, >>> LOOKUP_FILTER_PREFIX, >>> LOOKUP_FILTER_WILDCARD, >>> LOOKUP_QUERY_EXCLUDE, >>> LOOKUP_QUERY_ISNULL, >>> ], >>> } >>> } """
[docs] @classmethod def prepare_filter_fields(cls, view): """Prepare filter fields. :param view: :type view: rest_framework.viewsets.ReadOnlyModelViewSet :return: Filtering options. :rtype: dict """ if not hasattr(view, 'nested_filter_fields'): raise ImproperlyConfigured( "You need to define `nested_filter_fields` in your `{}` view " "when using `{}` filter backend." "".format(view.__class__.__name__, cls.__name__) ) filter_fields = view.nested_filter_fields for field, options in filter_fields.items(): if options is None or isinstance(options, string_types): filter_fields[field] = { 'field': options or field } elif 'field' not in filter_fields[field]: filter_fields[field]['field'] = field if 'lookups' not in filter_fields[field]: filter_fields[field]['lookups'] = tuple( ALL_LOOKUP_FILTERS_AND_QUERIES ) return filter_fields
[docs] def get_filter_field_nested_path(self, filter_fields, field_name): """Get filter field path to be used in nested query. :param filter_fields: :param field_name: :return: """ if 'path' in filter_fields[field_name]: return filter_fields[field_name]['path'] return field_name
[docs] def get_filter_query_params(self, request, view): """Get query params to be filtered on. :param request: Django REST framework request. :param view: View. :type request: rest_framework.request.Request :type view: rest_framework.viewsets.ReadOnlyModelViewSet :return: Request query params to filter on. :rtype: dict """ query_params = request.query_params.copy() filter_query_params = {} filter_fields = self.prepare_filter_fields(view) for query_param in query_params: query_param_list = self.split_lookup_filter( query_param, maxsplit=1 ) field_name = query_param_list[0] if field_name in filter_fields: lookup_param = None if len(query_param_list) > 1: lookup_param = query_param_list[1] valid_lookups = filter_fields[field_name]['lookups'] nested_path = self.get_filter_field_nested_path( filter_fields, field_name ) if lookup_param is None or lookup_param in valid_lookups: values = [ __value.strip() for __value in query_params.getlist(query_param) if __value.strip() != '' ] if values: filter_query_params[query_param] = { 'lookup': lookup_param, 'values': values, 'field': filter_fields[field_name].get( 'field', field_name ), 'type': view.mapping, 'path': nested_path, } return filter_query_params
[docs] @classmethod def apply_filter(cls, queryset, options=None, args=None, kwargs=None): """Apply filter. :param queryset: :param options: :param args: :param kwargs: :return: """ if options is None: raise ImproperlyConfigured( "You should provide an `path` argument in the field options." ) path = options.pop('path') if args is None: args = [] if kwargs is None: kwargs = {} return queryset.query( 'nested', path=path, query=Q(*args, **kwargs) )
[docs] @classmethod def apply_query(cls, queryset, options=None, args=None, kwargs=None): """Apply query. :param queryset: :param options: :param args: :param kwargs: :return: """ if options is None: raise ImproperlyConfigured( "You should provide an `path` argument in the field options." ) path = options.pop('path') if args is None: args = [] if kwargs is None: kwargs = {} return queryset.query( 'nested', path=path, query=Q(*args, **kwargs) )
[docs] def get_coreschema_field(self, field): if isinstance(field, fields.IntegerField): field_cls = coreschema.Number else: field_cls = coreschema.String return field_cls()
[docs] def get_schema_fields(self, view): assert coreapi is not None, 'coreapi must be installed to ' \ 'use `get_schema_fields()`' assert coreschema is not None, 'coreschema must be installed to ' \ 'use `get_schema_fields()`' filter_fields = getattr(view, 'nested_filter_fields', None) document = getattr(view, 'document', None) return [] if not filter_fields else [ coreapi.Field( name=field_name, required=False, location='query', schema=self.get_coreschema_field( document._doc_type._fields().get(field_name) ) ) for field_name in filter_fields ]