Adding Database side analysis functions #3

Open

diveshjain-phy wants to merge 54 commits into main from database_side_analysis

Conversation

diveshjain-phy (Member Author) commented Aug 27, 2025

@JBorrow, this is ongoing work to address the analysis-function issue on lightserve. It took some time to understand and resolve the primary-key and foreign-key requirements of the tables when creating hypertables. Once I’ve added the analysis functions, I’ll request a review.

JBorrow (Member) commented Aug 27, 2025

Cool! So this actually works with TimescaleDB like this?

diveshjain-phy (Member Author)

Yes, it seems to be working. I had to add 'time' to the primary key of the FluxMeasurement table, since TimescaleDB requires the time column to be part of the primary key when creating hypertables chunked in time, and then carry that change forward through the schema.

JBorrow (Member) commented Aug 27, 2025

Very interesting that it requires time in the primary key... As long as it's OK with a composite primary key that includes the id field we're fine; otherwise we might need to rethink things.

diveshjain-phy (Member Author) commented Aug 27, 2025

Yes, I checked; that works.
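
As a minimal sketch of the arrangement being discussed (the exact DDL is assumed; only the TimescaleDB requirement that the partitioning column appear in any primary key is documented behaviour):

-- Composite primary key that includes the time column, then convert to a hypertable.
-- Table and column names follow the PR; the statements actually used may differ.
CREATE TABLE IF NOT EXISTS flux_measurements (
    id     SERIAL,
    time   TIMESTAMPTZ NOT NULL,
    i_flux DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (id, time)  -- 'time' must be part of any primary key on a hypertable
);

SELECT create_hypertable('flux_measurements', 'time');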

JBorrow (Member) commented Aug 27, 2025

Very interesting. Keep exploring this direction... Do we need a separate table for each time range? Or does TimescaleDB have some functionality for arbitrary time ranges too?

diveshjain-phy (Member Author)

As far as I have read, we don't need to worry about separate tables for different time ranges. TimescaleDB uses one table and automatically partitions data into time-based chunks. When we query any time range, it automatically finds the relevant chunks.
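
To illustrate the chunking behaviour described above, a sketch (the query is only an example; show_chunks is a standard TimescaleDB helper):

-- An ordinary range query against the single hypertable; TimescaleDB prunes to the
-- chunks that overlap the requested interval (chunk exclusion), so no per-range tables.
SELECT time, i_flux
FROM flux_measurements
WHERE source_id = 1
  AND time BETWEEN '2025-03-01' AND '2025-09-02';

-- Inspect which time-based chunks currently back the hypertable.
SELECT show_chunks('flux_measurements');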

JBorrow (Member) commented Aug 27, 2025

Ah, I understand, so the chunk_time_interval => INTERVAL '6 months' is more of an optimization thing?

diveshjain-phy (Member Author) commented Aug 28, 2025

Yes. Best practice is to set chunk_time_interval so that one chunk of data takes up about 25% of RAM. Most examples use 7–14 days as a starting point, with TimescaleDB's default being 7 days.
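
For reference, a sketch of how the chunk interval is set (both calls are standard TimescaleDB; the specific intervals are just the values mentioned here):

-- Set the chunk interval when creating the hypertable (7 days is the default)...
SELECT create_hypertable('flux_measurements', 'time',
                         chunk_time_interval => INTERVAL '7 days');

-- ...or change it later; the new interval applies only to chunks created afterwards.
SELECT set_chunk_time_interval('flux_measurements', INTERVAL '6 months');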

diveshjain-phy (Member Author)

Right now I have set the aggregate bucketing to 1 month. Every row in the aggregate table then corresponds to a month start, so a query range includes every month whose bucket starts inside that range. For example, asking for 1 Mar–2 Sep returns the September bucket that starts on 1 Sep, and because that bucket covers the whole month, the results effectively run through 30 Sep. If you don't anticipate any issues, we can combine information from the aggregate table and the raw table to get exact results at the range edges. Alternatively, we can use shorter buckets, but that requires tuning unless we know what users are comfortable with.
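
A hedged sketch of the kind of monthly continuous aggregate described above (view and column names are illustrative, not necessarily those used in the PR; monthly buckets require a TimescaleDB version where time_bucket supports month intervals):

-- Monthly continuous aggregate over the raw flux measurements (illustrative names).
CREATE MATERIALIZED VIEW IF NOT EXISTS flux_monthly_stats
WITH (timescaledb.continuous) AS
SELECT
    source_id,
    band_name,
    time_bucket(INTERVAL '1 month', time) AS bucket,
    AVG(i_flux) AS mean_flux,
    MIN(i_flux) AS min_flux,
    MAX(i_flux) AS max_flux,
    COUNT(*)    AS n_points
FROM flux_measurements
GROUP BY source_id, band_name, bucket;

-- A range query then matches every month whose bucket start falls inside the range,
-- e.g. 1 Mar - 2 Sep picks up the bucket starting 1 Sep, which covers the whole month.
SELECT * FROM flux_monthly_stats
WHERE source_id = 1 AND band_name = 'f145'
  AND bucket BETWEEN '2025-03-01' AND '2025-09-02';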

diveshjain-phy (Member Author) commented Oct 6, 2025

@JBorrow I've implemented performance tests for the aggregate statistics endpoints to compare continuous aggregates vs raw queries:

  • /analysis/aggregate/{source_id}/{band_name} (with continuous aggregates)
  • /analysis/wo_ca/aggregate/{source_id}/{band_name} (without continuous aggregates)
============================================================
Testing: GET http://localhost:8000/analysis/aggregate/1/f145
============================================================
Mean: 12.94 ms
Std:  7.83 ms
Min:  8.40 ms
Max:  117.82 ms

============================================================
Testing: GET http://localhost:8000/analysis/wo_ca/aggregate/1/f145
============================================================
Mean: 68.66 ms
Std:  56.27 ms
Min:  57.14 ms
Max:  2525.97 ms

Ratio of mean times with aggregate to without aggregate calls: 0.18845746342388867

Is this the right way to approach the tests?

perf_test.py

diveshjain-phy (Member Author)

@JBorrow

In addition to the client and models layers, I have added the storage layer to the codebase. The backend can be specified via Settings in config.py. The client layer has been fully migrated and tested, with the exception of cutout operations, which remain on the SQLAlchemy implementation. Client interaction with the storage layer is storage-agnostic. The CLI tools (setup.py, ephemeral.py) and the simulations work with the new backend.

JBorrow (Member) left a comment

Looking good, some minor comments on the postgres setup.

SOURCES_TABLE = """
CREATE TABLE IF NOT EXISTS sources (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),

JBorrow (Member):

VARCHAR confers no performance improvement over TEXT and is more limiting.

Comment on lines +23 to +25
    name VARCHAR(50) PRIMARY KEY,
    telescope VARCHAR(100) NOT NULL,
    instrument VARCHAR(100) NOT NULL,

JBorrow (Member):

Same comments as above.

Comment on lines +84 to +141
    async def get_band_data(self, source_id: int, band_name: str) -> LightcurveBandData:
        """
        Get all measurements as arrays using database-side aggregation.
        """
        query = """
            SELECT
                COALESCE(ARRAY_AGG(id ORDER BY time), ARRAY[]::INTEGER[]) as ids,
                COALESCE(ARRAY_AGG(time ORDER BY time), ARRAY[]::TIMESTAMPTZ[]) as times,
                COALESCE(ARRAY_AGG(ra ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as ra,
                COALESCE(ARRAY_AGG(dec ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as dec,
                COALESCE(ARRAY_AGG(ra_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as ra_uncertainty,
                COALESCE(ARRAY_AGG(dec_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as dec_uncertainty,
                COALESCE(ARRAY_AGG(i_flux ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as i_flux,
                COALESCE(ARRAY_AGG(i_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as i_uncertainty
            FROM flux_measurements
            WHERE source_id = %(source_id)s AND band_name = %(band_name)s
        """

        async with self.conn.cursor(row_factory=dict_row) as cur:
            await cur.execute(query, {"source_id": source_id, "band_name": band_name})
            row = await cur.fetchone()
            return LightcurveBandData(**row)

    async def get_time_range(
        self,
        source_id: int,
        band_name: str,
        start_time: datetime,
        end_time: datetime
    ) -> LightcurveBandData:
        """
        Get measurements in a time range.
        """
        query = """
            SELECT
                COALESCE(ARRAY_AGG(id ORDER BY time), ARRAY[]::INTEGER[]) as ids,
                COALESCE(ARRAY_AGG(time ORDER BY time), ARRAY[]::TIMESTAMPTZ[]) as times,
                COALESCE(ARRAY_AGG(ra ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as ra,
                COALESCE(ARRAY_AGG(dec ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as dec,
                COALESCE(ARRAY_AGG(ra_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as ra_uncertainty,
                COALESCE(ARRAY_AGG(dec_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as dec_uncertainty,
                COALESCE(ARRAY_AGG(i_flux ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as i_flux,
                COALESCE(ARRAY_AGG(i_uncertainty ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) as i_uncertainty
            FROM flux_measurements
            WHERE source_id = %(source_id)s
              AND band_name = %(band_name)s
              AND time BETWEEN %(start_time)s AND %(end_time)s
        """

        async with self.conn.cursor(row_factory=dict_row) as cur:
            await cur.execute(query, {
                "source_id": source_id,
                "band_name": band_name,
                "start_time": start_time,
                "end_time": end_time
            })
            row = await cur.fetchone()
            return LightcurveBandData(**row)

JBorrow (Member):

These two could be the same function but with the time range values accepting None as an input maybe?
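
One possible way to fold the two into a single query, sketched here with optional time bounds (this is an assumption about the approach, not how the PR currently does it):

-- NULL start/end means "unbounded"; the Python caller would pass None for both
-- in the "all measurements" case (parameter handling here is assumed).
SELECT
    COALESCE(ARRAY_AGG(id ORDER BY time), ARRAY[]::INTEGER[]) AS ids,
    COALESCE(ARRAY_AGG(i_flux ORDER BY time), ARRAY[]::DOUBLE PRECISION[]) AS i_flux
FROM flux_measurements
WHERE source_id = %(source_id)s
  AND band_name = %(band_name)s
  AND (%(start_time)s::timestamptz IS NULL OR time >= %(start_time)s)
  AND (%(end_time)s::timestamptz IS NULL OR time <= %(end_time)s);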

Comment on lines +30 to +49
FLUX_MEASUREMENTS_TABLE = """
CREATE TABLE IF NOT EXISTS flux_measurements (
    id SERIAL PRIMARY KEY,
    band_name VARCHAR(50) NOT NULL REFERENCES bands(name),
    source_id INTEGER NOT NULL REFERENCES sources(id),
    time TIMESTAMPTZ NOT NULL,
    ra DOUBLE PRECISION NOT NULL CHECK (ra >= -180 AND ra <= 180),
    dec DOUBLE PRECISION NOT NULL CHECK (dec >= -90 AND dec <= 90),
    ra_uncertainty DOUBLE PRECISION,
    dec_uncertainty DOUBLE PRECISION,
    i_flux DOUBLE PRECISION NOT NULL,
    i_uncertainty DOUBLE PRECISION,
    extra JSONB
);

CREATE INDEX IF NOT EXISTS idx_flux_source_band_time
    ON flux_measurements (source_id, band_name, time DESC);

CREATE INDEX IF NOT EXISTS idx_flux_time
    ON flux_measurements (time DESC);

JBorrow (Member):

Could consider partitioning the flux_measurements table by source_id? https://www.postgresql.org/docs/current/ddl-partitioning.html
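
For reference, a sketch of declarative partitioning by source in plain PostgreSQL (illustrative only; a declaratively partitioned table cannot also be a hypertable, so with TimescaleDB the closer analogue would be adding source_id as a hash space dimension via add_dimension):

-- Plain PostgreSQL list partitioning by source_id (names and columns abbreviated).
CREATE TABLE flux_measurements (
    id        SERIAL,
    source_id INTEGER NOT NULL,
    time      TIMESTAMPTZ NOT NULL,
    i_flux    DOUBLE PRECISION NOT NULL
) PARTITION BY LIST (source_id);

-- One partition per source (or per group of sources), created as sources are added.
CREATE TABLE flux_measurements_src_1
    PARTITION OF flux_measurements FOR VALUES IN (1);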
