Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
578cc80
Migration to timescaledb
diveshjain-phy Aug 27, 2025
762bee9
Continuous aggregate deployment and bad statistics
diveshjain-phy Aug 29, 2025
65e07c3
Updating mean statistic computes
diveshjain-phy Sep 1, 2025
75f6942
Refactoring the code for structure and readability
diveshjain-phy Sep 17, 2025
fcd23e5
minor update
diveshjain-phy Sep 17, 2025
e3a367c
minor update
diveshjain-phy Sep 17, 2025
d43f15a
update to aggregate setup: retention policy + multiple aggregate tables
diveshjain-phy Sep 24, 2025
167e495
implementation of daily, weekly and monthly table
diveshjain-phy Sep 24, 2025
6c10186
selection of table in derived statistics
diveshjain-phy Sep 24, 2025
16aee35
minor update
diveshjain-phy Sep 24, 2025
a9484ca
generate data with past timestamps
diveshjain-phy Oct 2, 2025
eb961b2
Manual refresh for historical data
diveshjain-phy Oct 2, 2025
1fe986e
Implementation of manual refresh when dealing with historical data in…
diveshjain-phy Oct 2, 2025
04a6fee
added time stamps in responses and fixed minor bugs
diveshjain-phy Oct 2, 2025
ed02963
minor updates
diveshjain-phy Oct 3, 2025
ad879f8
passing time resolution as part of response body
diveshjain-phy Oct 3, 2025
64e0e31
added time series field and variance parameter
diveshjain-phy Oct 3, 2025
24f2207
Minor mods and handling without_aggregate requests within the same st…
diveshjain-phy Oct 15, 2025
a64f12a
modified time-threshold handling strategy
diveshjain-phy Oct 15, 2025
bfb8526
dependency injection + timeseries for rawtable+ minor refactors
diveshjain-phy Oct 22, 2025
af092db
minor mods to prevent sql injection
diveshjain-phy Oct 31, 2025
d7b3a17
refactoring the code with psycopg connection and benchmarks
diveshjain-phy Nov 18, 2025
9435c41
refactors for better performance
diveshjain-phy Nov 18, 2025
f34a331
refactors for better performance
diveshjain-phy Nov 18, 2025
86ec8c4
refactors for better performance
diveshjain-phy Nov 18, 2025
dee1dac
multi-connection prescription for lightcurve.py
diveshjain-phy Nov 21, 2025
67866b3
added configuration for psycopg connection
diveshjain-phy Nov 21, 2025
c7db0de
Setup connection management system
diveshjain-phy Nov 21, 2025
6416208
Merge branch 'main' into database_side_analysis
diveshjain-phy Dec 4, 2025
0891326
Add storage protocols and update models
diveshjain-phy Dec 9, 2025
90149be
create protocols for hot swappable storage backends
diveshjain-phy Dec 9, 2025
7a3093f
rebasing commits
diveshjain-phy Dec 9, 2025
57302e9
cleaning sqlalchemy dependencies
diveshjain-phy Dec 9, 2025
667476c
updating simulation- stripping sqlalchemy
diveshjain-phy Dec 9, 2025
49db684
remove legacy files
diveshjain-phy Dec 9, 2025
851ade9
update simulation codes
diveshjain-phy Dec 9, 2025
2eb650e
update configuration file to include backend types
diveshjain-phy Dec 9, 2025
9e69a99
update the ephemeral and setup file
diveshjain-phy Dec 9, 2025
2674115
fixing ports on ephemeral run
diveshjain-phy Dec 10, 2025
c47f308
client band operations and tests
diveshjain-phy Dec 10, 2025
6d7b5d9
client band operations and tests
diveshjain-phy Dec 10, 2025
d3088e8
minor update
diveshjain-phy Dec 10, 2025
c509dba
added commit to each transaction
diveshjain-phy Dec 11, 2025
253fef4
fixing commits within each transaction
diveshjain-phy Dec 12, 2025
46a0894
moved tests from shared connection to isolated connections per test
diveshjain-phy Dec 12, 2025
fdb3cb7
update client source
diveshjain-phy Dec 12, 2025
e13a341
moving lightcurve functionalities
diveshjain-phy Dec 12, 2025
42d51ae
moving lightcurve functionalities
diveshjain-phy Dec 12, 2025
9192265
updating feed client and querying
diveshjain-phy Dec 12, 2025
2e9a56e
fixing measurement
diveshjain-phy Dec 12, 2025
20a6db7
removing port binding in ephemeral setup
diveshjain-phy Dec 12, 2025
b4ba852
Minor refactoring of code
diveshjain-phy Dec 19, 2025
b204ad6
refactoring flux measurement table ddl
diveshjain-phy Dec 19, 2025
1f4920d
added statistics functionality
diveshjain-phy Dec 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
299 changes: 299 additions & 0 deletions lightcurvedb/analysis/aggregates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
"""
Functions for creating and managing TimescaleDB continuous aggregates.
"""


