Source code for django_elasticsearch_dsl_drf.pagination
# coding: utf-8
"""
Pagination.
"""
from __future__ import unicode_literals
from collections import OrderedDict
from django.core import paginator as django_paginator
from rest_framework import pagination
from rest_framework.pagination import _get_count
from rest_framework.exceptions import NotFound
from rest_framework.response import Response
import six
__title__ = 'django_elasticsearch_dsl_drf.pagination'
__author__ = 'Artur Barseghyan <artur.barseghyan@gmail.com>'
__copyright__ = '2016-2017 Artur Barseghyan'
__license__ = 'GPL 2.0/LGPL 2.1'
__all__ = (
'LimitOffsetPagination',
'Page',
'PageNumberPagination',
'Paginator',
)
[docs]class Page(django_paginator.Page):
"""Page for Elasticsearch."""
def __init__(self, object_list, number, paginator, facets):
self.facets = facets
super(Page, self).__init__(object_list, number, paginator)
[docs]class Paginator(django_paginator.Paginator):
"""Paginator for Elasticsearch."""
[docs] def page(self, number):
"""Returns a Page object for the given 1-based page number.
:param number:
:return:
"""
number = self.validate_number(number)
bottom = (number - 1) * self.per_page
top = bottom + self.per_page
if top + self.orphans >= self.count:
top = self.count
object_list = self.object_list[bottom:top].execute()
__facets = getattr(object_list, 'aggregations', None)
return self._get_page(object_list, number, self, facets=__facets)
def _get_page(self, *args, **kwargs):
"""Get page.
Returns an instance of a single page.
This hook can be used by subclasses to use an alternative to the
standard :cls:`Page` object.
"""
return Page(*args, **kwargs)
[docs]class PageNumberPagination(pagination.PageNumberPagination):
"""Page number pagination.
A simple page number based style that supports page numbers as
query parameters.
Example:
http://api.example.org/accounts/?page=4
http://api.example.org/accounts/?page=4&page_size=100
"""
django_paginator_class = Paginator
def __init__(self, *args, **kwargs):
"""Constructor.
:param args:
:param kwargs:
"""
self.facets = None
# self.page = None
# self.request = None
super(PageNumberPagination, self).__init__(*args, **kwargs)
[docs] def get_facets(self, page=None):
"""Get facets.
:param page:
:return:
"""
if page is None:
page = self.page
if hasattr(page, 'facets') and hasattr(page.facets, '_d_'):
return page.facets._d_
[docs] def paginate_queryset(self, queryset, request, view=None):
"""Paginate a queryset.
Paginate a queryset if required, either returning a page object,
or `None` if pagination is not configured for this view.
:param queryset:
:param request:
:param view:
:return:
"""
page_size = self.get_page_size(request)
if not page_size:
return None
paginator = self.django_paginator_class(queryset, page_size)
page_number = request.query_params.get(self.page_query_param, 1)
if page_number in self.last_page_strings:
page_number = paginator.num_pages
try:
self.page = paginator.page(page_number)
except django_paginator.InvalidPage as exc:
msg = self.invalid_page_message.format(
page_number=page_number, message=six.text_type(exc)
)
raise NotFound(msg)
if paginator.num_pages > 1 and self.template is not None:
# The browsable API should display pagination controls.
self.display_page_controls = True
self.request = request
return list(self.page)
[docs] def get_paginated_response(self, data):
"""Get paginated response.
:param data:
:return:
"""
__data = [
('count', self.page.paginator.count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
]
__facets = self.get_facets()
if __facets is not None:
__data.append(
('facets', __facets),
)
__data.append(
('results', data),
)
return Response(OrderedDict(__data))
[docs]class LimitOffsetPagination(pagination.LimitOffsetPagination):
"""A limit/offset pagination.
Example:
http://api.example.org/accounts/?limit=100
http://api.example.org/accounts/?offset=400&limit=100
"""
def __init__(self, *args, **kwargs):
"""Constructor.
:param args:
:param kwargs:
"""
self.facets = None
# self.count = None
# self.limit = None
# self.offset = None
# self.request = None
super(LimitOffsetPagination, self).__init__(*args, **kwargs)
[docs] def paginate_queryset(self, queryset, request, view=None):
self.count = _get_count(queryset)
self.limit = self.get_limit(request)
if self.limit is None:
return None
self.offset = self.get_offset(request)
self.request = request
if self.count > self.limit and self.template is not None:
self.display_page_controls = True
if self.count == 0 or self.offset > self.count:
return []
__queryset = queryset[self.offset:self.offset + self.limit].execute()
self.facets = getattr(__queryset, 'aggregations', None)
return list(queryset[self.offset:self.offset + self.limit])
[docs] def get_facets(self, facets=None):
"""Get facets.
:param facets:
:return:
"""
if facets is None:
facets = self.facets
if facets is None:
return None
if hasattr(facets, '_d_'):
return facets._d_
[docs] def get_paginated_response(self, data):
"""Get paginated response.
:param data:
:return:
"""
__data = [
('count', self.count),
('next', self.get_next_link()),
('previous', self.get_previous_link()),
]
__facets = self.get_facets()
if __facets is not None:
__data.append(
('facets', __facets),
)
__data.append(
('results', data),
)
return Response(OrderedDict(__data))