Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion clouddq/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ def format(self, record):
return super().format(record)


def add_cloud_logging_handler(logger: Logger):
def add_cloud_logging_handler(logger: Logger, metadata: dict):
client = google.cloud.logging.Client()
handler = CloudLoggingHandler(
client=client,
name="clouddq",
labels={
"name": APP_NAME,
"releaseId": APP_VERSION,
"metadata": json.dumps(metadata),
},
)
handler.setFormatter(JSONFormatter())
Expand Down
29 changes: 26 additions & 3 deletions clouddq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import logging
import logging.config
import os

import click
import coloredlogs
Expand Down Expand Up @@ -195,6 +196,12 @@
default=8,
type=int,
)
@click.option(
"--add_metadata_to_logs",
help="If True, the metadata results will be logged to cloud and stdout. ",
is_flag=True,
default=False,
)
def main( # noqa: C901
rule_binding_ids: str,
rule_binding_config_path: str,
Expand All @@ -216,6 +223,7 @@ def main( # noqa: C901
summary_to_stdout: bool = False,
enable_experimental_bigquery_entity_uris: bool = True,
enable_experimental_dataplex_gcs_validation: bool = True,
add_metadata_to_logs: bool = False,
) -> None:
"""Run RULE_BINDING_IDS from a RULE_BINDING_CONFIG_PATH.

Expand Down Expand Up @@ -270,8 +278,25 @@ def main( # noqa: C901
gcp_service_account_key_path=gcp_service_account_key_path,
gcp_impersonation_credentials=gcp_impersonation_credentials,
)
# Load metadata
metadata = json.loads(metadata)
dataplex_batch_name = os.environ.get("DATAPLEX_BATCH_NAME")
dataplex_task_name = os.environ.get("DATAPLEX_TASK_NAME")
# Set-up cloud logging
add_cloud_logging_handler(logger=json_logger)
if add_metadata_to_logs:
if dataplex_batch_name:
metadata["dataplex_batch_name"] = dataplex_batch_name
if dataplex_task_name:
metadata["dataplex_task_name"] = dataplex_task_name
add_cloud_logging_handler(logger=json_logger, metadata=metadata)
else:
metadata_dict = {}
if dataplex_batch_name:
metadata_dict["dataplex_batch_name"] = dataplex_batch_name
if dataplex_task_name:
metadata_dict["dataplex_task_name"] = dataplex_task_name
add_cloud_logging_handler(logger=json_logger, metadata=metadata_dict)
logger.info(f"metadata: {metadata}")
logger.info("Starting CloudDQ run with configs:")
json_logger.warning(
json.dumps({"clouddq_run_configs": locals()}, cls=JsonEncoderDatetime)
Expand Down Expand Up @@ -408,8 +433,6 @@ def main( # noqa: C901
"--summary_to_stdout is True but --target_bigquery_summary_table is not set. "
"No summary logs will be logged to stdout."
)
# Load metadata
metadata = json.loads(metadata)
# Load Rule Bindings
configs_path = Path(rule_binding_config_path)
logger.debug(f"Loading rule bindings from: {configs_path.absolute()}")
Expand Down