Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions edfi_api_client/edfi_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
if TYPE_CHECKING:
from edfi_api_client.edfi_client import EdFiClient

from functools import partial


class EdFiEndpoint:
"""
Expand Down Expand Up @@ -163,10 +165,31 @@ def get_rows(self,
:param max_wait:
:return:
"""
paged_result_iter = self.get_pages(
# Always default to cursor-pagination; fall back to reverse-offset paging if unsupported
paginator = self.get_pages_cursor

## Check ODS version compatibility for cursor paging
ods_version = tuple(map(int, self.client.get_ods_version().split(".")[:2]))
if ods_version < (7,3):
logging.warning(f"ODS version {ods_version} is incompatible with cursor-pagination (requires v7.3 or higher). Falling back to reverse-offset pagination...")
paginator = partial(self.get_pages,
reverse_paging = reverse_paging,
step_change_version=step_change_version,
change_version_step_size=change_version_step_size
)

## deletes/key_changes cannot be retrieved with cursor paging
if self.get_deletes or self.get_key_changes:
logging.warning(f"Cursor-pagination is unsupported in deletes/key_changes endpoints. Falling back to reverse-offset pagination...")
paginator = partial(self.get_pages,
reverse_paging = reverse_paging,
step_change_version=step_change_version,
change_version_step_size=change_version_step_size
)

paged_result_iter = paginator(
params=params,
page_size=page_size, reverse_paging=reverse_paging,
step_change_version=step_change_version, change_version_step_size=change_version_step_size,
page_size=page_size,
**kwargs
)

Expand Down Expand Up @@ -218,9 +241,10 @@ def get_pages(self,

# Begin pagination-loop
while True:
logging.info(f"[Get {self.component}] Parameters: {paged_params}")

### GET from the API and yield the resulting JSON payload
paged_rows = self.get(params=paged_params, **kwargs)
paged_rows = self.client.session.get_response(self.url, params=paged_params, **kwargs).json()
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows.")
yield paged_rows

Expand Down Expand Up @@ -259,6 +283,35 @@ def get_pages(self,
else:
logging.info(f"@ Paginating offset...")
paged_params.page_by_offset()


def get_pages_cursor(self,
*,
params: Optional[dict] = None, # Optional alternative params
page_size: int = 100,
**kwargs
) -> Iterator[List[dict]]:

# Override init params if passed
paged_params = EdFiParams(params or self.params).copy()
logging.info(f"[Paged Get {self.component}] Pagination Method: Cursor Paging")

# Begin pagination loop
while True:
logging.info(f"[Get {self.component}] Parameters: {paged_params}")

result = self.client.session.get_response(self.url, params = paged_params, **kwargs)
paged_rows = result.json()
logging.info(f"[Get {self.component}] Retrieved {len(paged_rows)} rows")
yield paged_rows

logging.info(f"[Paged Get {self.component}] @ Checking next page token...")
if not result.headers.get("Next-Page-Token"):
logging.info(f"[Paged Get {self.component}] @ Retrieved empty page token. Ending pagination.")
break

paged_params.page_by_token(page_token = result.headers.get("Next-Page-Token"), page_size=page_size)



def get_total_count(self, *, params: Optional[dict] = None, **kwargs) -> int:
Expand Down Expand Up @@ -402,6 +455,10 @@ def get_total_count(self, *args, **kwargs):
:return:
"""
raise NotImplementedError("Total counts have not been implemented in Ed-Fi composites!")

def get_pages_cursor(self, *args, **kwargs):
logging.info(f"Composite endpoints are incompatible with cursor-pagination. Falling back to offset pagination...")
yield from self.get_pages(*args, **kwargs)

def get_pages(self, *, params: Optional[dict] = None, page_size: int = 100, **kwargs) -> Iterator[List[dict]]:
"""
Expand Down
23 changes: 23 additions & 0 deletions edfi_api_client/edfi_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def __init__(self,
# These parameters are only used during pagination. They must be explicitly initialized.
self.page_size = None
self.change_version_step_size = None
self.page_token = None
self.number = None


def copy(self) -> 'EdFiParams':
Expand Down Expand Up @@ -190,3 +192,24 @@ def reverse_page_by_offset(self):

if self['offset'] < 0:
raise StopIteration


def page_by_token(self, page_token: str, page_size: int):
"""
Cursor paging behavior: page_token is required when page_size is specified.
- If page_token is None: first request, do NOT include page_size
- If page_token is present: include page_token and page_size

:param page_size:
:param page_token:
:return:
"""
self.page_size = page_size
self.page_token = page_token

if page_token is None:
self.pop("pageToken", None)
self.pop("pageSize", None)
else:
self["pageToken"] = self.page_token
self["pageSize"] = self.page_size