Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions pyatlan/client/aio/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,10 @@ async def _retrieve_connection_with_retry(guid):

@validate_arguments
async def save_merging_cm(
self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
self,
entity: Union[Asset, List[Asset]],
replace_atlan_tags: bool = False,
append_atlan_tags: bool = False,
) -> AssetMutationResponse:
"""
Async save with merging custom metadata.
Expand All @@ -398,18 +401,23 @@ async def save_merging_cm(

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
:returns: details of the created or updated assets
"""
return await self.save(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
replace_custom_metadata=True,
overwrite_custom_metadata=False,
append_atlan_tags=append_atlan_tags,
)

@validate_arguments
async def update_merging_cm(
self, entity: Asset, replace_atlan_tags: bool = False
self,
entity: Asset,
replace_atlan_tags: bool = False,
append_atlan_tags: bool = False,
) -> AssetMutationResponse:
"""
Async update with merging custom metadata.
Expand All @@ -419,6 +427,7 @@ async def update_merging_cm(

:param entity: the asset to update
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
:returns: details of the updated asset
:raises NotFoundError: if the asset does not exist (will not create it)
"""
Expand All @@ -430,7 +439,9 @@ async def update_merging_cm(
get_by_qualified_name_func=self.get_by_qualified_name,
)
return await self.save_merging_cm(
entity=entity, replace_atlan_tags=replace_atlan_tags
entity=entity,
replace_atlan_tags=replace_atlan_tags,
append_atlan_tags=append_atlan_tags,
)

@validate_arguments
Expand Down
14 changes: 12 additions & 2 deletions pyatlan/client/aio/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
case_insensitive: bool = False,
table_view_agnostic: bool = False,
creation_handling: AssetCreationHandling = AssetCreationHandling.FULL,
append_atlan_tags: bool = False,
):
"""
Create a new async batch of assets to be bulk-saved.
Expand All @@ -68,10 +69,15 @@ def __init__(
view if not found as a table, and vice versa)
:param creation_handling: when allowing assets to be created,
how to handle those creations (full assets or partial assets).
:param append_atlan_tags: if True, Atlan tags on assets in the batch
will be added/updated/removed using the add_or_update_classifications
and remove_classifications fields, without replacing all existing tags.
When True, replace_atlan_tags is ignored.
"""
self._client: AsyncAtlanClient = client
self._max_size: int = max_size
self._replace_atlan_tags: bool = replace_atlan_tags
self._append_atlan_tags: bool = append_atlan_tags
self._custom_metadata_handling: CustomMetadataHandling = (
custom_metadata_handling
)
Expand Down Expand Up @@ -320,7 +326,9 @@ async def flush(self) -> Optional[AssetMutationResponse]:
try:
if self._custom_metadata_handling == CustomMetadataHandling.IGNORE:
response = await self._client.asset.save(
revised, replace_atlan_tags=self._replace_atlan_tags
revised,
replace_atlan_tags=self._replace_atlan_tags,
append_atlan_tags=self._append_atlan_tags,
)
elif (
self._custom_metadata_handling
Expand All @@ -331,7 +339,9 @@ async def flush(self) -> Optional[AssetMutationResponse]:
)
elif self._custom_metadata_handling == CustomMetadataHandling.MERGE:
response = await self._client.asset.save_merging_cm(
revised, replace_atlan_tags=self._replace_atlan_tags
revised,
replace_atlan_tags=self._replace_atlan_tags,
append_atlan_tags=self._append_atlan_tags,
)
else:
raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
Expand Down
31 changes: 26 additions & 5 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,10 @@ def upsert_merging_cm(

@validate_arguments
def save_merging_cm(
self, entity: Union[Asset, List[Asset]], replace_atlan_tags: bool = False
self,
entity: Union[Asset, List[Asset]],
replace_atlan_tags: bool = False,
append_atlan_tags: bool = False,
) -> AssetMutationResponse:
"""
If no asset exists, has the same behavior as the upsert() method, while also setting
Expand All @@ -499,18 +502,23 @@ def save_merging_cm(

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
:returns: details of the created or updated assets
"""
return self.save(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
replace_custom_metadata=True,
overwrite_custom_metadata=False,
append_atlan_tags=append_atlan_tags,
)

@validate_arguments
def update_merging_cm(
self, entity: Asset, replace_atlan_tags: bool = False
self,
entity: Asset,
replace_atlan_tags: bool = False,
append_atlan_tags: bool = False,
) -> AssetMutationResponse:
"""
If no asset exists, fails with a NotFoundError. Will merge any provided
Expand All @@ -519,6 +527,7 @@ def update_merging_cm(

:param entity: the asset to update
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update (True) or not (False)
:returns: details of the updated asset
:raises NotFoundError: if the asset does not exist (will not create it)
"""
Expand All @@ -528,7 +537,9 @@ def update_merging_cm(
get_by_qualified_name_func=self.get_by_qualified_name,
)
return self.save_merging_cm(
entity=entity, replace_atlan_tags=replace_atlan_tags
entity=entity,
replace_atlan_tags=replace_atlan_tags,
append_atlan_tags=append_atlan_tags,
)

@validate_arguments
Expand Down Expand Up @@ -2201,6 +2212,7 @@ def __init__(
case_insensitive: bool = False,
table_view_agnostic: bool = False,
creation_handling: AssetCreationHandling = AssetCreationHandling.FULL,
append_atlan_tags: bool = False,
):
"""
Create a new batch of assets to be bulk-saved.
Expand All @@ -2226,10 +2238,15 @@ def __init__(
view if not found as a table, and vice versa)
:param creation_handling: when allowing assets to be created,
how to handle those creations (full assets or partial assets).
:param append_atlan_tags: if True, Atlan tags on assets in the batch
will be added/updated/removed using the add_or_update_classifications
and remove_classifications fields, without replacing all existing tags.
When True, replace_atlan_tags is ignored.
"""
self._client: AtlanClient = client
self._max_size: int = max_size
self._replace_atlan_tags: bool = replace_atlan_tags
self._append_atlan_tags: bool = append_atlan_tags
self._custom_metadata_handling: CustomMetadataHandling = (
custom_metadata_handling
)
Expand Down Expand Up @@ -2480,7 +2497,9 @@ def flush(self) -> Optional[AssetMutationResponse]:
try:
if self._custom_metadata_handling == CustomMetadataHandling.IGNORE:
response = self._client.asset.save(
revised, replace_atlan_tags=self._replace_atlan_tags
revised,
replace_atlan_tags=self._replace_atlan_tags,
append_atlan_tags=self._append_atlan_tags,
)
elif (
self._custom_metadata_handling
Expand All @@ -2491,7 +2510,9 @@ def flush(self) -> Optional[AssetMutationResponse]:
)
elif self._custom_metadata_handling == CustomMetadataHandling.MERGE:
response = self._client.asset.save_merging_cm(
revised, replace_atlan_tags=self._replace_atlan_tags
revised,
replace_atlan_tags=self._replace_atlan_tags,
append_atlan_tags=self._append_atlan_tags,
)
else:
raise ErrorCode.INVALID_PARAMETER_TYPE.exception_with_parameters(
Expand Down
Loading