Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
key.json
*.local
.git
*.env
env/
venv/
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ ext/
**/.DS_Store
scripts/key.json
*.local
slot_range_msg*
13 changes: 13 additions & 0 deletions Dockerfile.bq-deep-check
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.12-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
bash curl jq \
&& rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir \
pandas \
pandas-gbq \
psycopg2-binary

COPY scripts/ /app/scripts/
COPY schema/ /app/schema/
44 changes: 44 additions & 0 deletions deployment/bq-deep-check/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Deep comparison on slot range

* runs every 6 hours
* covers 6.5 hours of last slots up to "HH:20"

## Docker image

```sh
docker buildx build --platform linux/amd64,linux/arm64 --tag code.blockchain-applied.com/bca/bq-deep-check:latest --tag code.blockchain-applied.com/bca/bq-deep-check:1.0.0 --push -f Dockerfile.bq-deep-check .
```


## Secrets

```sh
kubectl create secret generic credentials-pg \
-n bq-monitoring \
--from-literal=PGDATABASE="" \
--from-literal=PGHOST="" \
--from-literal=PGPORT="" \
--from-literal=PGUSER="" \
--from-literal=PGPASSWORD=""
```

```sh
kubectl create secret generic credentials-bq \
-n bq-monitoring \
--from-literal=BQ_PROJECT="" \
--from-literal=BQ_REGION="" \
--from-literal=PUBSUB_TOPIC_NAME=""
```

```sh
kubectl create secret generic credentials-zammad \
-n bq-monitoring \
--from-literal=ZAMMAD_URL="" \
--from-literal=ZAMMAD_TOKEN=""
```

```sh
kubectl create secret generic key-bq \
-n bq-monitoring \
--from-file="key.json"=""
```
87 changes: 87 additions & 0 deletions deployment/bq-deep-check/cronjob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
apiVersion: batch/v1
kind: CronJob
metadata:
name: bq-deep-check
labels:
app: bq-deep-check
repo: Cardano_on_BigQuery.git
spec:
schedule: 48 5/6 * * *
concurrencyPolicy: Forbid
suspend: false
jobTemplate:
spec:
template:
metadata:
labels:
app: bq-deep-check
repo: Cardano_on_BigQuery.git
spec:
containers:
- name: bq-deep-check
image: bq-deep-check
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could add a tag that would contain the git commit sha or a version, so that we know deterministically which version is deployed

command:
- /usr/bin/bash
- "-c"
- |
set -euo pipefail
cd /app/scripts
python3 deep_compare/slot_range_check.py
sleep 2
cat slot_range_msg.txt
echo "done."
if grep -q MISMATCH slot_range_msg.txt; then
BODY=$(cat slot_range_msg.txt)
curl -s -X POST "${ZAMMAD_URL}/api/v1/tickets" \
-H "Authorization: Token token=${ZAMMAD_TOKEN}" \
-H "Content-Type: application/json" \
-d "{
\"title\": \"BQ deep-check MISMATCH\",
\"group\": \"Infrastructure\",
\"customer\": \"bq-deep-check.bq-monitoring@blockchain-applied.com\",
\"article\": {
\"subject\": \"BQ deep-check MISMATCH\",
\"body\": $(echo "$BODY" | jq -Rs .),
\"type\": \"note\",
\"internal\": false
}
}"
fi
exit 0
env:
- name: SLOT_WINDOW_DURATION_S
value: "23400"
- name: BQ_KEY_FILE
value: /app/key.json
envFrom:
- secretRef:
name: credentials-pg
- secretRef:
name: credentials-bq
- secretRef:
name: credentials-zammad
volumeMounts:
- name: key-secret
mountPath: /app/key.json
subPath: key.json
readOnly: true
resources:
limits:
cpu: 300m
memory: 256Mi
requests:
cpu: 100m
memory: 128Mi
imagePullPolicy: Always
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also if we have different image tags we could change this to IfNotPresent

