diff --git a/clouddq/log.py b/clouddq/log.py index da47a64..8e14c3b 100644 --- a/clouddq/log.py +++ b/clouddq/log.py @@ -64,7 +64,7 @@ def format(self, record): return super().format(record) -def add_cloud_logging_handler(logger: Logger): +def add_cloud_logging_handler(logger: Logger, metadata: dict): client = google.cloud.logging.Client() handler = CloudLoggingHandler( client=client, @@ -72,6 +72,7 @@ def add_cloud_logging_handler(logger: Logger): labels={ "name": APP_NAME, "releaseId": APP_VERSION, + "metadata": json.dumps(metadata), }, ) handler.setFormatter(JSONFormatter()) diff --git a/clouddq/main.py b/clouddq/main.py index 08b5f32..bff641a 100644 --- a/clouddq/main.py +++ b/clouddq/main.py @@ -22,6 +22,7 @@ import json import logging import logging.config +import os import click import coloredlogs @@ -195,6 +196,12 @@ default=8, type=int, ) +@click.option( + "--add_metadata_to_logs", + help="If True, the metadata results will be logged to cloud and stdout. ", + is_flag=True, + default=False, +) def main( # noqa: C901 rule_binding_ids: str, rule_binding_config_path: str, @@ -216,6 +223,7 @@ def main( # noqa: C901 summary_to_stdout: bool = False, enable_experimental_bigquery_entity_uris: bool = True, enable_experimental_dataplex_gcs_validation: bool = True, + add_metadata_to_logs: bool = False, ) -> None: """Run RULE_BINDING_IDS from a RULE_BINDING_CONFIG_PATH. @@ -270,8 +278,25 @@ def main( # noqa: C901 gcp_service_account_key_path=gcp_service_account_key_path, gcp_impersonation_credentials=gcp_impersonation_credentials, ) + # Load metadata + metadata = json.loads(metadata) + dataplex_batch_name = os.environ.get("DATAPLEX_BATCH_NAME") + dataplex_task_name = os.environ.get("DATAPLEX_TASK_NAME") # Set-up cloud logging - add_cloud_logging_handler(logger=json_logger) + if add_metadata_to_logs: + if dataplex_batch_name: + metadata["dataplex_batch_name"] = dataplex_batch_name + if dataplex_task_name: + metadata["dataplex_task_name"] = dataplex_task_name + add_cloud_logging_handler(logger=json_logger, metadata=metadata) + else: + metadata_dict = {} + if dataplex_batch_name: + metadata_dict["dataplex_batch_name"] = dataplex_batch_name + if dataplex_task_name: + metadata_dict["dataplex_task_name"] = dataplex_task_name + add_cloud_logging_handler(logger=json_logger, metadata=metadata_dict) + logger.info(f"metadata: {metadata}") logger.info("Starting CloudDQ run with configs:") json_logger.warning( json.dumps({"clouddq_run_configs": locals()}, cls=JsonEncoderDatetime) @@ -408,8 +433,6 @@ def main( # noqa: C901 "--summary_to_stdout is True but --target_bigquery_summary_table is not set. " "No summary logs will be logged to stdout." ) - # Load metadata - metadata = json.loads(metadata) # Load Rule Bindings configs_path = Path(rule_binding_config_path) logger.debug(f"Loading rule bindings from: {configs_path.absolute()}")