This repository contains the Geocoding Pipeline for the NIH Connect study, designed to collect and prepare participant address data for geocoding by NORC. The pipeline extracts addresses from the Connect study database, manages delivery of new addresses to NORC for geocoding, and maintains metadata to track which addresses have been processed.
The pipeline is currently designed to be executed locally by the Analytics Team, but could be automated within GCP (using Cloud Run) in the future if the time investment proves worthwhile.
- Extracts addresses from both Module 4 questionnaire and User Profile data
- Filters for participants who meet specific criteria (verified status, no data destruction request, Module 4 complete)
- Identifies new addresses that haven't been processed yet
- Exports addresses in Excel format for manual delivery to NORC via Box.com
- Stores exported files in GCS bucket for long-term retention
- Maintains comprehensive metadata of all addresses processed
- Generates detailed statistics on address data quality and coverage
- Supports deletion of specific deliveries if needed
The pipeline creates and manages several BigQuery tables and views, each serving a specific purpose in the geocoding workflow:
addresses_all (View)
- Purpose: Unified view that combines address data from multiple sources (Module 4 questionnaire and User Profile)
- Scope: All valid addresses from verified participants who haven't requested data destruction and have completed Module 4
- Key Features:
- Standardizes address formats across different sources
- Includes computed address hash for deduplication
- Filters out records with no address information
- Combines up to 27 different address types (11 home addresses, 10 seasonal addresses, work addresses, school addresses, etc.)
address_delivery_metadata (Table)
- Purpose: Tracks which addresses have been delivered to NORC for geocoding
- Scope: Metadata only - delivery IDs, dates, participant IDs, and address hashes
- Data Retention: Permanent record of all deliveries
- Key Fields: delivery_id, delivery_date, Connect_ID, address_hash, address_source
address_deliveries (Table - Comprehensive)
- Purpose: Complete historical record of all addresses delivered to NORC
- Scope: Full address details for every delivery made
- Data Retention: Permanent - serves as the authoritative record
- Key Features: Contains both metadata AND complete address information for audit trails and analysis
address_delivery_current (Table - Temporary)
- Purpose: Staging table for the current delivery being processed
- Scope: Only addresses identified for the current delivery run
- Data Retention: Temporary - gets recreated with each pipeline run
- Lifecycle: Created → Populated → Exported → Used for metadata updates
Source Data (Module 4 + User Profile)
↓
addresses_all (View) - Unified, standardized view
↓
address_delivery_current (Table) - Current batch of new addresses
↓
Export to Excel file + Store in GCS + Update metadata tables
↓
address_delivery_metadata + address_deliveries (Permanent record)
↓
[Manual delivery to NORC via Box.com per SOP]
The pipeline also creates temporary tables during processing (e.g., temp_new_addresses_*) that are automatically cleaned up after use.
The pipeline consists of several Python modules that work together:
main.py: Entry point that orchestrates the pipelineconstants.py: Configuration parametersutils.py: Utility functions like loggingaddress_processing.py: Core pipeline functionality
Functions in address_processing.py:
create_required_tables(): Creates the necessary tables if they don't existcreate_address_view(): Creates or updates the address viewidentify_new_addresses(): Identifies addresses not yet deliveredupdate_metadata(): Updates metadata tables with new delivery informationexport_addresses(): Exports addresses to CSVdelete_delivery(): Deletes a specific delivery from metadatagenerate_summary_statistics(): Generates statistics about addresses
address_view.sql: Queries that extract addresses from Module 4user_profile_address_view.sql: Queries that extract addresses from User Profile
The pipeline is configured via the constants.py file:
PROJECT_ID: Google Cloud Project IDTARGET_DATASET_ID: BigQuery dataset for storing pipeline tablesFLAT_SOURCE_DATASET_ID: Flat Connect dataset locationRAW_SOURCE_DATASET_ID: Raw Connect dataset locationMODULE_4_TABLE: Module 4 questionnaire tableFLAT_PARTICIPANTS_TABLE: Flattened participants tableRAW_PARTICIPANTS_TABLE: Raw participants tableMETADATA_TABLE: Table for storing delivery metadataADDRESSES_VIEW: View that combines all address sourcesCURRENT_DELIVERY_TABLE: Table for current deliveryCOMPREHENSIVE_TABLE: Comprehensive history of all delivered addressesLOCAL_EXPORT: Boolean to toggle between local file export and GCS exportLOCAL_EXPORT_DIR: Directory for local file exportsSQL_DIR: Directory containing SQL query filesQUERY_TIMEOUT: Timeout for BigQuery operations (seconds)
To run the full pipeline:
python main.py
This will:
- Create required tables if they don't exist
- Create/update the address view
- Identify addresses that haven't been delivered yet
- Update metadata tables
- Export addresses to an Excel file and store in GCS bucket
- Generate summary statistics
Note: The exported Excel file should be manually shared with NORC via Box.com following the procedures outlined in the SOP. Files are also automatically stored in the GCS bucket for long-term retention.
To delete a specific delivery:
from google.cloud import bigquery
import constants
import address_processing
client = bigquery.Client(project=constants.PROJECT_ID)
address_processing.delete_delivery(client, "DELIVERY_20250424")To generate statistics for a specific delivery:
from google.cloud import bigquery
import constants
import address_processing
client = bigquery.Client(project=constants.PROJECT_ID)
address_processing.generate_summary_statistics(client, "DELIVERY_20250424")- The pipeline generates detailed logs during execution
- Debug SQL queries are saved to a
debugdirectory - Summary statistics are displayed at the end of each successful run
The pipeline is designed to be extended to:
- Ingest geocoded data back from NORC
- Store and manage latitude, longitude, and cleaned up address data
- Provide quality reports on the geocoding results
- Support additional address sources as they become available