volumes:
- name: key-secret
secret:
secretName: key-bq
restartPolicy: OnFailure
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here maybe we could set this to Never?: ie. if a pod fails, to create a new pod for the next attempt and keep the failed one around, instead of restarting the existing one.
also we could set:

    backoffLimit: 2

so that we have 1+2 retries = 3 total runs in case of a failure (the default is 6 I think)

terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
securityContext: {}
imagePullSecrets:
- name: pullsecret
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
12 changes: 12 additions & 0 deletions deployment/bq-deep-check/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: bq-monitoring

resources:
- cronjob.yaml

images:
- name: bq-deep-check
newName: code.blockchain-applied.com/bca/bq-deep-check
newTag: "1.0.0"
170 changes: 170 additions & 0 deletions scripts/deep_compare/slot_range_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import os
import sys
import warnings
import psycopg2
import pandas as pd
import pandas_gbq
from datetime import datetime, timezone, timedelta
from google.oauth2 import service_account

warnings.filterwarnings('ignore', message='A progress bar was requested.*tqdm', category=UserWarning)
warnings.filterwarnings('ignore', message='pandas only supports SQLAlchemy connectable', category=UserWarning)

from slot_range_queries import bq_slot_range_query, pg_slot_range_query


def get_pg_connection():
try:
return psycopg2.connect(
dbname=os.environ['PGDATABASE'], host=os.environ['PGHOST'],
port=os.environ['PGPORT'], password=os.environ['PGPASSWORD'],
user=os.environ['PGUSER'])
except Exception as e:
print(f"Failed connecting to database: {e}")
sys.exit(1)


def get_local_pg_connection():
try:
return psycopg2.connect(
dbname=os.environ['PGDATABASE'], host='localhost',
port='5432', password=os.environ['PGPASSWORD'],
user=os.environ['PGUSER'])
except Exception as e:
print(f"Failed connecting to local database: {e}")
sys.exit(1)


def get_bq(creds, query):
proj_id = os.environ['BQ_PROJECT']
location = os.environ['BQ_REGION']
return pandas_gbq.read_gbq(query, project_id=proj_id, location=location, credentials=creds)


def get_pg(pg_conn, query):
return pd.DataFrame(pd.read_sql_query(query, pg_conn))


def run_pg(pg_con, query):
with pg_con.cursor() as cur:
cur.execute(query)


def compute_slot_window(duration_s: int = 3600):
"""Return (start_time, end_time) for a safe comparison window.

end_time = current UTC hour at :20:00
start_time = end_time - duration_s seconds (default 3600 = 1 hour)

The BQ sync runs at ~x:34, so anchoring end_time at x:20 ensures
the window falls within fully-synced data on both sides.
"""
now = datetime.now(timezone.utc)
end_time = now.replace(minute=20, second=0, microsecond=0)
start_time = end_time - timedelta(seconds=duration_s)
return start_time, end_time


def get_slot_range(credentials, bq_project, start_time, end_time):
# block_time is DATETIME in BQ (no timezone); strip tz for the literal
start_str = start_time.strftime('%Y-%m-%dT%H:%M:%S')
end_str = end_time.strftime('%Y-%m-%dT%H:%M:%S')
query = f"""
SELECT MIN(slot_no) AS min_slot, MAX(slot_no) AS max_slot
FROM `{bq_project}.cardano_mainnet.block`
WHERE block_time BETWEEN DATETIME('{start_str}') AND DATETIME('{end_str}')
"""
df = get_bq(credentials, query)
min_slot = int(df['min_slot'][0])
max_slot = int(df['max_slot'][0])
return min_slot, max_slot


