diff --git a/core/dry_run.py b/core/dry_run.py new file mode 100644 index 0000000..3be4b52 --- /dev/null +++ b/core/dry_run.py @@ -0,0 +1,106 @@ +import os +from google.cloud import bigquery +import sys +sys.path.insert(0, 'core') +import constants + +def dry_run(): + """Dry run the SQL code generation without running the pipeline end-to-end. + Useful for ensuring that the BigQuery code works before modifying metadata.""" + client = bigquery.Client(project=constants.PROJECT_ID) + + # Read and render SQL exactly as create_address_view does + with open(os.path.join(constants.SQL_DIR, constants.ADDRESS_QUERY_SQL), 'r') as f: + module4_query = f.read() + + with open(os.path.join(constants.SQL_DIR, constants.USER_PROFILE_QUERY_SQL), 'r') as f: + user_profile_query = f.read() + + module4_query = module4_query.replace('@flat_module4', constants.MODULE_4_TABLE) + module4_query = module4_query.replace('@flat_participants', constants.FLAT_PARTICIPANTS_TABLE) + module4_query = module4_query.replace('@raw_participants', constants.RAW_PARTICIPANTS_TABLE) + + user_profile_query = user_profile_query.replace('@flat_module4', constants.MODULE_4_TABLE) + user_profile_query = user_profile_query.replace('@flat_participants', constants.FLAT_PARTICIPANTS_TABLE) + user_profile_query = user_profile_query.replace('@raw_participants', constants.RAW_PARTICIPANTS_TABLE) + + module4_query = module4_query.strip().rstrip(';') + user_profile_query = user_profile_query.strip().rstrip(';') + + combined_query = f""" + {module4_query} + UNION ALL + {user_profile_query} + """ + + view_query = f""" + CREATE OR REPLACE VIEW {constants.ADDRESSES_VIEW} AS + WITH standardized_addresses AS ( + SELECT + CAST(Connect_ID AS STRING) AS Connect_ID, + CAST(ts_user_profile_updated AS TIMESTAMP) AS ts_user_profile_updated, + CAST(ts_address_delivered AS TIMESTAMP) AS ts_address_delivered, + CAST(address_src_question_cid AS STRING) AS address_src_question_cid, + CAST(address_nickname AS STRING) AS address_nickname, + CAST(address_source AS STRING) AS address_source, + CAST(historical_order AS INT64) AS historical_order, + NULLIF(TRIM(CAST(address_line_1 AS STRING)), '') AS address_line_1, + NULLIF(TRIM(CAST(address_line_2 AS STRING)), '') AS address_line_2, + NULLIF(TRIM(CAST(street_num AS STRING)), '') AS street_num, + NULLIF(TRIM(CAST(street_name AS STRING)), '') AS street_name, + NULLIF(TRIM(CAST(apartment_num AS STRING)), '') AS apartment_num, + NULLIF(TRIM(CAST(city AS STRING)), '') AS city, + NULLIF(TRIM(CAST(state AS STRING)), '') AS state, + NULLIF(TRIM(CAST(zip_code AS STRING)), '') AS zip_code, + NULLIF(TRIM(CAST(country AS STRING)), '') AS country, + NULLIF(TRIM(CAST(cross_street_1 AS STRING)), '') AS cross_street_1, + NULLIF(TRIM(CAST(cross_street_2 AS STRING)), '') AS cross_street_2 + FROM ( + {combined_query} + ) subquery + ) + SELECT *, + TO_HEX(MD5(CONCAT( + IFNULL(Connect_ID, ''), + IFNULL(address_src_question_cid, ''), + IFNULL(address_nickname, ''), + IFNULL(address_source, ''), + IFNULL(address_line_1, ''), + IFNULL(address_line_2, ''), + IFNULL(street_num, ''), + IFNULL(street_name, ''), + IFNULL(apartment_num, ''), + IFNULL(city, ''), + IFNULL(state, ''), + IFNULL(zip_code, ''), + IFNULL(country, ''), + IFNULL(cross_street_1, ''), + IFNULL(cross_street_2, '') + ))) AS address_hash + FROM standardized_addresses + WHERE ( + address_line_1 IS NOT NULL OR address_line_2 IS NOT NULL OR + street_num IS NOT NULL OR street_name IS NOT NULL OR + apartment_num IS NOT NULL OR city IS NOT NULL OR + state IS NOT NULL OR zip_code IS NOT NULL OR + country IS NOT NULL OR cross_street_1 IS NOT NULL OR + cross_street_2 IS NOT NULL + ) + """ + + # Save rendered SQL for inspection + os.makedirs('debug', exist_ok=True) + with open('debug/dry_run_view_query.sql', 'w') as f: + f.write(view_query) + print("Rendered SQL saved to debug/dry_run_view_query.sql") + + # Validate against BigQuery without executing + job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False) + try: + job = client.query(view_query, job_config=job_config) + print(f"✅ SQL is valid. Estimated bytes processed: {job.total_bytes_processed:,}") + except Exception as e: + print(f"❌ SQL validation failed: {e}") + +if __name__ == "__main__": + dry_run() \ No newline at end of file diff --git a/core/main.py b/core/main.py index 14cd03b..3e1f9bf 100644 --- a/core/main.py +++ b/core/main.py @@ -1,3 +1,5 @@ +'''Check that the SQL works without generating artifacts or writing to database.''' + import datetime from google.cloud import bigquery import constants diff --git a/delivery_impact_of_change_to_up_hx.md b/delivery_impact_of_change_to_up_hx.md new file mode 100644 index 0000000..af6cdfa --- /dev/null +++ b/delivery_impact_of_change_to_up_hx.md @@ -0,0 +1,72 @@ +# Internal Note: Delivery Impact of `user_profile_address_view.sql` Changes + +## What changed and why + +Two behavioral changes were made to how user profile history addresses are computed before delivery: + +1. **`historical_order` ordering corrected:** history entries were previously numbered oldest-first (`element_position + 1`). They are now numbered newest-first (`ROW_NUMBER() ... ORDER BY element_position DESC`), so `historical_order = 1` is always the most recent history snapshot. Current addresses remain `historical_order = 0`. Module4 addresses are unaffected. + +2. **Carry-forward of NULL address fields:** history entries that were missing a field (true `NULL`) now receive the value from the nearest newer non-null entry for that participant and address type. Empty strings are preserved as-is and are not filled. Module4 addresses are unaffected. + +--- + +## Impact on `address_hash` and delivery state + +The `address_hash` is computed from address field values (`city`, `state`, `zip_code`, `address_line_1`, `address_line_2`). It does **not** include `historical_order`. Therefore: + +| Row type | Hash changes? | Reason | +|---|---|---| +| All module4 rows | ❌ No | Untouched by this change | +| User profile current rows (`historical_order = 0`) | ⚠️ Only if a NULL field is filled by carry-forward from a history entry | Carry-forward window includes `historical_order = 0` | +| User profile history rows with no NULL fields | ❌ No | Carry-forward is a no-op; values unchanged | +| User profile history rows with at least one NULL field that can be filled | ✅ Yes | Field value changes → hash changes | + +--- + +## Three populations in the upcoming delivery + +**1. Truly new addresses:** participants or address types not previously seen. Handled normally. + +**2. Re-delivered history rows (enriched):** previously delivered with one or more NULL fields; now delivered again with those fields filled via carry-forward. These will appear as new records to `identify_new_addresses()` because their hash has changed. The old NULL version remains in `address_delivery_metadata` and `address_deliveries` — it is not overwritten. + +**3. History rows whose `historical_order` shifted:** if a participant's history array has grown since the last delivery, older entries will have higher `historical_order` values than before. Because `historical_order` is not in the hash, the hash is unchanged and these rows will **not** be re-delivered unless their field values also changed. + +--- + +## Merging NORC's returned data on our end + +When NORC returns geocoded results, rows should be matched back to our records using: `Connect_ID` + `address_nickname` + `ts_user_profile_updated` + + +- For **current addresses** (`historical_order = 0`), `ts_user_profile_updated` is `NULL` for all rows — match on `Connect_ID + address_nickname + historical_order = 0` instead. +- **Do not use `historical_order` alone** as a join key against prior deliveries, as the numbering convention has changed. +- Where a `Connect_ID + address_nickname + ts_user_profile_updated` combination appears in both an old delivery and the new one, the new delivery's geocoded result supersedes the old one. The old row (with NULLs) in `address_deliveries` can be flagged or ignored in downstream analysis. + +## Upcoming Delivery Impact (as of 2026-05-05) + +The next delivery will contain **41,350 total rows** across **~39,926 unique participants**: + +| Category | Rows | % of delivery | Participants | +|---|---|---|---| +| Truly new (never delivered) | 38,827 | 93.9% | ~38,493 | +| Enriched re-deliveries | 2,523 | 6.1% | ~1,433 | +| **Total** | **41,350** | **100%** | **~39,926** | + +Re-deliveries broken down by address type: + +| Address type | Rows re-delivered | % of re-deliveries | Participants | +|---|---|---|---| +| `user_profile_mailing_address` | 2,455 | 97.3% | 1,397 | +| `user_profile_physical_address` | 53 | 2.1% | 25 | +| `user_profile_alternative_address` | 15 | 0.6% | 11 | +| **Total** | **2,523** | **100%** | **~1,433** | + +The 2,523 re-delivered rows represent **user profile history entries that were previously +delivered with one or more NULL address fields**, which carry-forward has now filled using +the most recent non-null value for that participant and address type. These rows should be +matched back to prior deliveries on `Connect_ID + address_nickname + ts_user_profile_updated` +and treated as **updates to existing records**, not new records. + +The average enriched participant has **1.76 history entries** being resent +(`2,523 rows / 1,433 participants`), consistent with carry-forward filling gaps across +multiple history snapshots per person. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4c8c458..80f5d57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,6 @@ google-cloud-bigquery-storage==2.30.0 pandas==2.2.3 python-dateutil==2.9.0 python-tabulate==0.9.0 -openpxl==3.1.5 \ No newline at end of file +openpxl==3.1.5 +tabulate +xlsxwriter \ No newline at end of file diff --git a/sql/user_profile_address_view.sql b/sql/user_profile_address_view.sql index 6011eef..bcfaf9d 100644 --- a/sql/user_profile_address_view.sql +++ b/sql/user_profile_address_view.sql @@ -1,3 +1,54 @@ +-- ========================================================================= +-- User Profile Address View +-- Author: Jake Peters +-- Updated: 2026-05-05 +-- +-- Purpose: Produces one row per participant per address type per historical +-- entry by unioning current and historical address data from the +-- participants table across three address types: +-- - user_profile_physical_address (cid: 207908218) +-- - user_profile_mailing_address (cid: 521824358) +-- - user_profile_alternative_address (cid: 284580415) +-- +-- historical_order convention: +-- 0 = current (active) address +-- 1 = most recent history entry +-- 2 = next most recent, etc. +-- +-- Carry-forward: LAST_VALUE(... IGNORE NULLS) fills true NULLs in +-- address_line_1, address_line_2, city, state, and zip_code +-- using the most recent non-null value per (Connect_ID, +-- address_nickname). Empty strings are preserved as-is (they +-- block carry-forward). NULLIF/TRIM normalization happens +-- downstream in the standardized_addresses CTE in +-- address_processing.py, after carry-forward has run. +-- +-- Placeholders: @raw_participants is replaced at runtime by +-- address_processing.py with the fully qualified BigQuery +-- table path defined in constants.py. +-- ========================================================================= + +SELECT + Connect_ID, + ts_user_profile_updated, + ts_address_delivered, + address_src_question_cid, + address_nickname, + address_source, + historical_order, + LAST_VALUE(address_line_1 IGNORE NULLS) OVER w AS address_line_1, + LAST_VALUE(address_line_2 IGNORE NULLS) OVER w AS address_line_2, + street_num, + street_name, + apartment_num, + LAST_VALUE(city IGNORE NULLS) OVER w AS city, + LAST_VALUE(state IGNORE NULLS) OVER w AS state, + LAST_VALUE(zip_code IGNORE NULLS) OVER w AS zip_code, + country, + cross_street1, + cross_street2 +FROM ( + -- ========================================================================= -- Query 1: User Profile - Current Physical Addresses -- This query extracts physical address information from the participants table @@ -88,12 +139,12 @@ SELECT '207908218' AS address_src_question_cid, 'user_profile_physical_address' AS address_nickname, 'user_profile' AS address_source, - (element_position + 1) AS historical_order, -- Converting 0-based to 1-based + ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order, -- 1 = most recent history entry element.d_207908218 AS address_line_1, element.d_224392018 AS address_line_2, CAST(NULL AS STRING) AS street_num, CAST(NULL AS STRING) AS street_name, - NULL AS apartment_num, + CAST(NULL AS STRING) AS apartment_num, element.d_451993790 AS city, element.d_187799450 AS state, element.d_449168732 AS zip_code, @@ -103,17 +154,14 @@ SELECT FROM @raw_participants, UNNEST(d_569151507) AS element WITH OFFSET AS element_position +-- Query 3 history filter (relaxed — let carry-forward do its job): WHERE Connect_ID IS NOT NULL AND d_821247024 = 197316935 -- Verification status = verified AND d_831041022 = 104430631 -- Data destruction requested = no - AND ( - (element.d_207908218 IS NOT NULL AND element.d_207908218 != '') OR - (element.d_224392018 IS NOT NULL AND element.d_224392018 != '') OR - (element.d_451993790 IS NOT NULL AND element.d_451993790 != '') OR - (element.d_187799450 IS NOT NULL AND element.d_187799450 != '') OR - (element.d_449168732 IS NOT NULL AND element.d_449168732 != '') - ) + -- No address field filter here — rows with all NULLs will be + -- enriched by carry-forward and then filtered downstream by + -- the NULLIF/TRIM + IS NOT NULL check in address_processing.py UNION ALL @@ -129,7 +177,7 @@ SELECT '521824358' AS address_src_question_cid, 'user_profile_mailing_address' AS address_nickname, 'user_profile' AS address_source, - (element_position + 1) AS historical_order, + ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order, element.d_521824358 AS address_line_1, element.d_442166669 AS address_line_2, CAST(NULL AS STRING) AS street_num, @@ -148,13 +196,7 @@ WHERE Connect_ID IS NOT NULL AND d_821247024 = 197316935 -- Verification status = verified AND d_831041022 = 104430631 -- Data destruction requested = no - AND ( - (element.d_521824358 IS NOT NULL AND element.d_521824358 != '') OR - (element.d_442166669 IS NOT NULL AND element.d_442166669 != '') OR - (element.d_703385619 IS NOT NULL AND element.d_703385619 != '') OR - (element.d_634434746 IS NOT NULL AND element.d_634434746 != '') OR - (element.d_892050548 IS NOT NULL AND element.d_892050548 != '') - ) + -- include even records with all nulls and empty strings (historical table requires them for carry forward logic) UNION ALL @@ -208,7 +250,7 @@ SELECT '284580415' AS address_src_question_cid, 'user_profile_alternative_address' AS address_nickname, 'user_profile' AS address_source, - (element_position + 1) AS historical_order, + ROW_NUMBER() OVER (PARTITION BY Connect_ID ORDER BY element_position DESC) AS historical_order, element.D_284580415 AS address_line_1, element.D_728926441 AS address_line_2, CAST(NULL AS STRING) AS street_num, @@ -227,10 +269,10 @@ WHERE Connect_ID IS NOT NULL AND d_821247024 = 197316935 -- Verification status = verified AND d_831041022 = 104430631 -- Data destruction requested = no - AND ( - (element.D_284580415 IS NOT NULL AND element.D_284580415 != '') OR - (element.D_728926441 IS NOT NULL AND element.D_728926441 != '') OR - (element.D_907038282 IS NOT NULL AND element.D_907038282 != '') OR - (element.D_970839481 IS NOT NULL AND element.D_970839481 != '') OR - (element.D_379899229 IS NOT NULL AND element.D_379899229 != '') - ) + -- include even records with all nulls and empty strings (historical table requires them for carry forward logic) +) +WINDOW w AS ( + PARTITION BY Connect_ID, address_nickname + ORDER BY historical_order DESC + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW +) \ No newline at end of file