diff --git a/pyatlan/client/aio/asset.py b/pyatlan/client/aio/asset.py index eb6c582a5..565000af2 100644 --- a/pyatlan/client/aio/asset.py +++ b/pyatlan/client/aio/asset.py @@ -388,7 +388,10 @@ async def _retrieve_connection_with_retry(guid): @validate_arguments async def save_merging_cm( - self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False + self, + entity: Union[Asset, List[Asset]], + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ Async save with merging custom metadata. @@ -398,6 +401,7 @@ async def save_merging_cm( :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets """ return await self.save( @@ -405,11 +409,15 @@ async def save_merging_cm( replace_atlan_tags=replace_atlan_tags, replace_custom_metadata=True, overwrite_custom_metadata=False, + append_atlan_tags=append_atlan_tags, ) @validate_arguments async def update_merging_cm( - self, entity: Asset, replace_atlan_tags: bool = False + self, + entity: Asset, + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ Async update with merging custom metadata. @@ -419,6 +427,7 @@ async def update_merging_cm( :param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it) """ @@ -430,7 +439,9 @@ async def update_merging_cm( get_by_qualified_name_func=self.get_by_qualified_name, ) return await self.save_merging_cm( - entity=entity, replace_atlan_tags=replace_atlan_tags + entity=entity, + replace_atlan_tags=replace_atlan_tags, + append_atlan_tags=append_atlan_tags, ) @validate_arguments diff --git a/pyatlan/client/aio/batch.py b/pyatlan/client/aio/batch.py index b99444208..2ab1f3829 100644 --- a/pyatlan/client/aio/batch.py +++ b/pyatlan/client/aio/batch.py @@ -43,6 +43,7 @@ def __init__( case_insensitive: bool = False, table_view_agnostic: bool = False, creation_handling: AssetCreationHandling = AssetCreationHandling.FULL, + append_atlan_tags: bool = False, ): """ Create a new async batch of assets to be bulk-saved. @@ -68,10 +69,15 @@ def __init__( view if not found as a table, and vice versa) :param creation_handling: when allowing assets to be created, how to handle those creations (full assets or partial assets). + :param append_atlan_tags: if True, Atlan tags on assets in the batch + will be added/updated/removed using the add_or_update_classifications + and remove_classifications fields, without replacing all existing tags. + When True, replace_atlan_tags is ignored. """ self._client: AsyncAtlanClient = client self._max_size: int = max_size self._replace_atlan_tags: bool = replace_atlan_tags + self._append_atlan_tags: bool = append_atlan_tags self._custom_metadata_handling: CustomMetadataHandling = ( custom_metadata_handling ) @@ -320,7 +326,9 @@ async def flush(self) -> Optional[AssetMutationResponse]: try: if self._custom_metadata_handling == CustomMetadataHandling.IGNORE: response = await self._client.asset.save( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) elif ( self._custom_metadata_handling @@ -331,7 +339,9 @@ async def flush(self) -> Optional[AssetMutationResponse]: ) elif self._custom_metadata_handling == CustomMetadataHandling.MERGE: response = await self._client.asset.save_merging_cm( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) else: raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters( diff --git a/pyatlan/client/asset.py b/pyatlan/client/asset.py index cf7667e9a..f990ac690 100644 --- a/pyatlan/client/asset.py +++ b/pyatlan/client/asset.py @@ -490,7 +490,10 @@ def upsert_merging_cm( @validate_arguments def save_merging_cm( - self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False + self, + entity: Union[Asset, List[Asset]], + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ If no asset exists, has the same behavior as the upsert() method, while also setting @@ -499,6 +502,7 @@ def save_merging_cm( :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the created or updated assets """ return self.save( @@ -506,11 +510,15 @@ def save_merging_cm( replace_atlan_tags=replace_atlan_tags, replace_custom_metadata=True, overwrite_custom_metadata=False, + append_atlan_tags=append_atlan_tags, ) @validate_arguments def update_merging_cm( - self, entity: Asset, replace_atlan_tags: bool = False + self, + entity: Asset, + replace_atlan_tags: bool = False, + append_atlan_tags: bool = False, ) -> AssetMutationResponse: """ If no asset exists, fails with a NotFoundError. Will merge any provided @@ -519,6 +527,7 @@ def update_merging_cm( :param entity: the asset to update :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) + :param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False) :returns: details of the updated asset :raises NotFoundError: if the asset does not exist (will not create it) """ @@ -528,7 +537,9 @@ def update_merging_cm( get_by_qualified_name_func=self.get_by_qualified_name, ) return self.save_merging_cm( - entity=entity, replace_atlan_tags=replace_atlan_tags + entity=entity, + replace_atlan_tags=replace_atlan_tags, + append_atlan_tags=append_atlan_tags, ) @validate_arguments @@ -2201,6 +2212,7 @@ def __init__( case_insensitive: bool = False, table_view_agnostic: bool = False, creation_handling: AssetCreationHandling = AssetCreationHandling.FULL, + append_atlan_tags: bool = False, ): """ Create a new batch of assets to be bulk-saved. @@ -2226,10 +2238,15 @@ def __init__( view if not found as a table, and vice versa) :param creation_handling: when allowing assets to be created, how to handle those creations (full assets or partial assets). + :param append_atlan_tags: if True, Atlan tags on assets in the batch + will be added/updated/removed using the add_or_update_classifications + and remove_classifications fields, without replacing all existing tags. + When True, replace_atlan_tags is ignored. """ self._client: AtlanClient = client self._max_size: int = max_size self._replace_atlan_tags: bool = replace_atlan_tags + self._append_atlan_tags: bool = append_atlan_tags self._custom_metadata_handling: CustomMetadataHandling = ( custom_metadata_handling ) @@ -2480,7 +2497,9 @@ def flush(self) -> Optional[AssetMutationResponse]: try: if self._custom_metadata_handling == CustomMetadataHandling.IGNORE: response = self._client.asset.save( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) elif ( self._custom_metadata_handling @@ -2491,7 +2510,9 @@ def flush(self) -> Optional[AssetMutationResponse]: ) elif self._custom_metadata_handling == CustomMetadataHandling.MERGE: response = self._client.asset.save_merging_cm( - revised, replace_atlan_tags=self._replace_atlan_tags + revised, + replace_atlan_tags=self._replace_atlan_tags, + append_atlan_tags=self._append_atlan_tags, ) else: raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(