def log_results(pg_local_con, run_time, slot_min, slot_max, merged_df):
rows = []
for _, row in merged_df.iterrows():
bq_hash = row['hash_bq'] if pd.notna(row.get('hash_bq')) else 'empty'
pg_hash = row['hash_pg'] if pd.notna(row.get('hash_pg')) else 'empty'
row_count_bq = int(row['row_count_bq']) if pd.notna(row.get('row_count_bq')) else 0
row_count_pg = int(row['row_count_pg']) if pd.notna(row.get('row_count_pg')) else 0
rows.append(
f"('{run_time.isoformat()}', {slot_min}, {slot_max},"
f" '{row['table_name']}', {row_count_bq}, {row_count_pg},"
f" '{bq_hash}', '{pg_hash}')"
)
values = ", ".join(rows)
query = (
"INSERT INTO analytics.log_slot_range_comparison "
"(run_time, slot_min, slot_max, table_name, row_count_bq, row_count_pg, bq_hash, pg_hash) "
f"VALUES {values};"
)
run_pg(pg_local_con, query)


def main():
bq_project = os.environ['BQ_PROJECT']
current_script_path = os.path.abspath(__file__)
parent_directory = os.path.dirname(os.path.dirname(current_script_path))
# keyfile_path = f"{parent_directory}/key.json"
keyfile_path = os.environ['BQ_KEY_FILE']
credentials = service_account.Credentials.from_service_account_file(keyfile_path)

pg_con = get_pg_connection()
# pg_local_con = get_local_pg_connection()

duration_s = int(os.environ.get('SLOT_WINDOW_DURATION_S', 3600))
start_time, end_time = compute_slot_window(duration_s)
print(f"Slot window: {start_time.isoformat()} → {end_time.isoformat()}")

min_slot, max_slot = get_slot_range(credentials, bq_project, start_time, end_time)
print(f"Slot range: {min_slot} – {max_slot}")

run_time = datetime.now(timezone.utc)

bq_sql = bq_slot_range_query(min_slot, max_slot, bq_project)
pg_sql = pg_slot_range_query(min_slot, max_slot)

print("Running BQ query...")
bq_df = get_bq(credentials, bq_sql)
bq_df = bq_df.rename(columns={'row_count': 'row_count_bq', 'hash_val': 'hash_bq'})

print("Running PG query...")
pg_df = get_pg(pg_con, pg_sql)
pg_df = pg_df.rename(columns={'row_count': 'row_count_pg', 'hash_val': 'hash_pg'})

merged = pd.merge(bq_df, pg_df, on='table_name', how='outer')
merged['match'] = (merged['hash_bq'].fillna('__empty__') == merged['hash_pg'].fillna('__empty__'))

mismatches = merged[~merged['match']]
print(f"\nResults: {len(merged)} tables, {len(mismatches)} mismatches")

#msg_file = f"slot_range_msg-{end_time.isoformat()}.txt"
msg_file = "slot_range_msg.txt"
with open(msg_file, 'w') as f:
f.write(f"Slot range check: {start_time.isoformat()} → {end_time.isoformat()}\n")
f.write(f"Slots: {min_slot} – {max_slot}\n\n")
for _, row in merged.iterrows():
status = "OK" if row['match'] else "MISMATCH"
f.write(
f"[{status}] {row['table_name']}"
f" BQ={row.get('row_count_bq', 0)} rows"
f" PG={row.get('row_count_pg', 0)} rows\n"
)
if not row['match']:
f.write(f" BQ hash: {row.get('hash_bq', 'empty')}\n")
f.write(f" PG hash: {row.get('hash_pg', 'empty')}\n")

# log_results(pg_local_con, run_time, min_slot, max_slot, merged)
# pg_local_con.commit()
# pg_local_con.close()
pg_con.close()

print(f"Done. Results written to {msg_file}")
if len(mismatches) > 0:
print("MISMATCHES DETECTED:")
for _, row in mismatches.iterrows():
print(f" {row['table_name']}: BQ={row.get('hash_bq','empty')} PG={row.get('hash_pg','empty')}")


if __name__ == '__main__':
main()
Loading