# Patch summary (text before the first "diff --git" header is commentary only).
#
# README.md
#   Documents a new optional `end_date` setting.
# tap_intercom/tap.py
#   Declares `end_date` as an integer property (unix timestamp).
# tap_intercom/client.py
#   * prepare_request_payload: POST bodies gain a `<` filter on the replication
#     key. The bound is the sync signpost, capped by `end_date` when configured.
#   * get_replication_key_signpost: the first computed signpost is stored on the
#     stream instance and reused by later calls.
#   * get_new_paginator: returns IntercomSearchPaginator, a JSONPathPaginator
#     subclass that stops when a cursor token it has already seen comes back.
#   * http_headers: Intercom-Version changes from "2.15" to "2.14".
#     NOTE(review): unrelated to end_date -- confirm this downgrade is intended.
#
# Review changes applied to added lines (hunk line counts are unchanged):
#   * upper_bound now derives from the signpost only. Previously a configured
#     end_date produced a bound even when the signpost was None (stream without
#     a replication key), which appended a filter whose "field" is None.
#   * upper_bound is tested with `is not None` instead of truthiness, so a
#     bound of 0 is no longer silently ignored.
#
# NOTE(review): the copy of this patch under review had its line breaks and
# indentation collapsed. Line boundaries were rebuilt from the hunk counts;
# indentation and README column padding are inferred, and the client.py index
# hash predates the review changes. Verify against the upstream commit before
# applying.
diff --git a/README.md b/README.md
index 32e7ab1..f48cf75 100644
--- a/README.md
+++ b/README.md
@@ -46,6 +46,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
 |:-------------------------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------|
 | access_token | True | None | The token to authenticate against the API service |
 | start_date | False | None | The earliest record date to sync |
+| end_date | False | None | The latest record date to sync |
 | replication_lookback_window_seconds | False | 0 | Overlap window in seconds for incremental replication to replay recent records and reduce misses near bookmark boundaries |
 | stream_maps | False | None | Config object for stream maps capability. |
 | stream_map_config | False | None | User-defined config values to be used within map expressions. |
diff --git a/tap_intercom/client.py b/tap_intercom/client.py
index 47db926..dd7594a 100644
--- a/tap_intercom/client.py
+++ b/tap_intercom/client.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import logging
 import time
 import typing as t
 
@@ -15,6 +16,8 @@ T = t.TypeVar("T")
 TPageToken = t.TypeVar("TPageToken")
 
+LOGGER = logging.getLogger(__name__)
+
 
 class IntercomStream(RESTStream):
     """Intercom stream class."""
 
@@ -47,7 +50,7 @@ def http_headers(self) -> dict:
         if user_agent:
             result["User-Agent"] = user_agent
         result["Content-Type"] = "application/json"
-        result["Intercom-Version"] = "2.15"
+        result["Intercom-Version"] = "2.14"
         return result
 
     def get_url_params(self, context: dict | None, next_page_token: object) -> dict:  # noqa: ARG002
@@ -86,7 +89,15 @@ def prepare_request_payload(
         if self.http_method == "POST":
             body = {}
             start_date = self.get_starting_replication_key_value(context)
-            if start_date or self.config.get("filters", {}).get(self.name):
+            signpost = self.get_replication_key_signpost(context)
+            end_date = self.config.get("end_date")
+
+            # No signpost (no replication key) means no bound; otherwise cap the signpost at end_date.
+            upper_bound = signpost
+            if signpost is not None and end_date is not None:
+                upper_bound = min(signpost, int(end_date))
+
+            if start_date or upper_bound is not None or self.config.get("filters", {}).get(self.name):
                 body["query"] = {
                     "operator": "AND",
                     "value": [
@@ -104,6 +115,17 @@ def prepare_request_payload(
                             "value": start_date,
                         },
                     )
+                if upper_bound is not None:
+                    # Freeze the extraction window for this sync to reduce cursor churn
+                    # on rapidly mutating datasets. If end_date is configured,
+                    # respect the earliest of end_date and signpost.
+                    body["query"]["value"].append(
+                        {
+                            "field": self.replication_key,
+                            "operator": "<",
+                            "value": upper_bound + 1,
+                        },
+                    )
             if next_page_token:
                 body["pagination"] = {"per_page": 150, "starting_after": next_page_token}
             return body
@@ -138,9 +160,13 @@ def get_replication_key_signpost(
         """
         if not self.replication_key:
             return None
-        signpost = int(time.time())
-        self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.")
-        self.logger.info("Signpost value: %s", signpost)
+
+        signpost = getattr(self, "_replication_key_signpost", None)
+        if signpost is None:
+            signpost = int(time.time())
+            self._replication_key_signpost = signpost
+            self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.")
+            self.logger.info("Signpost value: %s", signpost)
         return signpost
 
     def post_process(
@@ -169,7 +195,60 @@ def get_new_paginator(self) -> JSONPathPaginator:
         Returns:
             JSONPathPaginator: Paginator for handling paginated API responses.
         """
-        return JSONPathPaginator(jsonpath="$.pages.next.starting_after")
+        return IntercomSearchPaginator(
+            "$.pages.next.starting_after",
+            logger=self.logger,
+        )
+
+
+class IntercomSearchPaginator(JSONPathPaginator):
+    """JSONPath paginator with loop protection for repeated cursor tokens.
+
+    Some Intercom search endpoints can return cursor sequences that revisit
+    previously seen tokens (for example: A -> B -> A -> B) when the underlying
+    dataset is changing quickly. The base paginator only detects consecutive
+    repeats, so we guard against any previously seen cursor to prevent infinite
+    loops.
+    """
+
+    def __init__(
+        self,
+        jsonpath: str,
+        *args: t.Any,
+        logger: logging.Logger | None = None,
+        **kwargs: t.Any,
+    ) -> None:
+        """Create a new guarded paginator."""
+        super().__init__(jsonpath, *args, **kwargs)
+        self._logger = logger or LOGGER
+        self._seen_tokens: set[t.Any] = set()
+
+    def advance(self, response: requests.Response) -> None:
+        """Advance the page token and stop gracefully if a token repeats."""
+        self._page_count += 1
+
+        if not self.has_more(response):
+            self._finished = True
+            return
+
+        new_value = self.get_next(response)
+
+        if new_value and new_value in self._seen_tokens:
+            self._logger.warning(
+                "Loop detected in pagination. Token %s was seen earlier (page %s). "
+                "Stopping pagination for this stream to avoid an infinite loop.",
+                new_value,
+                self._page_count,
+            )
+            self._finished = True
+            return
+
+        if not new_value:
+            self._finished = True
+            return
+
+        self._seen_tokens.add(new_value)
+        self._value = new_value
 
 
 class IntercomHATEOASPaginator(BaseHATEOASPaginator):
diff --git a/tap_intercom/tap.py b/tap_intercom/tap.py
index 851a265..71db0bc 100644
--- a/tap_intercom/tap.py
+++ b/tap_intercom/tap.py
@@ -26,6 +26,11 @@ class TapIntercom(Tap):
             th.IntegerType,
             description="The earliest record date to sync, in unix timestamp format",
         ),
+        th.Property(
+            "end_date",
+            th.IntegerType,
+            description="The latest record date to sync, in unix timestamp format",
+        ),
         th.Property(
             "replication_lookback_window_seconds",
             th.IntegerType,