from dataclasses import dataclass
from typing import Any, Dict, List

from sqlalchemy import column, func, select, table, text
from sqlalchemy.orm import Session

from lightcurvedb.models import FluxMeasurementTable


@dataclass
class AggregateConfig:
    """
    Configuration for a single TimescaleDB continuous aggregate.

    One instance fully describes a materialized view at one time
    resolution: how rows are bucketed, how the view is refreshed, when
    stale data is dropped, and when this resolution is chosen by
    ``select_aggregate_config``.
    """
    # Name of the materialized view table (e.g. "band_statistics_daily").
    view_name: str
    # Time resolution label (daily, weekly, monthly).
    time_resolution: str
    # Time window for grouping data, e.g. "1 day"; passed to time_bucket().
    bucket_interval: str
    # How old data must be before deletion by the retention policy.
    drop_after_interval: str
    # How often the retention policy checks for and deletes old data.
    drop_schedule_interval: str
    # How far back each scheduled refresh looks for new raw data.
    refresh_start_offset: str
    # Exclude this much recent data to avoid materializing incomplete buckets.
    refresh_end_offset: str
    # How often the continuous aggregate is refreshed with new data.
    refresh_schedule_interval: str
    # Maximum age in days for this aggregate to be selected by
    # select_aggregate_config (finest matching resolution wins).
    evaluate_threshold_days: int
    # SQL interval expression for bucket_end display correction; empty when
    # none is needed. NOTE(review): applied by consumers outside this file —
    # confirm exact usage against the query layer.
    display_date_correction: str


# Ordered finest-to-coarsest; select_aggregate_config relies on this order,
# returning the first config whose evaluate_threshold_days covers the request.
AggregateConfigurations: List[AggregateConfig] = [
    AggregateConfig(
        view_name="band_statistics_daily",
        time_resolution="daily",
        bucket_interval="1 day",
        drop_after_interval="1 month",
        drop_schedule_interval="7 days",
        refresh_start_offset="7 days",
        refresh_end_offset="1 day",
        refresh_schedule_interval="1 day",
        evaluate_threshold_days=30,
        display_date_correction=""
    ),
    AggregateConfig(
        view_name="band_statistics_weekly",
        time_resolution="weekly",
        bucket_interval="1 week",
        drop_after_interval="6 months",
        drop_schedule_interval="1 month",
        refresh_start_offset="3 weeks",
        refresh_end_offset="1 week",
        refresh_schedule_interval="1 week",
        evaluate_threshold_days=180,
        display_date_correction="INTERVAL '6 days'"
    ),
    AggregateConfig(
        view_name="band_statistics_monthly",
        time_resolution="monthly",
        bucket_interval="1 month",
        drop_after_interval="3 years",
        drop_schedule_interval="4 months",
        refresh_start_offset="3 months",
        refresh_end_offset="1 month",
        refresh_schedule_interval="1 week",
        evaluate_threshold_days=3650,
        display_date_correction="INTERVAL '1 month' - INTERVAL '1 day'"
    )
]


def select_aggregate_config(delta_days: int) -> AggregateConfig:
    """
    Select the finest aggregate configuration covering ``delta_days``.

    Walks AggregateConfigurations (ordered finest to coarsest) and returns
    the first config whose ``evaluate_threshold_days`` is >= ``delta_days``.

    Falls back to the coarsest configuration when the span exceeds every
    threshold; the previous implementation implicitly returned ``None`` in
    that case, which would crash any caller expecting an AggregateConfig.
    """
    for config in AggregateConfigurations:
        if delta_days <= config.evaluate_threshold_days:
            return config
    # Span is larger than the biggest threshold (> ~10 years): use the
    # coarsest available aggregate instead of returning None.
    return AggregateConfigurations[-1]

