From ed7cf3abccbb0ed4894b3799b0bd641a3d2b01dd Mon Sep 17 00:00:00 2001 From: Mothership Date: Wed, 18 Mar 2026 08:42:36 +0000 Subject: [PATCH] feat: add append_atlan_tags support to Batch, AsyncBatch, save_merging_cm, and update_merging_cm The Batch and AsyncBatch classes only supported replace_atlan_tags (True/False), meaning Atlan tags were either fully overwritten or completely ignored during bulk operations. This made it impossible to use the Batch class for incremental tag updates (add/update/remove specific tags without affecting others). This change adds an append_atlan_tags parameter to: - Batch.__init__() and its flush() method - AsyncBatch.__init__() and its flush() method - AssetClient.save_merging_cm() and update_merging_cm() - AsyncAssetClient.save_merging_cm() and update_merging_cm() When append_atlan_tags=True, the bulk API is called with appendTags=true, enabling incremental tag operations via add_or_update_classifications and remove_classifications fields on assets. This is critical for high-volume tag sync workflows (100K+ assets) where customers need to add/remove specific tags without fetching existing state first. Resolves: REQ-717 Co-Authored-By: Claude Opus 4.6 --- pyatlan/client/aio/asset.py | 17 ++++++++++++++--- pyatlan/client/aio/batch.py | 14 ++++++++++++-- pyatlan/client/asset.py | 31 ++++++++++++++++++++++++++----- 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/pyatlan/client/aio/asset.py b/pyatlan/client/aio/asset.py index eb6c582a5..565000af2 100644 --- a/pyatlan/client/aio/asset.py +++ b/pyatlan/client/aio/asset.py @@ -388,7 +388,10 @@ async def _retrieve_connection_with_retry(guid): @validate_arguments async def save_merging_cm( - self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False + self, + entity: Union[Asset, List[Asset]], + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ Async save with merging custom metadata. @@ -398,6 +401,7 @@ async def save_merging_cm( :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets """ return await self.save( @@ -405,11 +409,15 @@ async def save_merging_cm( replace_atlan_tags=replace_atlan_tags, replace_custom_metadata=True, overwrite_custom_metadata=False, + append_atlan_tags=append_atlan_tags, ) @validate_arguments async def update_merging_cm( - self, entity: Asset, replace_atlan_tags: bool = False + self, + entity: Asset, + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ Async update with merging custom metadata. @@ -419,6 +427,7 @@ async def update_merging_cm( :param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it) """ @@ -430,7 +439,9 @@ async def update_merging_cm( get_by_qualified_name_func=self.get_by_qualified_name, ) return await self.save_merging_cm( - entity=entity, replace_atlan_tags=replace_atlan_tags + entity=entity, + replace_atlan_tags=replace_atlan_tags, + append_atlan_tags=append_atlan_tags, ) @validate_arguments diff --git a/pyatlan/client/aio/batch.py b/pyatlan/client/aio/batch.py index b99444208..2ab1f3829 100644 --- a/pyatlan/client/aio/batch.py +++ b/pyatlan/client/aio/batch.py @@ -43,6 +43,7 @@ def __init__( case_insensitive: bool = False, table_view_agnostic: bool = False, creation_handling: AssetCreationHandling = AssetCreationHandling.FULL, + append_atlan_tags: bool = False, ): """ Create a new async batch of assets to be bulk-saved. @@ -68,10 +69,15 @@ def __init__( view if not found as a table, and vice versa) :param creation_handling: when allowing assets to be created, how to handle those creations (full assets or partial assets). + :param append_atlan_tags: if True, Atlan tags on assets in the batch + will be added/updated/removed using the add_or_update_classifications + and remove_classifications fields, without replacing all existing tags. + When True, replace_atlan_tags is ignored. """ self._client: AsyncAtlanClient = client self._max_size: int = max_size self._replace_atlan_tags: bool = replace_atlan_tags + self._append_atlan_tags: bool = append_atlan_tags self._custom_metadata_handling: CustomMetadataHandling = ( custom_metadata_handling ) @@ -320,7 +326,9 @@ async def flush(self) -> Optional[AssetMutationResponse]: try: if self._custom_metadata_handling == CustomMetadataHandling.IGNORE: response = await self._client.asset.save( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) elif ( self._custom_metadata_handling @@ -331,7 +339,9 @@ async def flush(self) -> Optional[AssetMutationResponse]: ) elif self._custom_metadata_handling == CustomMetadataHandling.MERGE: response = await self._client.asset.save_merging_cm( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) else: raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters( diff --git a/pyatlan/client/asset.py b/pyatlan/client/asset.py index cf7667e9a..f990ac690 100644 --- a/pyatlan/client/asset.py +++ b/pyatlan/client/asset.py @@ -490,7 +490,10 @@ def upsert_merging_cm( @validate_arguments def save_merging_cm( - self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False + self, + entity: Union[Asset, List[Asset]], + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ If no asset exists, has the same behavior as the upsert() method, while also setting @@ -499,6 +502,7 @@ def save_merging_cm( :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets """ return self.save( @@ -506,11 +510,15 @@ def save_merging_cm( replace_atlan_tags=replace_atlan_tags, replace_custom_metadata=True, overwrite_custom_metadata=False, + append_atlan_tags=append_atlan_tags, ) @validate_arguments def update_merging_cm( - self, entity: Asset, replace_atlan_tags: bool = False + self, + entity: Asset, + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ If no asset exists, fails with a NotFoundError. Will merge any provided @@ -519,6 +527,7 @@ def update_merging_cm( :param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it) """ @@ -528,7 +537,9 @@ def update_merging_cm( get_by_qualified_name_func=self.get_by_qualified_name, ) return self.save_merging_cm( - entity=entity, replace_atlan_tags=replace_atlan_tags + entity=entity, + replace_atlan_tags=replace_atlan_tags, + append_atlan_tags=append_atlan_tags, ) @validate_arguments @@ -2201,6 +2212,7 @@ def __init__( case_insensitive: bool = False, table_view_agnostic: bool = False, creation_handling: AssetCreationHandling = AssetCreationHandling.FULL, + append_atlan_tags: bool = False, ): """ Create a new batch of assets to be bulk-saved. @@ -2226,10 +2238,15 @@ def __init__( view if not found as a table, and vice versa) :param creation_handling: when allowing assets to be created, how to handle those creations (full assets or partial assets). + :param append_atlan_tags: if True, Atlan tags on assets in the batch + will be added/updated/removed using the add_or_update_classifications + and remove_classifications fields, without replacing all existing tags. + When True, replace_atlan_tags is ignored. """ self._client: AtlanClient = client self._max_size: int = max_size self._replace_atlan_tags: bool = replace_atlan_tags + self._append_atlan_tags: bool = append_atlan_tags self._custom_metadata_handling: CustomMetadataHandling = ( custom_metadata_handling ) @@ -2480,7 +2497,9 @@ def flush(self) -> Optional[AssetMutationResponse]: try: if self._custom_metadata_handling == CustomMetadataHandling.IGNORE: response = self._client.asset.save( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) elif ( self._custom_metadata_handling @@ -2491,7 +2510,9 @@ def flush(self) -> Optional[AssetMutationResponse]: ) elif self._custom_metadata_handling == CustomMetadataHandling.MERGE: response = self._client.asset.save_merging_cm( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) else: raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(