Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
bc365a5
feat: :sparkles: Add logging for aircall tags in ConversationsStream …
fcortevargas Mar 11, 2026
21fbe81
fix: :bug: Change logging level from info to warning for conversation…
fcortevargas Mar 11, 2026
ca44444
feat: :sparkles: Implement IntercomSearchPaginator to handle cursor t…
fcortevargas Mar 17, 2026
a2ee62a
fix: 🚨 Correct argument passing in IntercomSearchPaginator constructo…
fcortevargas Mar 17, 2026
b4aa39c
debug: 🔊 Add extra logging for paginator advancement in IntercomSearc…
fcortevargas Mar 18, 2026
e52de36
fix: :bug: Add logger parameter to IntercomSearchPaginator and update…
fcortevargas Mar 18, 2026
28371b9
fix: :bug: Update logging level to warning for Intercom search pagina…
fcortevargas Mar 18, 2026
8ccfef4
fix: :bug: Remove unnecessary logging in Intercom paginator methods
fcortevargas Mar 18, 2026
1ee0af4
fix: :bug: Update Intercom version in request headers from 2.15 to 2.14
fcortevargas Mar 18, 2026
935b078
fix: :bug: Enhance replication key handling in IntercomStream to redu…
fcortevargas Mar 19, 2026
d093ffa
fix: :bug: Adjust operator in replication key query to prevent inclus…
fcortevargas Mar 19, 2026
b17fbae
fix: :bug: Adjust replication key signpost value to be equivalent to …
fcortevargas Mar 19, 2026
2bc15af
fix: :bug: Add end_date configuration option to limit the latest reco…
fcortevargas Mar 20, 2026
25f470e
fix: :bug: Change replication key in ConversationsStream from updated…
fcortevargas Mar 20, 2026
b5ff0d1
fix: :bug: Revert replication key in ConversationsStream from created…
fcortevargas Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
|:-------------------------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------|
| access_token | True | None | The token to authenticate against the API service |
| start_date | False | None | The earliest record date to sync |
| end_date | False | None | The latest record date to sync |
| replication_lookback_window_seconds | False | 0 | Overlap window in seconds for incremental replication to replay recent records and reduce misses near bookmark boundaries |
| stream_maps | False | None | Config object for stream maps capability. |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
Expand Down
91 changes: 85 additions & 6 deletions tap_intercom/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import logging
import time
import typing as t

Expand All @@ -15,6 +16,8 @@
T = t.TypeVar("T")
TPageToken = t.TypeVar("TPageToken")

LOGGER = logging.getLogger(__name__)


class IntercomStream(RESTStream):
"""Intercom stream class."""
Expand Down Expand Up @@ -47,7 +50,7 @@ def http_headers(self) -> dict:
if user_agent:
result["User-Agent"] = user_agent
result["Content-Type"] = "application/json"
result["Intercom-Version"] = "2.15"
result["Intercom-Version"] = "2.14"
return result

def get_url_params(self, context: dict | None, next_page_token: object) -> dict: # noqa: ARG002
Expand Down Expand Up @@ -86,7 +89,15 @@ def prepare_request_payload(
if self.http_method == "POST":
body = {}
start_date = self.get_starting_replication_key_value(context)
if start_date or self.config.get("filters", {}).get(self.name):
signpost = self.get_replication_key_signpost(context)
end_date = self.config.get("end_date")

# Preserve the existing signpost behavior when end_date is not provided.
upper_bound = signpost if end_date is None else int(end_date)
if signpost is not None and end_date is not None:
upper_bound = min(signpost, int(end_date))

if start_date or upper_bound or self.config.get("filters", {}).get(self.name):
body["query"] = {
"operator": "AND",
"value": [
Expand All @@ -104,6 +115,17 @@ def prepare_request_payload(
"value": start_date,
},
)
if upper_bound:
# Freeze the extraction window for this sync to reduce cursor churn
# on rapidly mutating datasets. If end_date is configured,
# respect the earliest of end_date and signpost.
body["query"]["value"].append(
{
"field": self.replication_key,
"operator": "<",
"value": upper_bound + 1,
},
)
if next_page_token:
body["pagination"] = {"per_page": 150, "starting_after": next_page_token}
return body
Expand Down Expand Up @@ -138,9 +160,13 @@ def get_replication_key_signpost(
"""
if not self.replication_key:
return None
signpost = int(time.time())
self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.")
self.logger.info("Signpost value: %s", signpost)

signpost = getattr(self, "_replication_key_signpost", None)
if signpost is None:
signpost = int(time.time())
self._replication_key_signpost = signpost
self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.")
self.logger.info("Signpost value: %s", signpost)
return signpost

def post_process(
Expand Down Expand Up @@ -169,7 +195,60 @@ def get_new_paginator(self) -> JSONPathPaginator:
Returns:
JSONPathPaginator: Paginator for handling paginated API responses.
"""
return JSONPathPaginator(jsonpath="$.pages.next.starting_after")
return IntercomSearchPaginator(
"$.pages.next.starting_after",
logger=self.logger,
)


class IntercomSearchPaginator(JSONPathPaginator):
"""JSONPath paginator with loop protection for repeated cursor tokens.

Some Intercom search endpoints can return cursor sequences that revisit
previously seen tokens (for example: A -> B -> A -> B) when the underlying
dataset is changing quickly. The base paginator only detects consecutive
repeats, so we guard against any previously seen cursor to prevent infinite
loops.
"""

def __init__(
self,
jsonpath: str,
*args: t.Any,
logger: logging.Logger | None = None,
**kwargs: t.Any,
) -> None:
"""Create a new guarded paginator."""
super().__init__(jsonpath, *args, **kwargs)
self._logger = logger or LOGGER
self._seen_tokens: set[t.Any] = set()

def advance(self, response: requests.Response) -> None:
"""Advance the page token and stop gracefully if a token repeats."""
self._page_count += 1

if not self.has_more(response):
self._finished = True
return

new_value = self.get_next(response)

if new_value and new_value in self._seen_tokens:
self._logger.warning(
"Loop detected in pagination. Token %s was seen earlier (page %s). "
"Stopping pagination for this stream to avoid an infinite loop.",
new_value,
self._page_count,
)
self._finished = True
return

if not new_value:
self._finished = True
return

self._seen_tokens.add(new_value)
self._value = new_value


class IntercomHATEOASPaginator(BaseHATEOASPaginator):
Expand Down
5 changes: 5 additions & 0 deletions tap_intercom/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class TapIntercom(Tap):
th.IntegerType,
description="The earliest record date to sync, in unix timestamp format",
),
th.Property(
"end_date",
th.IntegerType,
description="The latest record date to sync, in unix timestamp format",
),
th.Property(
"replication_lookback_window_seconds",
th.IntegerType,
Expand Down
Loading