class MetricsRegistry:
    """
    Registry for statistical metrics and their SQLAlchemy expressions.

    Each entry maps a metric key to the callable that builds its SQL
    expression, the column name it materializes to, and a description.
    """

    def __init__(self):
        # (key, expression builder, description); the aggregate column
        # always shares the metric key, so it is filled in below.
        specs = [
            ("sum_flux_over_uncertainty_squared",
             self.sum_flux_over_uncertainty_squared,
             "Sum of flux/uncertainty^2 for weighted calculations"),
            ("sum_inverse_uncertainty_squared",
             self.sum_inverse_uncertainty_squared,
             "Sum of 1/uncertainty^2 for weighted calculations"),
            ("min_flux",
             self.min_flux,
             "Minimum flux value in the time period"),
            ("max_flux",
             self.max_flux,
             "Maximum flux value in the time period"),
            ("data_points",
             self.data_points_count,
             "Number of data points in the time period"),
            ("sum_flux",
             self.sum_flux,
             "Sum of flux values in the time period"),
            ("sum_flux_squared",
             self.sum_flux_squared,
             "Sum of squared flux values for variance calculation"),
        ]
        self.metrics: Dict[str, Dict[str, Any]] = {
            key: {
                "method": builder,
                "aggregate_column": key,
                "description": description,
            }
            for key, builder, description in specs
        }

    @staticmethod
    def sum_flux_over_uncertainty_squared(tbl):
        return func.sum(tbl.i_flux / (tbl.i_uncertainty * tbl.i_uncertainty))

    @staticmethod
    def sum_inverse_uncertainty_squared(tbl):
        return func.sum(1 / (tbl.i_uncertainty * tbl.i_uncertainty))

    @staticmethod
    def min_flux(tbl):
        return func.min(tbl.i_flux)

    @staticmethod
    def max_flux(tbl):
        return func.max(tbl.i_flux)

    @staticmethod
    def data_points_count(tbl):
        # COUNT(*) over the group; the table argument is unused.
        return func.count()

    @staticmethod
    def sum_flux(tbl):
        return func.sum(tbl.i_flux)

    @staticmethod
    def sum_flux_squared(tbl):
        return func.sum(tbl.i_flux * tbl.i_flux)

    def get_continuous_aggregate_table(self, view_name: str):
        """
        Generate a lightweight table reference for the named aggregate view.
        """
        cols = [column(name) for name in ("bucket", "source_id", "band_name")]
        cols.extend(
            column(spec["aggregate_column"]) for spec in self.metrics.values()
        )
        return table(view_name, *cols)

    def get_metric_expressions(self, table_ref):
        """
        Build labeled SQLAlchemy expressions for every registered metric.
        """
        expressions = []
        for spec in self.metrics.values():
            labeled = spec["method"](table_ref).label(spec["aggregate_column"])
            expressions.append(labeled)
        return expressions


