Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions core/dry_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
from google.cloud import bigquery
import sys
sys.path.insert(0, 'core')
import constants

def _render_sql(filename):
    """Load one SQL template from ``constants.SQL_DIR`` and make it embeddable.

    The three table placeholders are substituted with the table paths from
    ``constants``; surrounding whitespace and any trailing semicolons are
    removed so the statement can be spliced into a larger query.

    Args:
        filename: Name of the template file inside ``constants.SQL_DIR``.

    Returns:
        The rendered SQL text.
    """
    with open(os.path.join(constants.SQL_DIR, filename), 'r', encoding='utf-8') as f:
        sql = f.read()

    # Same placeholders, applied in the same order, for every template.
    placeholders = (
        ('@flat_module4', constants.MODULE_4_TABLE),
        ('@flat_participants', constants.FLAT_PARTICIPANTS_TABLE),
        ('@raw_participants', constants.RAW_PARTICIPANTS_TABLE),
    )
    for token, table_path in placeholders:
        sql = sql.replace(token, table_path)

    return sql.strip().rstrip(';')


def dry_run():
    """Render the address-view SQL and validate it with a BigQuery dry run.

    The CREATE OR REPLACE VIEW statement is assembled from the module 4 and
    user profile templates, written to ``debug/dry_run_view_query.sql``
    (relative to the current working directory) for inspection, and submitted
    to BigQuery with ``dry_run=True`` so that it is checked but not executed.

    Returns:
        True if BigQuery accepted the statement, False if the dry run raised.
    """
    client = bigquery.Client(project=constants.PROJECT_ID)

    # NOTE(review): intended to mirror the rendering done by
    # create_address_view (defined elsewhere) -- keep the two in sync.
    module4_query = _render_sql(constants.ADDRESS_QUERY_SQL)
    user_profile_query = _render_sql(constants.USER_PROFILE_QUERY_SQL)

    combined_query = f"""
{module4_query}
UNION ALL
{user_profile_query}
"""

    view_query = f"""
CREATE OR REPLACE VIEW {constants.ADDRESSES_VIEW} AS
WITH standardized_addresses AS (
    SELECT
        CAST(Connect_ID AS STRING) AS Connect_ID,
        CAST(ts_user_profile_updated AS TIMESTAMP) AS ts_user_profile_updated,
        CAST(ts_address_delivered AS TIMESTAMP) AS ts_address_delivered,
        CAST(address_src_question_cid AS STRING) AS address_src_question_cid,
        CAST(address_nickname AS STRING) AS address_nickname,
        CAST(address_source AS STRING) AS address_source,
        CAST(historical_order AS INT64) AS historical_order,
        NULLIF(TRIM(CAST(address_line_1 AS STRING)), '') AS address_line_1,
        NULLIF(TRIM(CAST(address_line_2 AS STRING)), '') AS address_line_2,
        NULLIF(TRIM(CAST(street_num AS STRING)), '') AS street_num,
        NULLIF(TRIM(CAST(street_name AS STRING)), '') AS street_name,
        NULLIF(TRIM(CAST(apartment_num AS STRING)), '') AS apartment_num,
        NULLIF(TRIM(CAST(city AS STRING)), '') AS city,
        NULLIF(TRIM(CAST(state AS STRING)), '') AS state,
        NULLIF(TRIM(CAST(zip_code AS STRING)), '') AS zip_code,
        NULLIF(TRIM(CAST(country AS STRING)), '') AS country,
        NULLIF(TRIM(CAST(cross_street_1 AS STRING)), '') AS cross_street_1,
        NULLIF(TRIM(CAST(cross_street_2 AS STRING)), '') AS cross_street_2
    FROM (
        {combined_query}
    ) subquery
)
SELECT *,
    TO_HEX(MD5(CONCAT(
        IFNULL(Connect_ID, ''),
        IFNULL(address_src_question_cid, ''),
        IFNULL(address_nickname, ''),
        IFNULL(address_source, ''),
        IFNULL(address_line_1, ''),
        IFNULL(address_line_2, ''),
        IFNULL(street_num, ''),
        IFNULL(street_name, ''),
        IFNULL(apartment_num, ''),
        IFNULL(city, ''),
        IFNULL(state, ''),
        IFNULL(zip_code, ''),
        IFNULL(country, ''),
        IFNULL(cross_street_1, ''),
        IFNULL(cross_street_2, '')
    ))) AS address_hash
FROM standardized_addresses
WHERE (
    address_line_1 IS NOT NULL OR address_line_2 IS NOT NULL OR
    street_num IS NOT NULL OR street_name IS NOT NULL OR
    apartment_num IS NOT NULL OR city IS NOT NULL OR
    state IS NOT NULL OR zip_code IS NOT NULL OR
    country IS NOT NULL OR cross_street_1 IS NOT NULL OR
    cross_street_2 IS NOT NULL
)
"""

    # Save rendered SQL for inspection
    os.makedirs('debug', exist_ok=True)
    with open('debug/dry_run_view_query.sql', 'w', encoding='utf-8') as f:
        f.write(view_query)
    print("Rendered SQL saved to debug/dry_run_view_query.sql")

    # Validate against BigQuery without executing
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    try:
        job = client.query(view_query, job_config=job_config)
        print(f"✅ SQL is valid. Estimated bytes processed: {job.total_bytes_processed:,}")
    except Exception as e:
        # Top-level CLI boundary: report the failure and signal it to the
        # caller instead of letting the run look successful.
        print(f"❌ SQL validation failed: {e}")
        return False
    return True


if __name__ == "__main__":
    # Non-zero exit status lets shells and CI detect an invalid query.
    sys.exit(0 if dry_run() else 1)
2 changes: 2 additions & 0 deletions core/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
'''Check that the SQL works without generating artifacts or writing to database.'''

import datetime
from google.cloud import bigquery
import constants
Expand Down
72 changes: 72 additions & 0 deletions delivery_impact_of_change_to_up_hx.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Internal Note: Delivery Impact of `user_profile_address_view.sql` Changes

## What changed and why

Two behavioral changes were made to how user profile history addresses are computed before delivery:

1. **`historical_order` ordering corrected:** history entries were previously numbered oldest-first (`element_position + 1`). They are now numbered newest-first (`ROW_NUMBER() ... ORDER BY element_position DESC`), so `historical_order = 1` is always the most recent history snapshot. Current addresses remain `historical_order = 0`. Module4 addresses are unaffected.

2. **Carry-forward of NULL address fields:** history entries that were missing a field (true `NULL`) now receive the value from the nearest newer non-null entry for that participant and address type. Empty strings are preserved as-is and are not filled. Module4 addresses are unaffected.

---

## Impact on `address_hash` and delivery state

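The `address_hash` is an MD5 over the row's identifying columns (`Connect_ID`, `address_src_question_cid`, `address_nickname`, `address_source`) concatenated with every address field (`address_line_1`, `address_line_2`, `street_num`, `street_name`, `apartment_num`, `city`, `state`, `zip_code`, `country`, `cross_street_1`, `cross_street_2`); of these, only `address_line_1`, `address_line_2`, `city`, `state`, and `zip_code` are affected by carry-forward. It does **not** include `historical_order` or either timestamp column. Therefore: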

| Row type | Hash changes? | Reason |
|---|---|---|
| All module4 rows | ❌ No | Untouched by this change |
| User profile current rows (`historical_order = 0`) | ⚠️ Only if a NULL field is filled by carry-forward from a history entry | Carry-forward window includes `historical_order = 0` |
| User profile history rows with no NULL fields | ❌ No | Carry-forward is a no-op; values unchanged |
| User profile history rows with at least one NULL field that can be filled | ✅ Yes | Field value changes → hash changes |

---

## Three populations in the upcoming delivery

**1. Truly new addresses:** participants or address types not previously seen. Handled normally.

**2. Re-delivered history rows (enriched):** previously delivered with one or more NULL fields; now delivered again with those fields filled via carry-forward. These will appear as new records to `identify_new_addresses()` because their hash has changed. The old NULL version remains in `address_delivery_metadata` and `address_deliveries` — it is not overwritten.

**3. History rows whose `historical_order` shifted:** if a participant's history array has grown since the last delivery, older entries will have higher `historical_order` values than before. Because `historical_order` is not in the hash, the hash is unchanged and these rows will **not** be re-delivered unless their field values also changed.

---

## Merging NORC's returned data on our end

When NORC returns geocoded results, rows should be matched back to our records using: `Connect_ID` + `address_nickname` + `ts_user_profile_updated`


- For **current addresses** (`historical_order = 0`), `ts_user_profile_updated` is `NULL` for all rows — match on `Connect_ID` + `address_nickname`, restricted to rows where `historical_order = 0`, instead.
- **Do not use `historical_order` alone** as a join key against prior deliveries, as the numbering convention has changed.
- Where a `Connect_ID + address_nickname + ts_user_profile_updated` combination appears in both an old delivery and the new one, the new delivery's geocoded result supersedes the old one. The old row (with NULLs) in `address_deliveries` can be flagged or ignored in downstream analysis.

## Upcoming Delivery Impact (as of 2026-05-05)

The next delivery will contain **41,350 total rows** across **~39,926 unique participants**:

| Category | Rows | % of delivery | Participants |
|---|---|---|---|
| Truly new (never delivered) | 38,827 | 93.9% | ~38,493 |
| Enriched re-deliveries | 2,523 | 6.1% | ~1,433 |
| **Total** | **41,350** | **100%** | **~39,926** |

Re-deliveries broken down by address type:

| Address type | Rows re-delivered | % of re-deliveries | Participants |
|---|---|---|---|
| `user_profile_mailing_address` | 2,455 | 97.3% | 1,397 |
| `user_profile_physical_address` | 53 | 2.1% | 25 |
| `user_profile_alternative_address` | 15 | 0.6% | 11 |
| **Total** | **2,523** | **100%** | **~1,433** |

The 2,523 re-delivered rows represent **user profile history entries that were previously
delivered with one or more NULL address fields**, which carry-forward has now filled using
the most recent non-null value for that participant and address type. These rows should be
matched back to prior deliveries on `Connect_ID + address_nickname + ts_user_profile_updated`
and treated as **updates to existing records**, not new records.

The average enriched participant has **1.76 history entries** being resent
(`2,523 rows / 1,433 participants`), consistent with carry-forward filling gaps across
multiple history snapshots per person.
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ google-cloud-bigquery-storage==2.30.0
pandas==2.2.3
python-dateutil==2.9.0
python-tabulate==0.9.0
openpyxl==3.1.5
openpyxl==3.1.5
tabulate
xlsxwriter
92 changes: 67 additions & 25 deletions sql/user_profile_address_view.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
-- =========================================================================
-- User Profile Address View
-- Author: Jake Peters
-- Updated: 2026-05-05
--
-- Purpose: Produces one row per participant per address type per historical
-- entry by unioning current and historical address data from the
-- participants table across three address types:
-- - user_profile_physical_address (cid: 207908218)
-- - user_profile_mailing_address (cid: 521824358)
-- - user_profile_alternative_address (cid: 284580415)
--
-- historical_order convention:
-- 0 = current (active) address
-- 1 = most recent history entry
-- 2 = next most recent, etc.
--
-- Carry-forward: LAST_VALUE(... IGNORE NULLS) fills true NULLs in
-- address_line_1, address_line_2, city, state, and zip_code
-- using the most recent non-null value per (Connect_ID,
-- address_nickname). Empty strings are preserved as-is (they
-- block carry-forward). NULLIF/TRIM normalization happens
-- downstream in the standardized_addresses CTE in
-- address_processing.py, after carry-forward has run.
--
-- Placeholders: @raw_participants is replaced at runtime by
-- address_processing.py with the fully qualified BigQuery
-- table path defined in constants.py.
-- =========================================================================

SELECT
Connect_ID,
ts_user_profile_updated,
ts_address_delivered,
address_src_question_cid,
address_nickname,
address_source,
historical_order,
LAST_VALUE(address_line_1 IGNORE NULLS) OVER w AS address_line_1,
LAST_VALUE(address_line_2 IGNORE NULLS) OVER w AS address_line_2,
street_num,
street_name,
apartment_num,
LAST_VALUE(city IGNORE NULLS) OVER w AS city,
LAST_VALUE(state IGNORE NULLS) OVER w AS state,
LAST_VALUE(zip_code IGNORE NULLS) OVER w AS zip_code,
country,
cross_street1,
cross_street2
FROM (

-- =========================================================================
-- Query 1: User Profile - Current Physical Addresses
-- This query extracts physical address information from the participants table
Expand Down Expand Up @@ -88,12 +139,12 @@ SELECT
'207908218' AS address_src_question_cid,
'user_profile_physical_address' AS address_nickname,
'user_profile' AS address_source,
(element_position + 1) AS historical_order, -- Converting 0-based to 1-based
ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order, -- 1 = most recent history entry
element.d_207908218 AS address_line_1,
element.d_224392018 AS address_line_2,
CAST(NULL AS STRING) AS street_num,
CAST(NULL AS STRING) AS street_name,
NULL AS apartment_num,
CAST(NULL AS STRING) AS apartment_num,
element.d_451993790 AS city,
element.d_187799450 AS state,
element.d_449168732 AS zip_code,
Expand All @@ -103,17 +154,14 @@ SELECT
FROM
@raw_participants,
UNNEST(d_569151507) AS element WITH OFFSET AS element_position
-- Query 3 history filter (relaxed — let carry-forward do its job):
WHERE
Connect_ID IS NOT NULL
AND d_821247024 = 197316935 -- Verification status = verified
AND d_831041022 = 104430631 -- Data destruction requested = no
AND (
(element.d_207908218 IS NOT NULL AND element.d_207908218 != '') OR
(element.d_224392018 IS NOT NULL AND element.d_224392018 != '') OR
(element.d_451993790 IS NOT NULL AND element.d_451993790 != '') OR
(element.d_187799450 IS NOT NULL AND element.d_187799450 != '') OR
(element.d_449168732 IS NOT NULL AND element.d_449168732 != '')
)
-- No address field filter here — rows with all NULLs will be
-- enriched by carry-forward and then filtered downstream by
-- the NULLIF/TRIM + IS NOT NULL check in address_processing.py

UNION ALL

Expand All @@ -129,7 +177,7 @@ SELECT
'521824358' AS address_src_question_cid,
'user_profile_mailing_address' AS address_nickname,
'user_profile' AS address_source,
(element_position + 1) AS historical_order,
ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order,
element.d_521824358 AS address_line_1,
element.d_442166669 AS address_line_2,
CAST(NULL AS STRING) AS street_num,
Expand All @@ -148,13 +196,7 @@ WHERE
Connect_ID IS NOT NULL
AND d_821247024 = 197316935 -- Verification status = verified
AND d_831041022 = 104430631 -- Data destruction requested = no
AND (
(element.d_521824358 IS NOT NULL AND element.d_521824358 != '') OR
(element.d_442166669 IS NOT NULL AND element.d_442166669 != '') OR
(element.d_703385619 IS NOT NULL AND element.d_703385619 != '') OR
(element.d_634434746 IS NOT NULL AND element.d_634434746 != '') OR
(element.d_892050548 IS NOT NULL AND element.d_892050548 != '')
)
-- include even records with all nulls and empty strings (historical table requires them for carry forward logic)

UNION ALL

Expand Down Expand Up @@ -208,7 +250,7 @@ SELECT
'284580415' AS address_src_question_cid,
'user_profile_alternative_address' AS address_nickname,
'user_profile' AS address_source,
(element_position + 1) AS historical_order,
ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order,
element.D_284580415 AS address_line_1,
element.D_728926441 AS address_line_2,
CAST(NULL AS STRING) AS street_num,
Expand All @@ -227,10 +269,10 @@ WHERE
Connect_ID IS NOT NULL
AND d_821247024 = 197316935 -- Verification status = verified
AND d_831041022 = 104430631 -- Data destruction requested = no
AND (
(element.D_284580415 IS NOT NULL AND element.D_284580415 != '') OR
(element.D_728926441 IS NOT NULL AND element.D_728926441 != '') OR
(element.D_907038282 IS NOT NULL AND element.D_907038282 != '') OR
(element.D_970839481 IS NOT NULL AND element.D_970839481 != '') OR
(element.D_379899229 IS NOT NULL AND element.D_379899229 != '')
)
-- include even records with all nulls and empty strings (historical table requires them for carry forward logic)
)
WINDOW w AS (
PARTITION BY Connect_ID, address_nickname
ORDER BY historical_order DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)