
Replica schema from logs.changes #233

Closed
julianam-w wants to merge 11 commits into main from replication

Conversation

@julianam-w
Collaborator

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello @julianam-w, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly expands the dbt project's capabilities by introducing a robust mechanism to reconstruct operational database schemas from change logs. This new 'replica' layer provides a near real-time view of source tables, which is crucial for advanced analytics and historical data analysis. The changes include new dbt profiles, macros, automated model generation scripts, and updated documentation to support this new data flow.

Highlights

  • New Data Reconstruction Layer: Introduced a 'reconstructs' model layer to rebuild operational database tables from 'logs.changes' for analytics, providing a near real-time replica of source tables.
  • Automated Model Generation: Added a Python script ('create_reconstructs_models.py') and new dbt macros ('get_table_names', 'jsonb_to_columns_dynamic') to automate the creation of these 'reconstructs' models, dynamically extracting columns from JSONB data.
  • Updated dbt Configuration: Modified 'config/profiles.yml' to include 'replica_demo' and 'replica_release' profiles for connecting to replica databases, and 'dbt_project.yml' to define the 'reconstructs' model type with incremental materialization.
  • Enhanced Documentation: 'README.md' and 'models/overview.md' were updated to reflect the new model architecture, data flow paths for reporting and analytics, and new deployment targets for replica schemas.
  • dbt Selectors Implementation: Introduced 'selectors.yml' and updated 'scripts/build_reporting_assets.py' to leverage dbt selectors ('analytics', 'replica', 'reporting', 'reports') for more granular control over model execution and documentation generation.

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new reconstructs layer in the dbt project, designed to create replica tables from the logs.changes data. This is a significant architectural addition, enabling analytics on a replicated database. The changes include new dbt macros for dynamic table reconstruction, a Python script to automate model creation, and updates to configuration and documentation.

My review focuses on improving the robustness and maintainability of the new implementation. Key points include:

  • Strengthening the logic for discovering table columns in the reconstruction macro.
  • Making the Python script for model generation more resilient to changes in dbt's output.
  • Adhering to architectural principles by moving filtering logic out of the raw reconstruction layer.
  • Avoiding hardcoded configuration values.

Overall, this is a great initiative. The feedback provided aims to make the new functionality more robust and easier to maintain.

Comment thread macros/logs_to_table.sql Outdated
Comment on lines +4 to +14
```sql
WITH latest_version AS (
    SELECT record_data,
           row_number() OVER (ORDER BY string_to_array(version, '.')::int[] DESC) AS rn
    FROM {{ source("logs__tamanu", "changes") }}
    WHERE table_name = '{{ table_name }}'
      AND version != 'unknown'
)
SELECT DISTINCT key
FROM latest_version,
LATERAL jsonb_each_text(record_data)
WHERE rn = 1
```
Contributor


Severity: high

The current logic for discovering columns is not robust. It relies on finding a single record with the highest software version and extracts JSON keys only from that record. This can lead to missing columns if:

  1. Other records for the same table have different (e.g., optional) fields.
  2. The version string is not in a format that can be sorted correctly (e.g., contains non-numeric parts).

A more reliable approach is to scan all records for the given table_name and collect all distinct keys that have ever appeared. This ensures the reconstructed table schema is complete.

```sql
SELECT DISTINCT key
FROM {{ source("logs__tamanu", "changes") }},
LATERAL jsonb_object_keys(record_data) AS keys(key)
WHERE table_name = '{{ table_name }}'
```
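The version-ordering pitfall is easy to demonstrate outside SQL. This Python sketch (the version strings are invented for illustration) mirrors the `string_to_array(version, '.')::int[]` cast and shows why a plain string comparison picks the wrong "latest" record. It assumes purely dotted-numeric versions, which is exactly why the macro has to filter out `version = 'unknown'`:

```python
# Why lexicographic ordering of version strings is unreliable, and how a
# numeric cast (as in string_to_array(...)::int[]) fixes it.
versions = ["2.9.3", "2.10.1", "2.2.0"]

# Plain string comparison: "2.10.1" loses to "2.9.3" because "1" < "9".
lexicographic_latest = max(versions)

# Numeric comparison: split on "." and compare tuples of integers.
numeric_latest = max(versions, key=lambda v: tuple(int(p) for p in v.split(".")))

print(lexicographic_latest)  # 2.9.3  (wrong "latest")
print(numeric_latest)        # 2.10.1 (correct latest)
```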

Comment thread macros/get_table_names.sql Outdated
Comment on lines +11 to +13
```sql
{% for table_name in table_names %}
{{ print(table_name) }}
{% endfor %}
```
Contributor


Severity: medium

Returning values from a dbt operation by printing them line-by-line is fragile and can break if dbt's logging output changes. A more robust approach is to print the result as a single JSON string. This makes parsing on the client-side (in your Python script) much more reliable.

```sql
{{ print(tojson(table_names)) }}
```

Comment on lines +21 to +41
```python
# Parse output to extract table names
table_names = []
lines = result.stdout.split("\n")

for line in lines:
    line = line.strip()

    if (
        line
        and not line.startswith("Running")
        and not line.startswith("Completed")
        and not line.startswith("Found")
        and not line.startswith("Registered")
        and ":" not in line
    ):  # Skip timestamps
        # This should be a table name - validate it
        if line.replace("_", "").isalnum() and len(line) > 0:
            table_names.append(line)

return table_names
```
Contributor


Severity: medium

Parsing dbt's stdout is brittle. Since the get_table_names macro should be modified to return a JSON array (as per my other comment), this function should be updated to parse that JSON. This makes the script much more robust to changes in dbt's logging format. You'll also need to import json at the top of the file.

```python
# Parse JSON output to extract table names
import json  # better placed at the top of the file

output_lines = result.stdout.split('\n')
for line in output_lines:
    if line.strip().startswith('['):
        try:
            return json.loads(line.strip())
        except json.JSONDecodeError:
            cprint(f"Failed to parse JSON from dbt output: {line.strip()}", "error")
            return []

cprint("Could not find JSON output from dbt macro.", "error")
return []
```
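As a self-contained illustration of this pattern, here is a hedged sketch with simulated dbt output. The log lines are invented (real dbt log text varies by version); only the JSON array line matters:

```python
import json

def parse_table_names(stdout: str) -> list:
    """Return the first JSON array found in dbt's stdout, else []."""
    for line in stdout.splitlines():
        line = line.strip()
        if line.startswith("["):
            try:
                return json.loads(line)
            except json.JSONDecodeError:
                return []
    return []

# Simulated dbt output: the log lines are illustrative, not real dbt messages.
fake_stdout = (
    "12:00:01  Running with dbt=1.8.0\n"
    "12:00:02  Found 42 models\n"
    '["patients", "encounters", "lab_requests"]\n'
)
print(parse_table_names(fake_stdout))  # ['patients', 'encounters', 'lab_requests']
```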

Comment thread models/overview.md

## Project Overview

```diff
- This dbt project transforms Tamanu healthcare system data into optimised datasets following a structured data flow: **sources/logs → bases → datasets → reports**. The architecture supports both reporting and analytics use cases while maintaining data governance and privacy standards.
+ This dbt project transforms Tamanu healthcare system data into optimised datasets following a structured data flow:
```
Collaborator


Could we say EMR instead of healthcare system? I have heard PMs referring to it as EMR in their presentations.

Collaborator Author


Contentious to call it an EMR or EHR, I would rather avoid it.

Comment thread macros/get_table_names.sql Outdated
Collaborator


Maybe we could rename this macro to get_tables_with_metadata(). It would return the table name and an array of keys, which we can then pass to the other macro in a loop to create models:

```sql
WITH latest_records AS (
    SELECT DISTINCT ON (table_oid)
        table_name,
        record_data
    FROM logs.changes
    WHERE version != 'unknown'
    ORDER BY table_oid, string_to_array(version, '.')::int[] DESC
)
SELECT
    table_name,
    ARRAY_AGG(DISTINCT key) AS keys
FROM latest_records,
LATERAL jsonb_object_keys(record_data::jsonb) AS key
GROUP BY table_name;
```
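The aggregation above is essentially "collect the distinct JSONB keys per table". A small Python analogue (with invented rows standing in for `logs.changes`) makes the intent concrete; note this variant takes the union of keys across all records, as the earlier review comment recommends, rather than only the latest record's keys:

```python
# Pure-Python analogue of collecting distinct JSONB keys per table.
from collections import defaultdict

# Illustrative rows mimicking logs.changes records (not real data).
changes = [
    {"table_name": "patients", "record_data": {"id": 1, "name": "A"}},
    {"table_name": "patients", "record_data": {"id": 2, "dob": "2000-01-01"}},
    {"table_name": "encounters", "record_data": {"id": 9, "patient_id": 1}},
]

keys_by_table = defaultdict(set)
for row in changes:
    keys_by_table[row["table_name"]].update(row["record_data"].keys())

print(sorted(keys_by_table["patients"]))  # ['dob', 'id', 'name']
```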

Collaborator Author

@julianam-w julianam-w Sep 19, 2025


Reason for keeping it separate is that I anticipate customisation of the tables, for example patient_program_registration and patient_program_registration_conditions. So this list of tables is only used to check for missing tables and to generate a template.
If it is part of a macro that loops through, it will override any custom logic applied. I've used a Python script to determine the new tables that need a template to be generated.
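The "find tables missing a template" step described here could look roughly like the following sketch. The `rec__` prefix and directory layout are assumptions based on the model names visible in this PR; the actual logic lives in create_reconstructs_models.py and may differ:

```python
from pathlib import Path

def missing_tables(table_names, models_dir: Path):
    """Return tables that have no rec__<table>.sql model file yet."""
    existing = {p.stem for p in models_dir.glob("rec__*.sql")}
    return [t for t in table_names if f"rec__{t}" not in existing]

# Example: with only rec__patients.sql on disk, 'encounters' still needs
# a template, so this would return ["encounters"]:
# missing_tables(["patients", "encounters"], Path("models/reconstructs"))
```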

Comment thread selectors.yml
```yaml
- name: reporting
  description: Reporting schema models built as views
  default: true
  definition:
```
Contributor


Does "default: true" here mean that if we don't pass the selector it will run this set by default?

Collaborator Author


That's correct.
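For context, dbt falls back to the selector marked `default: true` when no `--selector` or `--select` flag is passed, while an explicit flag such as `dbt run --selector replica` overrides it. A complete entry of this shape could look like the following sketch (the `definition` body here is illustrative, not copied from this PR's selectors.yml):

```yaml
# selectors.yml (illustrative fragment)
selectors:
  - name: reporting
    description: Reporting schema models built as views
    default: true        # used when no --selector/--select flag is passed
    definition:
      method: path
      value: models/reporting   # hypothetical selection criterion
```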

Comment thread macros/logs_to_table.sql Outdated
Collaborator


If we update the other macro to return the table name and keys, we could simplify this macro like this:

```sql
{% macro jsonb_to_columns_dynamic(table_name, keys) %}

WITH latest_changes AS (
    SELECT DISTINCT ON (record_id)
        record_updated_at,
        record_data
    FROM {{ source('logs__tamanu', 'changes') }}
    WHERE table_name = '{{ table_name }}'
    {% if is_incremental() %}
    AND record_updated_at > (
        SELECT COALESCE(MAX(record_updated_at), '1900-01-01')
        FROM {{ this }}
    )
    {% endif %}
    ORDER BY record_id, record_updated_at DESC
)

SELECT
    record_updated_at,
    {% if keys | length > 0 %}
        {% for key in keys %}
            record_data->>'{{ key }}' AS {{ key }}{% if not loop.last %},{% endif %}
        {% endfor %}
    {% else %}
        NULL AS placeholder
    {% endif %}
FROM latest_changes

{% endmacro %}
```
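With a macro of that shape, a generated model would reduce to a couple of lines. The file below is hypothetical: the table and column names are placeholders, and in practice the incremental materialisation is set via the reconstructs model type in dbt_project.yml rather than per file:

```sql
-- models/reconstructs/rec__patients.sql (hypothetical example)
{{ config(materialized='incremental') }}
{{ jsonb_to_columns_dynamic('patients', ['id', 'display_id', 'date_of_birth']) }}
```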

@julianam-w julianam-w closed this Mar 15, 2026
@julianam-w julianam-w deleted the replication branch March 15, 2026 04:25