class ContinuousAggregateBuilder:
    """
    Builds one TimescaleDB continuous aggregate: the materialized view
    plus its refresh and retention policies, driven by an AggregateConfig.
    """

    def __init__(self, metrics_registry: MetricsRegistry, config: AggregateConfig):
        # Supplies the per-metric SQLAlchemy expressions for the view SELECT.
        self.metrics_registry = metrics_registry
        # Per-resolution settings: view name, bucket size, policy intervals.
        self.config = config

    def build_aggregate_query(self, engine):
        """
        Compile the SELECT statement that defines the aggregate view.

        Returns the query as a literal SQL string (``literal_binds`` inlines
        all parameters) because it is embedded verbatim inside the
        CREATE MATERIALIZED VIEW statement, where bound parameters
        cannot be used.
        """
        ftable = FluxMeasurementTable
        # bucket_interval comes from the hard-coded AggregateConfigurations,
        # so interpolating it into text() is not exposed to user input.
        bucket = func.time_bucket(text(f"INTERVAL '{self.config.bucket_interval}'"), ftable.time).label("bucket")
        group_cols = [bucket, ftable.source_id, ftable.band_name]

        metric_exprs = self.metrics_registry.get_metric_expressions(ftable)

        select_query = (
            select(
                bucket,
                ftable.source_id,
                ftable.band_name,
                *metric_exprs,
            )
            .select_from(ftable)
            .group_by(*group_cols)
        )

        return select_query.compile(engine, compile_kwargs={"literal_binds": True}).string

    def get_create_view_sql(self, select_query: str) -> str:
        """
        Build the CREATE MATERIALIZED VIEW statement for the aggregate.

        ``materialized_only = false`` lets reads combine already-materialized
        buckets with not-yet-materialized raw data.
        """
        # view_name is drawn from the static AggregateConfigurations, so the
        # f-string interpolation below is not an injection surface.
        return f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS {self.config.view_name}
        WITH (
            timescaledb.continuous,
            timescaledb.materialized_only = false
        ) AS
        {select_query}
        WITH DATA;
        """

    def get_refresh_policy_sql(self) -> text:
        """
        Build refresh policy SQL for the continuous aggregate.

        Parameters (view_name, start_offset, end_offset, schedule_interval)
        are bound at execution time in create_aggregate().
        """
        # NOTE(review): the ``-> text`` annotation is the sqlalchemy *function*
        # ``text``, not a type; the real return type is TextClause.
        return text("""
            SELECT add_continuous_aggregate_policy(
                :view_name::regclass,
                start_offset => :start_offset::INTERVAL,
                end_offset => :end_offset::INTERVAL,
                schedule_interval => :schedule_interval::INTERVAL
            )
        """)

    def get_retention_policy_sql(self) -> text:
        """
        Build retention policy SQL for the continuous aggregate.

        Parameters (view_name, drop_after, schedule_interval) are bound at
        execution time in create_aggregate().
        """
        # NOTE(review): same annotation caveat as get_refresh_policy_sql.
        return text("""
            SELECT add_retention_policy(
                :view_name::regclass,
                drop_after => :drop_after::INTERVAL,
                schedule_interval => :schedule_interval::INTERVAL
            )
        """)

    def create_aggregate(self, session: Session) -> None:
        """
        Create the view, then attach its refresh and retention policies.
        """
        engine = session.bind
        select_query = self.build_aggregate_query(engine)
        create_view_sql = self.get_create_view_sql(select_query)

        refresh_params = {
            "view_name": self.config.view_name,
            "start_offset": self.config.refresh_start_offset,
            "end_offset": self.config.refresh_end_offset,
            "schedule_interval": self.config.refresh_schedule_interval
        }

        retention_params = {
            "view_name": self.config.view_name,
            "drop_after": self.config.drop_after_interval,
            "schedule_interval": self.config.drop_schedule_interval
        }

        # NOTE(review): AUTOCOMMIT appears required because TimescaleDB does
        # not allow continuous-aggregate DDL inside a transaction block —
        # confirm against the deployed TimescaleDB version.
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
            conn.execute(text(create_view_sql))
            conn.execute(self.get_refresh_policy_sql(), refresh_params)
            conn.execute(self.get_retention_policy_sql(), retention_params)



# Shared module-level registry reused by every ContinuousAggregateBuilder.
METRICS_REGISTRY = MetricsRegistry()


def create_continuous_aggregates(session: Session) -> None:
    """
    Create all continuous aggregates (daily, weekly, monthly).

    Instantiates one ContinuousAggregateBuilder per configured resolution
    and runs its DDL against the session's engine.
    """
    for cfg in AggregateConfigurations:
        ContinuousAggregateBuilder(METRICS_REGISTRY, cfg).create_aggregate(session)


def refresh_continuous_aggregates(session: Session) -> None:
    """
    Manually refresh all continuous aggregates for historical data.

    ``refresh_continuous_aggregate(view, NULL, NULL)`` refreshes the whole
    time range, which scheduled policies skip for data older than their
    start_offset. Runs on an AUTOCOMMIT connection because the procedure
    cannot be called inside a transaction block.

    The view name is now passed as a bound parameter (cast to regclass)
    instead of being interpolated with an f-string, matching the
    parameterized style of the policy SQL elsewhere in this module.
    """
    engine = session.get_bind()

    # Invariant statement hoisted out of the loop; only the bound
    # view name changes per iteration.
    refresh_sql = text(
        "CALL refresh_continuous_aggregate(:view_name::regclass, NULL, NULL)"
    )

    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        for config in AggregateConfigurations:
            conn.execute(refresh_sql, {"view_name": config.view_name})
Loading