
NIH Connect Geocoding Pipeline

Overview

This repository contains the Geocoding Pipeline for the NIH Connect study, designed to collect and prepare participant address data for geocoding by NORC. The pipeline extracts addresses from the Connect study database, manages delivery of new addresses to NORC for geocoding, and maintains metadata to track which addresses have been processed.

The pipeline is currently designed to be executed locally by the Analytics Team, but could be automated within GCP (using Cloud Run) in the future if the time investment proves worthwhile.

Features

  • Extracts addresses from both Module 4 questionnaire and User Profile data
  • Filters for participants who meet specific criteria (verified status, no data destruction request, Module 4 complete)
  • Identifies new addresses that haven't been processed yet
  • Exports addresses in Excel format for manual delivery to NORC via Box.com
  • Stores exported files in a GCS bucket for long-term retention
  • Maintains comprehensive metadata of all addresses processed
  • Generates detailed statistics on address data quality and coverage
  • Supports deletion of specific deliveries if needed
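The participant-level filter amounts to a simple predicate. This is a minimal sketch, assuming a hypothetical `Participant` record with boolean fields. In the pipeline itself these criteria are applied in SQL against the Connect participants tables, and the field names here are illustrative only.

```python
from dataclasses import dataclass


@dataclass
class Participant:
    # Hypothetical fields; the real Connect tables use their own column names.
    connect_id: str
    verified: bool
    requested_data_destruction: bool
    module_4_complete: bool


def is_eligible(p: Participant) -> bool:
    """Return True only if the participant meets all three inclusion criteria."""
    return p.verified and not p.requested_data_destruction and p.module_4_complete


participants = [
    Participant("A1", verified=True, requested_data_destruction=False, module_4_complete=True),
    Participant("B2", verified=True, requested_data_destruction=True, module_4_complete=True),
    Participant("C3", verified=False, requested_data_destruction=False, module_4_complete=True),
]
eligible_ids = [p.connect_id for p in participants if is_eligible(p)]
print(eligible_ids)  # ['A1']
```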

Data Architecture

Tables and Views Overview

The pipeline creates and manages several BigQuery tables and views, each serving a specific purpose in the geocoding workflow:

Core Data Views

addresses_all (View)

  • Purpose: Unified view that combines address data from multiple sources (Module 4 questionnaire and User Profile)
  • Scope: All valid addresses from verified participants who haven't requested data destruction and have completed Module 4
  • Key Features:
    • Standardizes address formats across different sources
    • Includes computed address hash for deduplication
    • Filters out records with no address information
    • Combines up to 27 different address types (11 home addresses, 10 seasonal addresses, work addresses, school addresses, etc.)
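One way to compute an address hash for deduplication is to normalize the address components and hash the joined result. This is a sketch under assumptions: the component names, the normalization rules (trim, collapse whitespace, uppercase), and the choice of SHA-256 are illustrative and may not match what the pipeline's SQL actually does. Returning `None` when every component is empty mirrors the rule that records with no address information are filtered out.

```python
import hashlib
import re


def normalize(part):
    """Trim, collapse internal whitespace, and uppercase one address component."""
    if part is None:
        return ""
    return re.sub(r"\s+", " ", str(part)).strip().upper()


def address_hash(street, city, state, zip_code):
    """Hash normalized components; None means 'no address information'."""
    parts = [normalize(p) for p in (street, city, state, zip_code)]
    if not any(parts):
        return None  # such rows would be excluded from addresses_all
    # The '|' delimiter keeps ("AB", "C") distinct from ("A", "BC").
    key = "|".join(parts)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


h1 = address_hash(" 123 Main St ", "Bethesda", "md", "20892")
h2 = address_hash("123  MAIN ST", "BETHESDA", "MD", "20892")
print(h1 == h2)  # True
```

Because formatting differences are removed before hashing, the same address reported through Module 4 and the User Profile can collapse to a single hash.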

Metadata and Tracking Tables

address_delivery_metadata (Table)

  • Purpose: Tracks which addresses have been delivered to NORC for geocoding
  • Scope: Metadata only - delivery IDs, dates, participant IDs, and address hashes
  • Data Retention: Permanent record of all deliveries
  • Key Fields: delivery_id, delivery_date, Connect_ID, address_hash, address_source
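A row of address_delivery_metadata could be modeled as the record below. The field names come from the Key Fields list above; the field types, the example `address_source` values, and the hypothetical `make_delivery_id` helper (which assumes the DELIVERY_YYYYMMDD pattern seen in the Managing Deliveries examples) are assumptions.

```python
from dataclasses import dataclass
from datetime import date


def make_delivery_id(d: date) -> str:
    """Build an ID like DELIVERY_20250424 (format assumed from the examples)."""
    return f"DELIVERY_{d:%Y%m%d}"


@dataclass(frozen=True)
class DeliveryMetadataRow:
    delivery_id: str
    delivery_date: date
    Connect_ID: str
    address_hash: str
    address_source: str  # illustrative values: "module_4", "user_profile"


today = date(2025, 4, 24)
row = DeliveryMetadataRow(
    delivery_id=make_delivery_id(today),
    delivery_date=today,
    Connect_ID="1234567890",        # placeholder participant ID
    address_hash="placeholder-hash",
    address_source="module_4",
)
print(row.delivery_id)  # DELIVERY_20250424
```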

address_deliveries (Table - Comprehensive)

  • Purpose: Complete historical record of all addresses delivered to NORC
  • Scope: Full address details for every delivery made
  • Data Retention: Permanent - serves as the authoritative record
  • Key Features: Contains both metadata AND complete address information for audit trails and analysis

Working Tables

address_delivery_current (Table - Temporary)

  • Purpose: Staging table for the current delivery being processed
  • Scope: Only addresses identified for the current delivery run
  • Data Retention: Temporary - gets recreated with each pipeline run
  • Lifecycle: Created → Populated → Exported → Used for metadata updates

Data Flow Summary

Source Data (Module 4 + User Profile)
    ↓
addresses_all (View) - Unified, standardized view
    ↓
address_delivery_current (Table) - Current batch of new addresses
    ↓
Export to Excel file + Store in GCS + Update metadata tables
    ↓
address_delivery_metadata + address_deliveries (Permanent record)
    ↓
[Manual delivery to NORC via Box.com per SOP]
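The step from addresses_all to address_delivery_current is essentially an anti-join: keep only addresses that do not yet appear in address_delivery_metadata. The in-memory sketch below illustrates that logic with plain dictionaries; the real pipeline performs it in BigQuery. Keying on the (Connect_ID, address_hash) pair and dropping duplicates within a batch are assumptions, and `find_undelivered` is a hypothetical name.

```python
def find_undelivered(all_addresses, delivered_metadata):
    """Return addresses whose (Connect_ID, address_hash) was never delivered."""
    delivered = {(m["Connect_ID"], m["address_hash"]) for m in delivered_metadata}
    seen = set()
    new_rows = []
    for row in all_addresses:
        key = (row["Connect_ID"], row["address_hash"])
        if key in delivered or key in seen:
            continue  # already sent to NORC, or a duplicate within this batch
        seen.add(key)
        new_rows.append(row)
    return new_rows


addresses_all = [
    {"Connect_ID": "1", "address_hash": "h1", "address_source": "module_4"},
    {"Connect_ID": "1", "address_hash": "h2", "address_source": "user_profile"},
    {"Connect_ID": "2", "address_hash": "h3", "address_source": "module_4"},
    {"Connect_ID": "2", "address_hash": "h3", "address_source": "user_profile"},
]
metadata = [{"Connect_ID": "1", "address_hash": "h1"}]
current = find_undelivered(addresses_all, metadata)
print([(r["Connect_ID"], r["address_hash"]) for r in current])
# [('1', 'h2'), ('2', 'h3')]
```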

Temporary Objects

The pipeline also creates temporary tables during processing (e.g., temp_new_addresses_*) that are automatically cleaned up after use.

Pipeline Architecture

The pipeline consists of several Python modules that work together:

  • main.py: Entry point that orchestrates the pipeline
  • constants.py: Configuration parameters
  • utils.py: Utility functions, such as logging setup
  • address_processing.py: Core pipeline functionality

Key Functions

Functions in address_processing.py:

  • create_required_tables(): Creates the necessary tables if they don't exist
  • create_address_view(): Creates or updates the address view
  • identify_new_addresses(): Identifies addresses not yet delivered
  • update_metadata(): Updates metadata tables with new delivery information
  • export_addresses(): Exports addresses to an Excel file for delivery
  • delete_delivery(): Deletes a specific delivery from metadata
  • generate_summary_statistics(): Generates statistics about addresses

SQL Query Files

  • address_view.sql: Queries that extract addresses from Module 4
  • user_profile_address_view.sql: Queries that extract addresses from User Profile
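create_address_view() has to combine the two query files into a single view. A plausible approach, sketched here under assumptions, is to read both files and join them with UNION ALL inside a CREATE OR REPLACE VIEW statement. It presumes each file holds one SELECT returning the same columns; the function name `build_view_ddl` is hypothetical.

```python
import tempfile
from pathlib import Path


def build_view_ddl(sql_dir, view_id,
                   files=("address_view.sql", "user_profile_address_view.sql")):
    """Wrap the per-source SELECTs in one CREATE OR REPLACE VIEW statement."""
    selects = []
    for name in files:
        text = Path(sql_dir, name).read_text(encoding="utf-8").strip()
        selects.append(text.rstrip(";").strip())  # a trailing ';' would break UNION ALL
    body = "\nUNION ALL\n".join(f"({s})" for s in selects)
    return f"CREATE OR REPLACE VIEW `{view_id}` AS\n{body}"


with tempfile.TemporaryDirectory() as d:
    Path(d, "address_view.sql").write_text("SELECT 'module_4' AS address_source;\n")
    Path(d, "user_profile_address_view.sql").write_text("SELECT 'user_profile' AS address_source")
    ddl = build_view_ddl(d, "my-project.my_dataset.addresses_all")  # placeholder ID
print(ddl)
```

Keeping each source in its own file lets either query change without touching the other, while the view still presents one unified schema.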

Configuration

The pipeline is configured via the constants.py file:

  • PROJECT_ID: Google Cloud Project ID
  • TARGET_DATASET_ID: BigQuery dataset for storing pipeline tables
  • FLAT_SOURCE_DATASET_ID: Flat Connect dataset location
  • RAW_SOURCE_DATASET_ID: Raw Connect dataset location
  • MODULE_4_TABLE: Module 4 questionnaire table
  • FLAT_PARTICIPANTS_TABLE: Flattened participants table
  • RAW_PARTICIPANTS_TABLE: Raw participants table
  • METADATA_TABLE: Table for storing delivery metadata
  • ADDRESSES_VIEW: View that combines all address sources
  • CURRENT_DELIVERY_TABLE: Table for current delivery
  • COMPREHENSIVE_TABLE: Comprehensive history of all delivered addresses
  • LOCAL_EXPORT: Boolean to toggle between local file export and GCS export
  • LOCAL_EXPORT_DIR: Directory for local file exports
  • SQL_DIR: Directory containing SQL query files
  • QUERY_TIMEOUT: Timeout for BigQuery operations (seconds)
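A constants.py along these lines would cover the parameters listed above. Every value shown is a placeholder, not the study's real project, dataset, or table name, and the `fq()` helper for building fully qualified BigQuery IDs is hypothetical.

```python
# constants.py (illustrative placeholders only)
PROJECT_ID = "my-gcp-project"
TARGET_DATASET_ID = "geocoding"
FLAT_SOURCE_DATASET_ID = "flat_connect"
RAW_SOURCE_DATASET_ID = "raw_connect"

MODULE_4_TABLE = "module4"
FLAT_PARTICIPANTS_TABLE = "participants"
RAW_PARTICIPANTS_TABLE = "participants"

METADATA_TABLE = "address_delivery_metadata"
ADDRESSES_VIEW = "addresses_all"
CURRENT_DELIVERY_TABLE = "address_delivery_current"
COMPREHENSIVE_TABLE = "address_deliveries"

LOCAL_EXPORT = True          # True: write files locally; False: export to GCS
LOCAL_EXPORT_DIR = "exports"
SQL_DIR = "sql"
QUERY_TIMEOUT = 600          # seconds (placeholder value)


def fq(table, dataset=TARGET_DATASET_ID, project=PROJECT_ID):
    """Return a fully qualified BigQuery ID of the form project.dataset.table."""
    return f"{project}.{dataset}.{table}"
```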

Running the Pipeline

To run the full pipeline:

```bash
python main.py
```

This will:

  1. Create required tables if they don't exist
  2. Create/update the address view
  3. Identify addresses that haven't been delivered yet
  4. Update metadata tables
  5. Export addresses to an Excel file and store it in a GCS bucket
  6. Generate summary statistics
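The six steps can be pictured as an ordered sequence that stops at the first failure. This sketch passes each step in as a callable because the real signatures in address_processing.py are not documented here; the `run_pipeline` function and its fail-fast behavior are assumptions, not a description of main.py.

```python
import logging
from typing import Callable, Sequence, Tuple

logger = logging.getLogger("geocoding_pipeline")


def run_pipeline(steps: Sequence[Tuple[str, Callable[[], object]]]) -> list:
    """Run named steps in order; log and re-raise on the first failure."""
    completed = []
    for name, step in steps:
        logger.info("Starting step: %s", name)
        try:
            step()
        except Exception:
            logger.exception("Step failed: %s", name)
            raise  # later steps (e.g. export) must not run on a failed metadata update
        completed.append(name)
    return completed


step_names = [
    "create_required_tables",
    "create_address_view",
    "identify_new_addresses",
    "update_metadata",
    "export_addresses",
    "generate_summary_statistics",
]
log = []
steps = [(n, lambda n=n: log.append(n)) for n in step_names]  # stand-in steps
done = run_pipeline(steps)
print(done == step_names)  # True
```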

Note: The exported Excel file should be manually shared with NORC via Box.com following the procedures outlined in the SOP. Files are also automatically stored in the GCS bucket for long-term retention.

Managing Deliveries

To delete a specific delivery:

```python
from google.cloud import bigquery
import constants
import address_processing

client = bigquery.Client(project=constants.PROJECT_ID)
address_processing.delete_delivery(client, "DELIVERY_20250424")
```
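Conceptually, deleting a delivery removes every row carrying that delivery_id, so its addresses count as undelivered on the next run. The sketch below illustrates the idea with an in-memory SQLite database standing in for BigQuery; `delete_delivery_sqlite` is hypothetical, and clearing both address_delivery_metadata and address_deliveries is an assumption about what the real delete_delivery() touches.

```python
import sqlite3

TRACKING_TABLES = ("address_delivery_metadata", "address_deliveries")


def delete_delivery_sqlite(conn, delivery_id):
    """Delete one delivery's rows from both tracking tables; return rows removed."""
    removed = 0
    with conn:  # commit on success, roll back on error
        for table in TRACKING_TABLES:  # fixed names, so the f-string is safe
            cur = conn.execute(f"DELETE FROM {table} WHERE delivery_id = ?", (delivery_id,))
            removed += cur.rowcount
    return removed


conn = sqlite3.connect(":memory:")
for table in TRACKING_TABLES:
    conn.execute(f"CREATE TABLE {table} (delivery_id TEXT, Connect_ID TEXT, address_hash TEXT)")
    conn.executemany(
        f"INSERT INTO {table} VALUES (?, ?, ?)",
        [("DELIVERY_20250424", "1", "h1"), ("DELIVERY_20250501", "2", "h2")],  # sample rows
    )
print(delete_delivery_sqlite(conn, "DELIVERY_20250424"))  # 2
```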

To generate statistics for a specific delivery:

```python
from google.cloud import bigquery
import constants
import address_processing

client = bigquery.Client(project=constants.PROJECT_ID)
address_processing.generate_summary_statistics(client, "DELIVERY_20250424")
```
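The actual metrics produced by generate_summary_statistics() are not listed here, so as a rough illustration only, the sketch below counts addresses per source and distinct participants for one delivery, using rows shaped like address_delivery_metadata. The `summarize` function and the sample rows are hypothetical.

```python
from collections import Counter


def summarize(rows, delivery_id):
    """Count addresses by source and distinct participants for one delivery."""
    selected = [r for r in rows if r["delivery_id"] == delivery_id]
    by_source = Counter(r["address_source"] for r in selected)
    participants = {r["Connect_ID"] for r in selected}
    return {
        "total_addresses": len(selected),
        "participants": len(participants),
        "by_source": dict(by_source),
    }


rows = [
    {"delivery_id": "DELIVERY_20250424", "Connect_ID": "1", "address_source": "module_4"},
    {"delivery_id": "DELIVERY_20250424", "Connect_ID": "1", "address_source": "user_profile"},
    {"delivery_id": "DELIVERY_20250424", "Connect_ID": "2", "address_source": "module_4"},
    {"delivery_id": "DELIVERY_20250501", "Connect_ID": "3", "address_source": "module_4"},
]
print(summarize(rows, "DELIVERY_20250424"))
# {'total_addresses': 3, 'participants': 2, 'by_source': {'module_4': 2, 'user_profile': 1}}
```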

Monitoring and Debugging

  • The pipeline generates detailed logs during execution
  • Debug SQL queries are saved to a debug directory
  • Summary statistics are displayed at the end of each successful run
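Logging and debug-SQL capture might look like the following. The logger name, format string, default debug directory, and timestamped file-naming scheme are all assumptions; utils.py may do this differently.

```python
import logging
import tempfile
from datetime import datetime
from pathlib import Path


def setup_logging(level=logging.INFO):
    """Configure timestamped console logging for a pipeline run."""
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
    return logging.getLogger("geocoding_pipeline")


def save_debug_sql(sql, label, debug_dir="debug"):
    """Write a query to debug_dir so it can be rerun by hand; return its path."""
    out_dir = Path(debug_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = out_dir / f"{label}_{stamp}.sql"
    path.write_text(sql, encoding="utf-8")
    return path


logger = setup_logging()
with tempfile.TemporaryDirectory() as d:
    p = save_debug_sql("SELECT 1", "identify_new_addresses", debug_dir=d)
    logger.info("Saved debug query to %s", p)
    saved_text = p.read_text(encoding="utf-8")
```

Saving the exact SQL that ran makes it possible to paste a failing query into the BigQuery console and reproduce the error outside the pipeline.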

Future Extensions

The pipeline is designed to be extended to:

  • Ingest geocoded data back from NORC
  • Store and manage latitude, longitude, and cleaned-up address data
  • Provide quality reports on the geocoding results
  • Support additional address sources as they become available