From bc365a5d154c949c4424e81b0cf85c1764908a2f Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 11 Mar 2026 13:37:45 +0100 Subject: [PATCH 01/15] feat: :sparkles: Add logging for aircall tags in ConversationsStream post_process method --- tap_intercom/streams.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index 0185262..a93cc86 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -33,6 +33,34 @@ class ConversationsStream(IntercomStream): schema = conversations_schema is_sorted = False + def post_process(self, row: dict, context: dict | None = None) -> dict | None: + """Log id-tags combinations if they contain "aircall" to help diagnose tap vs target issues. + + Args: + row: Individual record in the stream. + context: Stream partition or context dictionary. + + Returns: + The record dict, or `None` if the record should be excluded. + """ + row = super().post_process(row, context) + if row is None: + return None + aircall_tags = [ + tag for tag in (row.get("tags") or {}).get("tags") or [] + if "aircall" in (tag.get("name") or "").lower() + ] + if aircall_tags: + # Log only minimal tag information at DEBUG level to avoid high-volume, + # potentially sensitive per-record logging at INFO. + tag_names = [tag.get("name") for tag in aircall_tags if tag.get("name")] + self.logger.info( + "CONVERSATION_TAGS: id=%s tag_names=%s", + row.get("id"), + tag_names, + ) + return row + def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002 """Return a context dictionary for child streams.""" return {"conversation_id": record["id"]} From 21fbe8146514cf53bc214476313fbe18ebf65289 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 11 Mar 2026 16:18:57 +0100 Subject: [PATCH 02/15] fix: :bug: Change logging level from info to warning for conversation tags in ConversationsStream --- tap_intercom/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index a93cc86..ecc1c8c 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -54,7 +54,7 @@ def post_process(self, row: dict, context: dict | None = None) -> dict | None: # Log only minimal tag information at DEBUG level to avoid high-volume, # potentially sensitive per-record logging at INFO. tag_names = [tag.get("name") for tag in aircall_tags if tag.get("name")] - self.logger.info( + self.logger.warning( "CONVERSATION_TAGS: id=%s tag_names=%s", row.get("id"), tag_names, From ca44444157b2549ad04dad6dfca8d33535447465 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Tue, 17 Mar 2026 15:10:13 +0100 Subject: [PATCH 03/15] feat: :sparkles: Implement IntercomSearchPaginator to handle cursor token loops in pagination --- tap_intercom/client.py | 48 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 47db926..5fa4b37 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging import time import typing as t @@ -15,6 +16,8 @@ T = t.TypeVar("T") TPageToken = t.TypeVar("TPageToken") +LOGGER = logging.getLogger(__name__) + class IntercomStream(RESTStream): """Intercom stream class.""" @@ -169,7 +172,50 @@ def get_new_paginator(self) -> JSONPathPaginator: Returns: JSONPathPaginator: Paginator for handling paginated API responses. """ - return JSONPathPaginator(jsonpath="$.pages.next.starting_after") + return IntercomSearchPaginator(jsonpath="$.pages.next.starting_after") + + +class IntercomSearchPaginator(JSONPathPaginator): + """JSONPath paginator with loop protection for repeated cursor tokens. + + Some Intercom search endpoints can return cursor sequences that revisit + previously seen tokens (for example: A -> B -> A -> B) when the underlying + dataset is changing quickly. The base paginator only detects consecutive + repeats, so we guard against any previously seen cursor to prevent infinite + loops. + """ + + def __init__(self, jsonpath: str, *args: t.Any, **kwargs: t.Any) -> None: + """Create a new guarded paginator.""" + super().__init__(jsonpath=jsonpath, *args, **kwargs) + self._seen_tokens: set[t.Any] = set() + + def advance(self, response: requests.Response) -> None: + """Advance the page token and stop gracefully if a token repeats.""" + self._page_count += 1 + + if not self.has_more(response): + self._finished = True + return + + new_value = self.get_next(response) + + if new_value and new_value in self._seen_tokens: + LOGGER.warning( + "Loop detected in pagination. Token %s was seen earlier (page %s). " + "Stopping pagination for this stream to avoid an infinite loop.", + new_value, + self._page_count, + ) + self._finished = True + return + + if not new_value: + self._finished = True + return + + self._seen_tokens.add(new_value) + self._value = new_value class IntercomHATEOASPaginator(BaseHATEOASPaginator): From a2ee62a90df806de1569184efe0705747c435099 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Tue, 17 Mar 2026 15:21:22 +0100 Subject: [PATCH 04/15] =?UTF-8?q?fix:=20=F0=9F=9A=A8=20Correct=20argument?= =?UTF-8?q?=20passing=20in=20IntercomSearchPaginator=20constructor=20to=20?= =?UTF-8?q?resolve=20CI=20failure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tap_intercom/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 5fa4b37..04e6ad1 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -187,7 +187,7 @@ class IntercomSearchPaginator(JSONPathPaginator): def __init__(self, jsonpath: str, *args: t.Any, **kwargs: t.Any) -> None: """Create a new guarded paginator.""" - super().__init__(jsonpath=jsonpath, *args, **kwargs) + super().__init__(jsonpath, *args, **kwargs) self._seen_tokens: set[t.Any] = set() def advance(self, response: requests.Response) -> None: From b4aa39c3bc68d3e327f089cd4226e568f2f88d92 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 18 Mar 2026 10:48:00 +0100 Subject: [PATCH 05/15] =?UTF-8?q?debug:=20=F0=9F=94=8A=20Add=20extra=20log?= =?UTF-8?q?ging=20for=20paginator=20advancement=20in=20IntercomSearchPagin?= =?UTF-8?q?ator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tap_intercom/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 04e6ad1..334082d 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -192,6 +192,7 @@ def __init__(self, jsonpath: str, *args: t.Any, **kwargs: t.Any) -> None: def advance(self, response: requests.Response) -> None: """Advance the page token and stop gracefully if a token repeats.""" + LOGGER.warning("Advancing paginator to next page.") self._page_count += 1 if not self.has_more(response): From e52de3667bf67fa6c33a49d8f34c5a0d7979443b Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 18 Mar 2026 11:00:53 +0100 Subject: [PATCH 06/15] fix: :bug: Add logger parameter to IntercomSearchPaginator and update logging in advance method --- tap_intercom/client.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 334082d..d73089b 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -172,7 +172,10 @@ def get_new_paginator(self) -> JSONPathPaginator: Returns: JSONPathPaginator: Paginator for handling paginated API responses. """ - return IntercomSearchPaginator(jsonpath="$.pages.next.starting_after") + return IntercomSearchPaginator( + "$.pages.next.starting_after", + logger=self.logger, + ) class IntercomSearchPaginator(JSONPathPaginator): @@ -185,14 +188,21 @@ class IntercomSearchPaginator(JSONPathPaginator): loops. """ - def __init__(self, jsonpath: str, *args: t.Any, **kwargs: t.Any) -> None: + def __init__( + self, + jsonpath: str, + *args: t.Any, + logger: logging.Logger | None = None, + **kwargs: t.Any, + ) -> None: """Create a new guarded paginator.""" super().__init__(jsonpath, *args, **kwargs) + self._logger = logger or LOGGER self._seen_tokens: set[t.Any] = set() def advance(self, response: requests.Response) -> None: """Advance the page token and stop gracefully if a token repeats.""" - LOGGER.warning("Advancing paginator to next page.") + self._logger.debug("Advancing Intercom search paginator to next page.") self._page_count += 1 if not self.has_more(response): @@ -202,7 +212,7 @@ def advance(self, response: requests.Response) -> None: new_value = self.get_next(response) if new_value and new_value in self._seen_tokens: - LOGGER.warning( + self._logger.warning( "Loop detected in pagination. Token %s was seen earlier (page %s). " "Stopping pagination for this stream to avoid an infinite loop.", new_value, From 28371b903f613ab9078b0ec88617f8e3382f4e5e Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 18 Mar 2026 11:58:11 +0100 Subject: [PATCH 07/15] fix: :bug: Update logging level to warning for Intercom search paginator usage and advancement --- tap_intercom/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index d73089b..9f62913 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -172,6 +172,7 @@ def get_new_paginator(self) -> JSONPathPaginator: Returns: JSONPathPaginator: Paginator for handling paginated API responses. """ + self.logger.warning("Using guarded Intercom search paginator for stream '%s'.", self.name) return IntercomSearchPaginator( "$.pages.next.starting_after", logger=self.logger, @@ -202,7 +203,7 @@ def __init__( def advance(self, response: requests.Response) -> None: """Advance the page token and stop gracefully if a token repeats.""" - self._logger.debug("Advancing Intercom search paginator to next page.") + self._logger.warning("Advancing Intercom search paginator to next page.") self._page_count += 1 if not self.has_more(response): From 8ccfef4345ef91e5cd9fe63810ca59ec73d99cfc Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 18 Mar 2026 14:46:50 +0100 Subject: [PATCH 08/15] fix: :bug: Remove unnecessary logging in Intercom paginator methods --- tap_intercom/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 9f62913..5f0d03d 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -172,7 +172,6 @@ def get_new_paginator(self) -> JSONPathPaginator: Returns: JSONPathPaginator: Paginator for handling paginated API responses. """ - self.logger.warning("Using guarded Intercom search paginator for stream '%s'.", self.name) return IntercomSearchPaginator( "$.pages.next.starting_after", logger=self.logger, @@ -203,7 +202,6 @@ def __init__( def advance(self, response: requests.Response) -> None: """Advance the page token and stop gracefully if a token repeats.""" - self._logger.warning("Advancing Intercom search paginator to next page.") self._page_count += 1 if not self.has_more(response): From 1ee0af4218fee865fc723d816d600efdc28f8cb7 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Wed, 18 Mar 2026 14:51:07 +0100 Subject: [PATCH 09/15] fix: :bug: Update Intercom version in request headers from 2.15 to 2.14 --- tap_intercom/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 5f0d03d..e9a50b0 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -50,7 +50,7 @@ def http_headers(self) -> dict: if user_agent: result["User-Agent"] = user_agent result["Content-Type"] = "application/json" - result["Intercom-Version"] = "2.15" + result["Intercom-Version"] = "2.14" return result def get_url_params(self, context: dict | None, next_page_token: object) -> dict: # noqa: ARG002 From 935b078125a6ee94f21df8d56cde932c9208b908 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Thu, 19 Mar 2026 09:58:52 +0100 Subject: [PATCH 10/15] fix: :bug: Enhance replication key handling in IntercomStream to reduce cursor churn --- tap_intercom/client.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index e9a50b0..06050c2 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -89,7 +89,9 @@ def prepare_request_payload( if self.http_method == "POST": body = {} start_date = self.get_starting_replication_key_value(context) - if start_date or self.config.get("filters", {}).get(self.name): + signpost = self.get_replication_key_signpost(context) + + if start_date or signpost or self.config.get("filters", {}).get(self.name): body["query"] = { "operator": "AND", "value": [ @@ -107,6 +109,16 @@ def prepare_request_payload( "value": start_date, }, ) + if signpost: + # Freeze the extraction window for this sync to reduce cursor churn + # on rapidly mutating datasets. + body["query"]["value"].append( + { + "field": self.replication_key, + "operator": "<=", + "value": signpost, + }, + ) if next_page_token: body["pagination"] = {"per_page": 150, "starting_after": next_page_token} return body @@ -141,9 +153,13 @@ def get_replication_key_signpost( """ if not self.replication_key: return None - signpost = int(time.time()) - self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.") - self.logger.info("Signpost value: %s", signpost) + + signpost = getattr(self, "_replication_key_signpost", None) + if signpost is None: + signpost = int(time.time()) + self._replication_key_signpost = signpost + self.logger.info("Setting replication key signpost to current Unix timestamp at sync start.") + self.logger.info("Signpost value: %s", signpost) return signpost def post_process( From d093ffa053c7b8174865a95c727c774ff36e34b7 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Thu, 19 Mar 2026 10:43:24 +0100 Subject: [PATCH 11/15] fix: :bug: Adjust operator in replication key query to prevent inclusive comparisons --- tap_intercom/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 06050c2..c71b257 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -115,7 +115,7 @@ def prepare_request_payload( body["query"]["value"].append( { "field": self.replication_key, - "operator": "<=", + "operator": "<", "value": signpost, }, ) From b17fbae1efa31cd62867e84d801b3d7f5b34e8e5 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Thu, 19 Mar 2026 13:42:42 +0100 Subject: [PATCH 12/15] fix: :bug: Adjust replication key signpost value to be equivalent to less or equal than --- tap_intercom/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/client.py b/tap_intercom/client.py index c71b257..8f4758e 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -116,7 +116,7 @@ def prepare_request_payload( { "field": self.replication_key, "operator": "<", - "value": signpost, + "value": signpost + 1, }, ) if next_page_token: From 2bc15afa62a1c2cc03d9567f3a6fc9ca4ccb6bb4 Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Fri, 20 Mar 2026 15:45:31 +0100 Subject: [PATCH 13/15] fix: :bug: Add end_date configuration option to limit the latest record date to sync --- README.md | 1 + tap_intercom/client.py | 15 +++++++++++---- tap_intercom/tap.py | 5 +++++ 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 32e7ab1..f48cf75 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com). |:-------------------------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------| | access_token | True | None | The token to authenticate against the API service | | start_date | False | None | The earliest record date to sync | +| end_date | False | None | The latest record date to sync | | replication_lookback_window_seconds | False | 0 | Overlap window in seconds for incremental replication to replay recent records and reduce misses near bookmark boundaries | | stream_maps | False | None | Config object for stream maps capability. | | stream_map_config | False | None | User-defined config values to be used within map expressions. | diff --git a/tap_intercom/client.py b/tap_intercom/client.py index 8f4758e..dd7594a 100644 --- a/tap_intercom/client.py +++ b/tap_intercom/client.py @@ -90,8 +90,14 @@ def prepare_request_payload( body = {} start_date = self.get_starting_replication_key_value(context) signpost = self.get_replication_key_signpost(context) + end_date = self.config.get("end_date") - if start_date or signpost or self.config.get("filters", {}).get(self.name): + # Preserve the existing signpost behavior when end_date is not provided. + upper_bound = signpost if end_date is None else int(end_date) + if signpost is not None and end_date is not None: + upper_bound = min(signpost, int(end_date)) + + if start_date or upper_bound or self.config.get("filters", {}).get(self.name): body["query"] = { "operator": "AND", "value": [ @@ -109,14 +115,15 @@ def prepare_request_payload( "value": start_date, }, ) - if signpost: + if upper_bound: # Freeze the extraction window for this sync to reduce cursor churn - # on rapidly mutating datasets. + # on rapidly mutating datasets. If end_date is configured, + # respect the earliest of end_date and signpost. body["query"]["value"].append( { "field": self.replication_key, "operator": "<", - "value": signpost + 1, + "value": upper_bound + 1, }, ) if next_page_token: diff --git a/tap_intercom/tap.py b/tap_intercom/tap.py index 851a265..71db0bc 100644 --- a/tap_intercom/tap.py +++ b/tap_intercom/tap.py @@ -26,6 +26,11 @@ class TapIntercom(Tap): th.IntegerType, description="The earliest record date to sync, in unix timestamp format", ), + th.Property( + "end_date", + th.IntegerType, + description="The latest record date to sync, in unix timestamp format", + ), th.Property( "replication_lookback_window_seconds", th.IntegerType, From 25f470ed0e0630099e65864e34d8920a04d5c84b Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Fri, 20 Mar 2026 15:54:02 +0100 Subject: [PATCH 14/15] fix: :bug: Change replication key in ConversationsStream from updated_at to created_at --- tap_intercom/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index ecc1c8c..94e18b3 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -27,7 +27,7 @@ class ConversationsStream(IntercomStream): name = "conversations" path = "/conversations/search" - replication_key = "updated_at" + replication_key = "created_at" records_jsonpath = "$.conversations[*]" http_method = "POST" schema = conversations_schema From b5ff0d12d0487f303f89fd3917ee9165ebdaa62a Mon Sep 17 00:00:00 2001 From: Fernando Corte Vargas Date: Mon, 23 Mar 2026 14:35:42 +0100 Subject: [PATCH 15/15] fix: :bug: Revert replication key in ConversationsStream from created_at to updated_at --- tap_intercom/streams.py | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/tap_intercom/streams.py b/tap_intercom/streams.py index 94e18b3..0185262 100644 --- a/tap_intercom/streams.py +++ b/tap_intercom/streams.py @@ -27,40 +27,12 @@ class ConversationsStream(IntercomStream): name = "conversations" path = "/conversations/search" - replication_key = "created_at" + replication_key = "updated_at" records_jsonpath = "$.conversations[*]" http_method = "POST" schema = conversations_schema is_sorted = False - def post_process(self, row: dict, context: dict | None = None) -> dict | None: - """Log id-tags combinations if they contain "aircall" to help diagnose tap vs target issues. - - Args: - row: Individual record in the stream. - context: Stream partition or context dictionary. - - Returns: - The record dict, or `None` if the record should be excluded. - """ - row = super().post_process(row, context) - if row is None: - return None - aircall_tags = [ - tag for tag in (row.get("tags") or {}).get("tags") or [] - if "aircall" in (tag.get("name") or "").lower() - ] - if aircall_tags: - # Log only minimal tag information at DEBUG level to avoid high-volume, - # potentially sensitive per-record logging at INFO. - tag_names = [tag.get("name") for tag in aircall_tags if tag.get("name")] - self.logger.warning( - "CONVERSATION_TAGS: id=%s tag_names=%s", - row.get("id"), - tag_names, - ) - return row - def get_child_context(self, record: dict, context: dict | None) -> dict: # noqa: ARG002 """Return a context dictionary for child streams.""" return {"conversation_id": record["id"]}