diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e9bc219 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +key.json +*.local +.git +*.env +env/ +venv/ diff --git a/.gitignore b/.gitignore index aa5ff12..864dd0a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ ext/ **/.DS_Store scripts/key.json *.local +slot_range_msg* diff --git a/Dockerfile.bq-deep-check b/Dockerfile.bq-deep-check new file mode 100644 index 0000000..76067d3 --- /dev/null +++ b/Dockerfile.bq-deep-check @@ -0,0 +1,13 @@ +FROM python:3.12-slim + +RUN apt-get update && apt-get install -y --no-install-recommends \ + bash curl jq \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache-dir \ + pandas \ + pandas-gbq \ + psycopg2-binary + +COPY scripts/ /app/scripts/ +COPY schema/ /app/schema/ diff --git a/deployment/bq-deep-check/README.md b/deployment/bq-deep-check/README.md new file mode 100644 index 0000000..80005eb --- /dev/null +++ b/deployment/bq-deep-check/README.md @@ -0,0 +1,44 @@ +# Deep comparison on slot range + +* runs every 6 hours +* covers 6.5 hours of last slots up to "HH:20" + +## Docker image + +```sh +docker buildx build --platform linux/amd64,linux/arm64 --tag code.blockchain-applied.com/bca/bq-deep-check:latest --tag code.blockchain-applied.com/bca/bq-deep-check:1.0.0 --push -f Dockerfile.bq-deep-check . +``` + + +## Secrets + +```sh +kubectl create secret generic credentials-pg \ + -n bq-monitoring \ + --from-literal=PGDATABASE="" \ + --from-literal=PGHOST="" \ + --from-literal=PGPORT="" \ + --from-literal=PGUSER="" \ + --from-literal=PGPASSWORD="" +``` + +```sh +kubectl create secret generic credentials-bq \ + -n bq-monitoring \ + --from-literal=BQ_PROJECT="" \ + --from-literal=BQ_REGION="" \ + --from-literal=PUBSUB_TOPIC_NAME="" +``` + +```sh +kubectl create secret generic credentials-zammad \ + -n bq-monitoring \ + --from-literal=ZAMMAD_URL="" \ + --from-literal=ZAMMAD_TOKEN="" +``` + +```sh +kubectl create secret generic key-bq \ + -n bq-monitoring \ + --from-file="key.json"="" +``` diff --git a/deployment/bq-deep-check/cronjob.yaml b/deployment/bq-deep-check/cronjob.yaml new file mode 100644 index 0000000..9f50579 --- /dev/null +++ b/deployment/bq-deep-check/cronjob.yaml @@ -0,0 +1,87 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: bq-deep-check + labels: + app: bq-deep-check + repo: Cardano_on_BigQuery.git +spec: + schedule: 48 5/6 * * * + concurrencyPolicy: Forbid + suspend: false + jobTemplate: + spec: + template: + metadata: + labels: + app: bq-deep-check + repo: Cardano_on_BigQuery.git + spec: + containers: + - name: bq-deep-check + image: bq-deep-check + command: + - /usr/bin/bash + - "-c" + - | + set -euo pipefail + cd /app/scripts + python3 deep_compare/slot_range_check.py + sleep 2 + cat slot_range_msg.txt + echo "done." + if grep -q MISMATCH slot_range_msg.txt; then + BODY=$(cat slot_range_msg.txt) + curl -s -X POST "${ZAMMAD_URL}/api/v1/tickets" \ + -H "Authorization: Token token=${ZAMMAD_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{ + \"title\": \"BQ deep-check MISMATCH\", + \"group\": \"Infrastructure\", + \"customer\": \"bq-deep-check.bq-monitoring@blockchain-applied.com\", + \"article\": { + \"subject\": \"BQ deep-check MISMATCH\", + \"body\": $(echo "$BODY" | jq -Rs .), + \"type\": \"note\", + \"internal\": false + } + }" + fi + exit 0 + env: + - name: SLOT_WINDOW_DURATION_S + value: "23400" + - name: BQ_KEY_FILE + value: /app/key.json + envFrom: + - secretRef: + name: credentials-pg + - secretRef: + name: credentials-bq + - secretRef: + name: credentials-zammad + volumeMounts: + - name: key-secret + mountPath: /app/key.json + subPath: key.json + readOnly: true + resources: + limits: + cpu: 300m + memory: 256Mi + requests: + cpu: 100m + memory: 128Mi + imagePullPolicy: Always + volumes: + - name: key-secret + secret: + secretName: key-bq + restartPolicy: OnFailure + terminationGracePeriodSeconds: 30 + dnsPolicy: ClusterFirst + securityContext: {} + imagePullSecrets: + - name: pullsecret + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 diff --git a/deployment/bq-deep-check/kustomization.yaml b/deployment/bq-deep-check/kustomization.yaml new file mode 100644 index 0000000..d5acb44 --- /dev/null +++ b/deployment/bq-deep-check/kustomization.yaml @@ -0,0 +1,12 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: bq-monitoring + +resources: + - cronjob.yaml + +images: + - name: bq-deep-check + newName: code.blockchain-applied.com/bca/bq-deep-check + newTag: "1.0.0" diff --git a/scripts/deep_compare/slot_range_check.py b/scripts/deep_compare/slot_range_check.py new file mode 100644 index 0000000..54664ec --- /dev/null +++ b/scripts/deep_compare/slot_range_check.py @@ -0,0 +1,170 @@ +import os +import sys +import warnings +import psycopg2 +import pandas as pd +import pandas_gbq +from datetime import datetime, timezone, timedelta +from google.oauth2 import service_account + +warnings.filterwarnings('ignore', message='A progress bar was requested.*tqdm', category=UserWarning) +warnings.filterwarnings('ignore', message='pandas only supports SQLAlchemy connectable', category=UserWarning) + +from slot_range_queries import bq_slot_range_query, pg_slot_range_query + + +def get_pg_connection(): + try: + return psycopg2.connect( + dbname=os.environ['PGDATABASE'], host=os.environ['PGHOST'], + port=os.environ['PGPORT'], password=os.environ['PGPASSWORD'], + user=os.environ['PGUSER']) + except Exception as e: + print(f"Failed connecting to database: {e}") + sys.exit(1) + + +def get_local_pg_connection(): + try: + return psycopg2.connect( + dbname=os.environ['PGDATABASE'], host='localhost', + port='5432', password=os.environ['PGPASSWORD'], + user=os.environ['PGUSER']) + except Exception as e: + print(f"Failed connecting to local database: {e}") + sys.exit(1) + + +def get_bq(creds, query): + proj_id = os.environ['BQ_PROJECT'] + location = os.environ['BQ_REGION'] + return pandas_gbq.read_gbq(query, project_id=proj_id, location=location, credentials=creds) + + +def get_pg(pg_conn, query): + return pd.DataFrame(pd.read_sql_query(query, pg_conn)) + + +def run_pg(pg_con, query): + with pg_con.cursor() as cur: + cur.execute(query) + + +def compute_slot_window(duration_s: int = 3600): + """Return (start_time, end_time) for a safe comparison window. + + end_time = current UTC hour at :20:00 + start_time = end_time - duration_s seconds (default 3600 = 1 hour) + + The BQ sync runs at ~x:34, so anchoring end_time at x:20 ensures + the window falls within fully-synced data on both sides. + """ + now = datetime.now(timezone.utc) + end_time = now.replace(minute=20, second=0, microsecond=0) + start_time = end_time - timedelta(seconds=duration_s) + return start_time, end_time + + +def get_slot_range(credentials, bq_project, start_time, end_time): + # block_time is DATETIME in BQ (no timezone); strip tz for the literal + start_str = start_time.strftime('%Y-%m-%dT%H:%M:%S') + end_str = end_time.strftime('%Y-%m-%dT%H:%M:%S') + query = f""" +SELECT MIN(slot_no) AS min_slot, MAX(slot_no) AS max_slot +FROM `{bq_project}.cardano_mainnet.block` +WHERE block_time BETWEEN DATETIME('{start_str}') AND DATETIME('{end_str}') +""" + df = get_bq(credentials, query) + min_slot = int(df['min_slot'][0]) + max_slot = int(df['max_slot'][0]) + return min_slot, max_slot + + +def log_results(pg_local_con, run_time, slot_min, slot_max, merged_df): + rows = [] + for _, row in merged_df.iterrows(): + bq_hash = row['hash_bq'] if pd.notna(row.get('hash_bq')) else 'empty' + pg_hash = row['hash_pg'] if pd.notna(row.get('hash_pg')) else 'empty' + row_count_bq = int(row['row_count_bq']) if pd.notna(row.get('row_count_bq')) else 0 + row_count_pg = int(row['row_count_pg']) if pd.notna(row.get('row_count_pg')) else 0 + rows.append( + f"('{run_time.isoformat()}', {slot_min}, {slot_max}," + f" '{row['table_name']}', {row_count_bq}, {row_count_pg}," + f" '{bq_hash}', '{pg_hash}')" + ) + values = ", ".join(rows) + query = ( + "INSERT INTO analytics.log_slot_range_comparison " + "(run_time, slot_min, slot_max, table_name, row_count_bq, row_count_pg, bq_hash, pg_hash) " + f"VALUES {values};" + ) + run_pg(pg_local_con, query) + + +def main(): + bq_project = os.environ['BQ_PROJECT'] + current_script_path = os.path.abspath(__file__) + parent_directory = os.path.dirname(os.path.dirname(current_script_path)) + # keyfile_path = f"{parent_directory}/key.json" + keyfile_path = os.environ['BQ_KEY_FILE'] + credentials = service_account.Credentials.from_service_account_file(keyfile_path) + + pg_con = get_pg_connection() + # pg_local_con = get_local_pg_connection() + + duration_s = int(os.environ.get('SLOT_WINDOW_DURATION_S', 3600)) + start_time, end_time = compute_slot_window(duration_s) + print(f"Slot window: {start_time.isoformat()} → {end_time.isoformat()}") + + min_slot, max_slot = get_slot_range(credentials, bq_project, start_time, end_time) + print(f"Slot range: {min_slot} – {max_slot}") + + run_time = datetime.now(timezone.utc) + + bq_sql = bq_slot_range_query(min_slot, max_slot, bq_project) + pg_sql = pg_slot_range_query(min_slot, max_slot) + + print("Running BQ query...") + bq_df = get_bq(credentials, bq_sql) + bq_df = bq_df.rename(columns={'row_count': 'row_count_bq', 'hash_val': 'hash_bq'}) + + print("Running PG query...") + pg_df = get_pg(pg_con, pg_sql) + pg_df = pg_df.rename(columns={'row_count': 'row_count_pg', 'hash_val': 'hash_pg'}) + + merged = pd.merge(bq_df, pg_df, on='table_name', how='outer') + merged['match'] = (merged['hash_bq'].fillna('__empty__') == merged['hash_pg'].fillna('__empty__')) + + mismatches = merged[~merged['match']] + print(f"\nResults: {len(merged)} tables, {len(mismatches)} mismatches") + + #msg_file = f"slot_range_msg-{end_time.isoformat()}.txt" + msg_file = "slot_range_msg.txt" + with open(msg_file, 'w') as f: + f.write(f"Slot range check: {start_time.isoformat()} → {end_time.isoformat()}\n") + f.write(f"Slots: {min_slot} – {max_slot}\n\n") + for _, row in merged.iterrows(): + status = "OK" if row['match'] else "MISMATCH" + f.write( + f"[{status}] {row['table_name']}" + f" BQ={row.get('row_count_bq', 0)} rows" + f" PG={row.get('row_count_pg', 0)} rows\n" + ) + if not row['match']: + f.write(f" BQ hash: {row.get('hash_bq', 'empty')}\n") + f.write(f" PG hash: {row.get('hash_pg', 'empty')}\n") + + # log_results(pg_local_con, run_time, min_slot, max_slot, merged) + # pg_local_con.commit() + # pg_local_con.close() + pg_con.close() + + print(f"Done. Results written to {msg_file}") + if len(mismatches) > 0: + print("MISMATCHES DETECTED:") + for _, row in mismatches.iterrows(): + print(f" {row['table_name']}: BQ={row.get('hash_bq','empty')} PG={row.get('hash_pg','empty')}") + + +if __name__ == '__main__': + main() diff --git a/scripts/deep_compare/slot_range_queries.py b/scripts/deep_compare/slot_range_queries.py new file mode 100644 index 0000000..3c1a8c0 --- /dev/null +++ b/scripts/deep_compare/slot_range_queries.py @@ -0,0 +1,327 @@ +import os + + +def bq_slot_range_query(min_slot: int, max_slot: int, bq_project: str = None) -> str: + if bq_project is None: + bq_project = os.environ['BQ_PROJECT'] + p = bq_project + sl = f"slot_no BETWEEN {min_slot} AND {max_slot}" + ep_in = f"epoch_no IN (SELECT DISTINCT epoch_no FROM `{p}.cardano_mainnet.block` WHERE {sl})" + + parts = [] + + # block + parts.append(f"""SELECT 'block' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| block_time ||','|| block_size ||','|| tx_count + ||','|| sum_tx_fee ||','|| script_count ||','|| sum_script_size ||')' AS str + FROM `{p}.cardano_mainnet.block` + WHERE {sl} + ORDER BY epoch_no, slot_no ASC)) AS innerq""") + + # block_hash + parts.append(f"""SELECT 'block_hash' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| block_hash ||')' AS str + FROM `{p}.cardano_mainnet.block_hash` + WHERE {sl} + ORDER BY epoch_no, slot_no ASC)) AS innerq""") + + # collateral + parts.append(f"""SELECT 'collateral' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| epoch_no_out ||','|| slot_no_out + ||','|| txidx_out ||','|| tx_out_index ||')' AS str + FROM `{p}.cardano_mainnet.collateral` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, epoch_no_out, slot_no_out, txidx_out, tx_out_index ASC)) AS innerq""") + + # delegation (epoch-derived) -- disabled until epoch-based check is added + + # ma_minting (epoch-derived) -- disabled until epoch-based check is added + + # pool_offline_data (epoch-derived) -- disabled until epoch-based check is added + + # pool_owner + parts.append(f"""SELECT 'pool_owner' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| pool_hash ||','|| epoch_no ||','|| addr_hash ||','|| slot_no ||','|| txidx ||')' AS str + FROM `{p}.cardano_mainnet.pool_owner` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, pool_hash, addr_hash ASC)) AS innerq""") + + # pool_retire + parts.append(f"""SELECT 'pool_retire' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| pool_hash ||','|| retiring_epoch ||','|| epoch_no ||','|| cert_index + ||','|| announced_tx_hash ||','|| slot_no ||','|| announced_txidx ||')' AS str + FROM `{p}.cardano_mainnet.pool_retire` + WHERE {sl} + ORDER BY epoch_no, slot_no, announced_txidx, pool_hash ASC)) AS innerq""") + + # pool_update (epoch-derived) -- disabled until epoch-based check is added + + # redeemer + parts.append(f"""SELECT 'redeemer' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| count ||')' AS str + FROM `{p}.cardano_mainnet.redeemer` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC)) AS innerq""") + + # rel_addr_txout (epoch-derived) -- disabled until epoch-based check is added + + # rel_stake_hash + parts.append(f"""SELECT 'rel_stake_hash' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| stake_address ||','|| stake_addr_hash ||')' AS str + FROM `{p}.cardano_mainnet.rel_stake_hash` + WHERE {sl} + ORDER BY epoch_no, slot_no, stake_address ASC)) AS innerq""") + + # rel_stake_txout (epoch-derived) -- disabled until epoch-based check is added + + # reward (epoch-derived) -- disabled until epoch-based check is added + + # script + parts.append(f"""SELECT 'script' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| script_hash ||','|| `type` + ||','|| COALESCE(TO_BASE64(`bytes`), 'null') + ||','|| COALESCE(CAST(serialised_size AS STRING), 'null') ||')' AS str + FROM `{p}.cardano_mainnet.script` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, script_hash ASC)) AS innerq""") + + # stake_deregistration + parts.append(f"""SELECT 'stake_deregistration' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| stake_addr_hash ||','|| cert_index ||')' AS str + FROM `{p}.cardano_mainnet.stake_deregistration` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash, cert_index ASC)) AS innerq""") + + # stake_registration + parts.append(f"""SELECT 'stake_registration' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| stake_addr_hash ||','|| cert_index ||')' AS str + FROM `{p}.cardano_mainnet.stake_registration` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash, cert_index ASC)) AS innerq""") + + # tx + parts.append(f"""SELECT 'tx' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| tx_hash ||','|| block_time ||','|| slot_no ||','|| txidx + ||','|| out_sum ||','|| fee ||','|| deposit ||','|| size + ||','|| COALESCE(CAST(invalid_before AS STRING), 'null') + ||','|| COALESCE(CAST(invalid_after AS STRING), 'null') + ||','|| valid_script ||','|| script_size ||','|| count_inputs ||','|| count_outputs ||')' AS str + FROM `{p}.cardano_mainnet.tx` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC)) AS innerq""") + + # tx_consumed_output (filter by producing tx slot_no) + parts.append(f"""SELECT 'tx_consumed_output' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| slot_no ||','|| txidx ||','|| `index` ||','|| consumed_in_slot_no ||','|| consumed_in_txidx ||')' AS str + FROM `{p}.cardano_mainnet.tx_consumed_output` + WHERE {sl} + ORDER BY slot_no, txidx, `index` ASC)) AS innerq""") + + # tx_hash + parts.append(f"""SELECT 'tx_hash' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| tx_hash ||')' AS str + FROM `{p}.cardano_mainnet.tx_hash` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC)) AS innerq""") + + # tx_in_out + parts.append(f"""SELECT 'tx_in_out' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx + ||','|| TO_JSON_STRING(inputs) + ||','|| COALESCE(TO_JSON_STRING(outputs), 'null') ||')' AS str + FROM `{p}.cardano_mainnet.tx_in_out` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC)) AS innerq""") + + # tx_metadata + parts.append(f"""SELECT 'tx_metadata' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| tx_hash ||','|| key ||')' AS str + FROM `{p}.cardano_mainnet.tx_metadata` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, tx_hash, key, JSON_VALUE(metadata) ASC)) AS innerq""") + + # withdrawal + parts.append(f"""SELECT 'withdrawal' AS table_name, innerq.cnt AS row_count, TO_BASE64(SHA256(innerq.hash_b64)) AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(TO_BASE64(SHA256(str)), ',') AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| stake_addr_hash ||','|| amount ||','|| slot_no ||','|| txidx ||')' AS str + FROM `{p}.cardano_mainnet.withdrawal` + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash ASC)) AS innerq""") + + return "\nUNION ALL\n\n".join(parts) + + +def pg_slot_range_query(min_slot: int, max_slot: int) -> str: + sl = f"slot_no BETWEEN {min_slot} AND {max_slot}" + ep_in = f"epoch_no IN (SELECT DISTINCT epoch_no FROM public.block WHERE {sl})" + + parts = [] + + # block (no normalization) + parts.append(f"""SELECT 'block' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| block_time ||','|| block_size ||','|| tx_count + ||','|| sum_tx_fee ||','|| script_count ||','|| sum_script_size ||')' AS str + FROM analytics.vw_bq_block + WHERE {sl} + ORDER BY epoch_no, slot_no ASC) AS subq) AS innerq""") + + # block_hash (whitespace normalization) + parts.append(f"""SELECT 'block_hash' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(regexp_replace(regexp_replace(subq.str, '[\n]', '', 'g'), '[\\s]', '', 'g')::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| block_hash ||')' AS str + FROM analytics.vw_bq_block_hash + WHERE {sl} + ORDER BY epoch_no, slot_no ASC) AS subq) AS innerq""") + + # collateral (no normalization) + parts.append(f"""SELECT 'collateral' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| epoch_no_out ||','|| slot_no_out + ||','|| txidx_out ||','|| tx_out_index ||')' AS str + FROM analytics.vw_bq_collateral + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, epoch_no_out, slot_no_out, txidx_out, tx_out_index ASC) AS subq) AS innerq""") + + # delegation (epoch-derived) -- disabled until epoch-based check is added + # ma_minting (epoch-derived) -- disabled until epoch-based check is added + # pool_offline_data (epoch-derived) -- disabled until epoch-based check is added + + # pool_owner (no normalization) + parts.append(f"""SELECT 'pool_owner' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| pool_hash ||','|| epoch_no ||','|| addr_hash ||','|| slot_no ||','|| txidx ||')' AS str + FROM analytics.vw_bq_pool_owner + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, pool_hash, addr_hash ASC) AS subq) AS innerq""") + + # pool_retire (no normalization) + parts.append(f"""SELECT 'pool_retire' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| pool_hash ||','|| retiring_epoch ||','|| epoch_no ||','|| cert_index + ||','|| announced_tx_hash ||','|| slot_no ||','|| announced_txidx ||')' AS str + FROM analytics.vw_bq_pool_retire + WHERE {sl} + ORDER BY epoch_no, slot_no, announced_txidx, pool_hash ASC) AS subq) AS innerq""") + + # pool_update (epoch-derived) -- disabled until epoch-based check is added + + # redeemer (no normalization) + parts.append(f"""SELECT 'redeemer' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| count ||')' AS str + FROM analytics.vw_bq_redeemer + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC) AS subq) AS innerq""") + + # rel_addr_txout (epoch-derived) -- disabled until epoch-based check is added + + # rel_stake_hash (no normalization) + parts.append(f"""SELECT 'rel_stake_hash' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| stake_address ||','|| stake_addr_hash ||')' AS str + FROM analytics.vw_bq_rel_stake_hash + WHERE {sl} + ORDER BY epoch_no, slot_no, stake_address ASC) AS subq) AS innerq""") + + # rel_stake_txout (epoch-derived) -- disabled until epoch-based check is added + # reward (epoch-derived) -- disabled until epoch-based check is added + + # script (whitespace normalization) + parts.append(f"""SELECT 'script' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(regexp_replace(regexp_replace(subq.str, '[\n]', '', 'g'), '[\\s]', '', 'g')::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| script_hash ||','|| "type" + ||','|| COALESCE("bytes", 'null') + ||','|| COALESCE(serialised_size::text, 'null') ||')' AS str + FROM analytics.vw_bq_script + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, script_hash ASC) AS subq) AS innerq""") + + # stake_deregistration (no normalization) + parts.append(f"""SELECT 'stake_deregistration' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| stake_addr_hash ||','|| cert_index ||')' AS str + FROM analytics.vw_bq_stake_deregistration + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash, cert_index ASC) AS subq) AS innerq""") + + # stake_registration (no normalization) + parts.append(f"""SELECT 'stake_registration' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| stake_addr_hash ||','|| cert_index ||')' AS str + FROM analytics.vw_bq_stake_registration + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash, cert_index ASC) AS subq) AS innerq""") + + # tx (no normalization) + parts.append(f"""SELECT 'tx' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| tx_hash ||','|| block_time ||','|| slot_no ||','|| txidx + ||','|| out_sum ||','|| fee ||','|| deposit ||','|| size + ||','|| COALESCE(invalid_before::text, 'null') + ||','|| COALESCE(invalid_after::text, 'null') + ||','|| valid_script ||','|| script_size ||','|| count_inputs ||','|| count_outputs ||')' AS str + FROM analytics.vw_bq_tx + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC) AS subq) AS innerq""") + + # tx_consumed_output (filter by producing tx slot_no, no normalization) + parts.append(f"""SELECT 'tx_consumed_output' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| slot_no ||','|| txidx ||','|| "index" ||','|| consumed_in_slot_no ||','|| consumed_in_txidx ||')' AS str + FROM analytics.vw_bq_tx_consumed_output + WHERE {sl} + ORDER BY slot_no, txidx, "index" ASC) AS subq) AS innerq""") + + # tx_hash (no normalization) + parts.append(f"""SELECT 'tx_hash' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx ||','|| tx_hash ||')' AS str + FROM analytics.vw_bq_tx_hash + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC) AS subq) AS innerq""") + + # tx_in_out (whitespace normalization) + parts.append(f"""SELECT 'tx_in_out' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(regexp_replace(regexp_replace(subq.str, '[\n]+', '', 'g'), '[\\s]+', '', 'g')::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| slot_no ||','|| txidx + ||','|| inputs::text + ||','|| COALESCE(outputs::text, 'null') ||')' AS str + FROM analytics.vw_bq_tx_in_out + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx ASC) AS subq) AS innerq""") + + # tx_metadata (direct table join, no normalization) + parts.append(f"""SELECT 'tx_metadata' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(subq.str::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| ib.epoch_no ||','|| ib.slot_no ||','|| itx.block_index ||','|| encode(itx.hash,'hex') ||','|| tm.key ||')' AS str + FROM public.tx_metadata tm + JOIN public.tx itx ON itx.id = tm.tx_id + JOIN public.block ib ON ib.id = itx.block_id + WHERE ib.{sl} + ORDER BY ib.epoch_no, ib.slot_no, itx.block_index, encode(itx.hash,'hex'), tm.key, tm.json::text ASC) AS subq) AS innerq""") + + # withdrawal (whitespace normalization) + parts.append(f"""SELECT 'withdrawal' AS table_name, innerq.cnt AS row_count, encode(SHA256(innerq.hash_b64), 'base64') AS hash_val FROM +(SELECT COUNT(*) AS cnt, STRING_AGG(encode(SHA256(regexp_replace(regexp_replace(subq.str, '[\n]', '', 'g'), '[\\s]', '', 'g')::bytea), 'base64'), ',')::bytea AS hash_b64 FROM + (SELECT '('|| epoch_no ||','|| stake_addr_hash ||','|| amount ||','|| slot_no ||','|| txidx ||')' AS str + FROM analytics.vw_bq_withdrawal + WHERE {sl} + ORDER BY epoch_no, slot_no, txidx, stake_addr_hash ASC) AS subq) AS innerq""") + + return "\nUNION ALL\n\n".join(parts) diff --git a/scripts/schema/deployment/schemas-ingress.yaml b/scripts/schema/deployment/schemas-ingress.yaml index c9606e9..513f449 100644 --- a/scripts/schema/deployment/schemas-ingress.yaml +++ b/scripts/schema/deployment/schemas-ingress.yaml @@ -90,4 +90,4 @@ spec: # - name: security-chain tls: - certResolver: letsencrypt + secretName: bca-schemas-tls