diff --git a/src/pgstac/migrations/pgstac--0.9.11--unreleased.sql b/src/pgstac/migrations/pgstac--0.9.11--unreleased.sql index e5733646..df3deb76 100644 --- a/src/pgstac/migrations/pgstac--0.9.11--unreleased.sql +++ b/src/pgstac/migrations/pgstac--0.9.11--unreleased.sql @@ -197,6 +197,8 @@ RETURNS timestamptz AS $$ END ; $$ LANGUAGE SQL IMMUTABLE STRICT; +create sequence "pgstac"."item_fragments_id_seq"; + drop function if exists "pgstac"."content_slim"(_item jsonb); @@ -214,6 +216,25 @@ drop index if exists "pgstac"."search_wheres_where"; drop table "pgstac"."search_wheres"; +create table "pgstac"."item_field_registry" ( + "collection" text not null, + "path" text not null, + "is_leaf" boolean default true, + "value_kinds" text[] default '{}'::text[], + "first_seen" timestamp with time zone not null default now(), + "last_seen" timestamp with time zone not null default now() +); + + +create table "pgstac"."item_fragments" ( + "id" bigint not null default nextval('item_fragments_id_seq'::regclass), + "collection" text not null, + "hash" text not null, + "content" jsonb not null, + "created_at" timestamp with time zone not null default now() +); + + create table "pgstac"."items_deleted_log" ( "id" bigint generated always as identity not null, "item_id" text not null, @@ -226,10 +247,34 @@ create table "pgstac"."items_deleted_log" ( ); +alter table "pgstac"."items" add column "assets" jsonb default '{}'::jsonb; + +alter table "pgstac"."items" add column "bbox" jsonb; + alter table "pgstac"."items" add column "content_hash" text not null default ''::text; +alter table "pgstac"."items" add column "eo_cloud_cover" double precision; + +alter table "pgstac"."items" add column "eo_snow_cover" double precision; + +alter table "pgstac"."items" add column "extra" jsonb; + +alter table "pgstac"."items" add column "fragment_id" bigint; + +alter table "pgstac"."items" add column "gsd" double precision; + +alter table "pgstac"."items" add column "links" jsonb default '[]'::jsonb; + alter table "pgstac"."items" add column "pgstac_updated_at" timestamp with time zone not null default now(); +alter table "pgstac"."items" add column "properties" jsonb default '{}'::jsonb; + +alter table "pgstac"."items" add column "view_off_nadir" double precision; + +alter table "pgstac"."items" add column "view_sun_azimuth" double precision; + +alter table "pgstac"."items" add column "view_sun_elevation" double precision; + alter table "pgstac"."searches" add column "context_count" bigint; alter table "pgstac"."searches" add column "created_at" timestamp with time zone default now(); @@ -242,6 +287,18 @@ alter table "pgstac"."searches" add column "statslastupdated" timestamp with tim alter table "pgstac"."searches" alter column "hash" drop expression; +alter sequence "pgstac"."item_fragments_id_seq" owned by "pgstac"."item_fragments"."id"; + +CREATE INDEX item_field_registry_path_idx ON pgstac.item_field_registry USING btree (path); + +CREATE UNIQUE INDEX item_field_registry_pkey ON pgstac.item_field_registry USING btree (collection, path); + +CREATE UNIQUE INDEX item_fragments_collection_hash_key ON pgstac.item_fragments USING btree (collection, hash); + +CREATE INDEX item_fragments_collection_idx ON pgstac.item_fragments USING btree (collection); + +CREATE UNIQUE INDEX item_fragments_pkey ON pgstac.item_fragments USING btree (id); + CREATE INDEX items_deleted_log_deleted_at_idx ON pgstac.items_deleted_log USING btree (deleted_at); CREATE UNIQUE INDEX items_deleted_log_pkey ON pgstac.items_deleted_log USING btree (id); @@ -250,12 +307,39 @@ CREATE INDEX searches_lastused_anon_idx ON pgstac.searches USING btree (lastused CREATE UNIQUE INDEX searches_name_key ON pgstac.searches USING btree (name); +alter table "pgstac"."item_field_registry" add constraint "item_field_registry_pkey" PRIMARY KEY using index "item_field_registry_pkey"; + +alter table "pgstac"."item_fragments" add constraint "item_fragments_pkey" PRIMARY KEY using index "item_fragments_pkey"; + alter table "pgstac"."items_deleted_log" add constraint "items_deleted_log_pkey" PRIMARY KEY using index "items_deleted_log_pkey"; +alter table "pgstac"."item_field_registry" add constraint "item_field_registry_collection_fkey" FOREIGN KEY ("collection") REFERENCES "pgstac"."collections"("id") ON DELETE CASCADE NOT VALID; + +alter table "pgstac"."item_field_registry" validate constraint "item_field_registry_collection_fkey"; + +alter table "pgstac"."item_fragments" add constraint "item_fragments_collection_fkey" FOREIGN KEY ("collection") REFERENCES "pgstac"."collections"("id") ON DELETE CASCADE NOT VALID; + +alter table "pgstac"."item_fragments" validate constraint "item_fragments_collection_fkey"; + +alter table "pgstac"."item_fragments" add constraint "item_fragments_collection_hash_key" UNIQUE using index "item_fragments_collection_hash_key"; + +-- items.fragment_id FK: added as VALID because PostgreSQL does not support +-- NOT VALID foreign keys on partitioned tables. +alter table "pgstac"."items" add constraint "items_fragment_id_fkey" FOREIGN KEY ("fragment_id") REFERENCES "pgstac"."item_fragments"("id"); + alter table "pgstac"."searches" add constraint "searches_name_key" UNIQUE using index "searches_name_key"; set check_function_bodies = off; +CREATE OR REPLACE FUNCTION pgstac.extract_fragment(content jsonb, excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[]) + RETURNS jsonb + LANGUAGE sql + IMMUTABLE PARALLEL SAFE +AS $function$ + SELECT content - COALESCE(excluded_keys, '{id,geometry,collection,type}'::text[]); +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.gc_anonymous_searches(retention_interval interval DEFAULT NULL::interval, conf jsonb DEFAULT NULL::jsonb) RETURNS bigint LANGUAGE sql @@ -352,6 +436,24 @@ END; $procedure$ ; +CREATE OR REPLACE FUNCTION pgstac.gc_fragments(_collection text DEFAULT NULL::text, retention_interval interval DEFAULT '90 days'::interval) + RETURNS TABLE(collection_id text, fragments_removed integer) + LANGUAGE sql +AS $function$ + WITH deleted AS ( + DELETE FROM item_fragments f + WHERE + (_collection IS NULL OR f.collection = _collection) + AND f.created_at < now() - retention_interval + AND NOT EXISTS (SELECT 1 FROM items i WHERE i.fragment_id = f.id) + RETURNING f.collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.gc_search_caches(retention_interval interval DEFAULT NULL::interval, conf jsonb DEFAULT NULL::jsonb) RETURNS jsonb LANGUAGE sql @@ -364,6 +466,43 @@ AS $function$ $function$ ; +CREATE OR REPLACE FUNCTION pgstac.get_or_create_fragment(content jsonb, _collection text, excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[]) + RETURNS bigint + LANGUAGE plpgsql +AS $function$ +DECLARE + frag_content jsonb; + frag_hash text; + frag_id bigint; +BEGIN + IF content IS NULL OR _collection IS NULL THEN + RETURN NULL; + END IF; + + frag_content := extract_fragment(content, excluded_keys); + frag_hash := pgstac_hash_fragment(frag_content); + + -- Insert-first: one round trip when the fragment is new. + WITH ins AS ( + INSERT INTO item_fragments (collection, hash, content) + VALUES (_collection, frag_hash, frag_content) + ON CONFLICT (collection, hash) DO NOTHING + RETURNING id + ) + SELECT id INTO frag_id FROM ins; + + -- Fallback SELECT: one extra round trip only on the conflict path. + IF frag_id IS NULL THEN + SELECT id INTO frag_id + FROM item_fragments + WHERE collection = _collection AND hash = frag_hash; + END IF; + + RETURN frag_id; +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.items_delete_log_trigger() RETURNS trigger LANGUAGE plpgsql @@ -405,6 +544,44 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.jsonb_field_rows(data jsonb, parent_path text DEFAULT ''::text, max_depth integer DEFAULT 20) + RETURNS TABLE(path text, is_leaf boolean, value_kind text) + LANGUAGE plpgsql + IMMUTABLE PARALLEL SAFE +AS $function$ +DECLARE + k text; + v jsonb; + current_path text; + jtype text; +BEGIN + IF data IS NULL OR max_depth <= 0 THEN + RETURN; + END IF; + jtype := jsonb_typeof(data); + IF jtype = 'object' THEN + FOR k, v IN SELECT * FROM jsonb_each(data) LOOP + current_path := CASE WHEN parent_path = '' THEN k ELSE parent_path || '.' || k END; + IF jsonb_typeof(v) IN ('object', 'array') THEN + RETURN QUERY SELECT current_path, FALSE, jsonb_typeof(v); + RETURN QUERY SELECT * FROM jsonb_field_rows(v, current_path, max_depth - 1); + ELSE + RETURN QUERY SELECT current_path, TRUE, jsonb_typeof(v); + END IF; + END LOOP; + ELSIF jtype = 'array' THEN + -- Walk array elements (e.g. arrays of nested objects); arrays of scalars + -- are already handled as leaves in the object branch above. + FOR v IN SELECT jsonb_array_elements(data) LOOP + IF jsonb_typeof(v) = 'object' THEN + RETURN QUERY SELECT * FROM jsonb_field_rows(v, parent_path, max_depth - 1); + END IF; + END LOOP; + END IF; +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.name_search(_search jsonb, _name text, _metadata jsonb DEFAULT '{}'::jsonb) RETURNS searches LANGUAGE plpgsql @@ -440,6 +617,15 @@ AS $function$ $function$ ; +CREATE OR REPLACE FUNCTION pgstac.pgstac_hash_fragment(fragment jsonb) + RETURNS text + LANGUAGE sql + IMMUTABLE PARALLEL SAFE +AS $function$ +SELECT pgstac_hash(fragment::text); +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.pin_search(_name text) RETURNS searches LANGUAGE plpgsql @@ -465,6 +651,22 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.refresh_field_registry(_collection text DEFAULT NULL::text, retention_interval interval DEFAULT '90 days'::interval) + RETURNS TABLE(collection_id text, expired_paths integer) + LANGUAGE sql +AS $function$ + WITH deleted AS ( + DELETE FROM item_field_registry + WHERE (_collection IS NULL OR collection = _collection) + AND last_seen < now() - retention_interval + RETURNING collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.rename_search(_old_name text, _new_name text) RETURNS searches LANGUAGE plpgsql @@ -681,6 +883,117 @@ END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.update_field_registry_from_items(_collection text) + RETURNS TABLE(registered_paths integer, rows_processed integer) + LANGUAGE plpgsql + SECURITY DEFINER +AS $function$ +DECLARE + est_rows bigint; + nrows int; + npaths int; +BEGIN + -- Sum reltuples across all partitions for this collection. + -- reltuples can be -1 (never analyzed); treat negative values as zero. + SELECT COALESCE(sum(GREATEST(c.reltuples::bigint, 0)), 0) INTO est_rows + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'pgstac' + AND c.relkind = 'r' + AND c.relname LIKE '_items_%' + AND c.relname LIKE '%' || regexp_replace(_collection, '[^a-zA-Z0-9_-]', '', 'g') || '%'; + + IF est_rows > 10000 THEN + -- Large collection: use statistical sampling to avoid full seq-scan. + WITH sampled AS ( + SELECT content FROM items TABLESAMPLE BERNOULLI(5) WHERE collection = _collection + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + ELSE + -- Small collection: process up to 1000 rows to avoid BERNOULLI returning 0 rows. + WITH sampled AS ( + SELECT content FROM items WHERE collection = _collection LIMIT 1000 + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + END IF; + + RETURN QUERY SELECT npaths, nrows; +END; +$function$ +; + +CREATE OR REPLACE FUNCTION pgstac.update_field_registry_from_sample(_collection text, item_contents jsonb[]) + RETURNS void + LANGUAGE sql +AS $function$ + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), + now() + FROM unnest(item_contents) AS item(content) + CROSS JOIN LATERAL jsonb_field_rows(item.content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + ; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.where_stats(inhash text, inwhere text, updatestats boolean DEFAULT false, conf jsonb DEFAULT NULL::jsonb) RETURNS searches LANGUAGE plpgsql @@ -814,6 +1127,8 @@ CREATE OR REPLACE FUNCTION pgstac.content_dehydrate(content jsonb) AS $function$ DECLARE out items; + props jsonb; + base_item jsonb; BEGIN out.id := content->>'id'; out.geometry := stac_geom(content); @@ -822,17 +1137,46 @@ BEGIN out.end_datetime := stac_end_datetime(content); out.pgstac_updated_at := now(); out.content_hash := encode(sha256(content::text::bytea), 'hex'); + + base_item := collection_base_item(content->>'collection'); + props := content->'properties'; + + -- Split columns: dedicated storage for standard top-level STAC fields. + -- These enable index-only scans on promoted queryables and avoid JSONB parse + -- on the hot SELECT path once the legacy content column is retired. + out.bbox := content->'bbox'; + out.links := COALESCE(content->'links', '[]'::jsonb); + out.assets := COALESCE(content->'assets', '{}'::jsonb); + out.properties := COALESCE(props, '{}'::jsonb); + -- extra: non-standard top-level fields not in id/geometry/collection/type/bbox/links/assets/properties + out.extra := content - '{id,geometry,collection,type,bbox,links,assets,properties}'::text[]; + + -- Promoted queryable columns: direct float8 storage avoids JSONB parse on range queries. + out.eo_cloud_cover := (props->>'eo:cloud_cover')::float8; + out.eo_snow_cover := (props->>'eo:snow_cover')::float8; + out.gsd := (props->>'gsd')::float8; + out.view_off_nadir := (props->>'view:off_nadir')::float8; + out.view_sun_azimuth := (props->>'view:sun_azimuth')::float8; + out.view_sun_elevation := (props->>'view:sun_elevation')::float8; + + -- Legacy content column: kept for backwards compatibility with clients that + -- read items.content directly. Contains all fields except id/geometry/collection/type, + -- with base_item fields stripped out for dedup storage. + -- NOTE: content_hash above hashes the raw incoming JSONB (pre-strip), which is + -- intentional for change detection; it differs from the hash produced by + -- items_touch_triggerfunc (which hashes the hydrated form on UPDATE). out.content := strip_jsonb( content - '{id,geometry,collection,type}'::text[], - collection_base_item(content->>'collection') + base_item ) - '{id,geometry,collection,type}'::text[]; + out.private := null; RETURN out; END; $function$ ; -CREATE TRIGGER items_before_update_trigger BEFORE UPDATE ON pgstac.items FOR EACH ROW EXECUTE FUNCTION items_touch_triggerfunc(); +CREATE TRIGGER items_before_update_trigger BEFORE UPDATE ON pgstac.items FOR EACH ROW WHEN (((old.content IS DISTINCT FROM new.content) OR (old.bbox IS DISTINCT FROM new.bbox) OR (old.links IS DISTINCT FROM new.links) OR (old.assets IS DISTINCT FROM new.assets) OR (old.properties IS DISTINCT FROM new.properties) OR (old.extra IS DISTINCT FROM new.extra))) EXECUTE FUNCTION items_touch_triggerfunc(); CREATE TRIGGER items_delete_log_after_delete_trigger AFTER DELETE ON pgstac.items REFERENCING OLD TABLE AS old_rows FOR EACH STATEMENT EXECUTE FUNCTION items_delete_log_trigger(); @@ -843,6 +1187,8 @@ CREATE OR REPLACE FUNCTION pgstac.content_dehydrate(content jsonb) AS $function$ DECLARE out items; + props jsonb; + base_item jsonb; BEGIN out.id := content->>'id'; out.geometry := stac_geom(content); @@ -851,16 +1197,211 @@ BEGIN out.end_datetime := stac_end_datetime(content); out.pgstac_updated_at := now(); out.content_hash := encode(sha256(content::text::bytea), 'hex'); + + base_item := collection_base_item(content->>'collection'); + props := content->'properties'; + + -- Split columns: dedicated storage for standard top-level STAC fields. + -- These enable index-only scans on promoted queryables and avoid JSONB parse + -- on the hot SELECT path once the legacy content column is retired. + out.bbox := content->'bbox'; + out.links := COALESCE(content->'links', '[]'::jsonb); + out.assets := COALESCE(content->'assets', '{}'::jsonb); + out.properties := COALESCE(props, '{}'::jsonb); + -- extra: non-standard top-level fields not in id/geometry/collection/type/bbox/links/assets/properties + out.extra := content - '{id,geometry,collection,type,bbox,links,assets,properties}'::text[]; + + -- Promoted queryable columns: direct float8 storage avoids JSONB parse on range queries. + out.eo_cloud_cover := (props->>'eo:cloud_cover')::float8; + out.eo_snow_cover := (props->>'eo:snow_cover')::float8; + out.gsd := (props->>'gsd')::float8; + out.view_off_nadir := (props->>'view:off_nadir')::float8; + out.view_sun_azimuth := (props->>'view:sun_azimuth')::float8; + out.view_sun_elevation := (props->>'view:sun_elevation')::float8; + + -- Legacy content column: kept for backwards compatibility with clients that + -- read items.content directly. Contains all fields except id/geometry/collection/type, + -- with base_item fields stripped out for dedup storage. + -- NOTE: content_hash above hashes the raw incoming JSONB (pre-strip), which is + -- intentional for change detection; it differs from the hash produced by + -- items_touch_triggerfunc (which hashes the hydrated form on UPDATE). out.content := strip_jsonb( content - '{id,geometry,collection,type}'::text[], - collection_base_item(content->>'collection') + base_item ) - '{id,geometry,collection,type}'::text[]; + out.private := null; RETURN out; END; $function$ ; +CREATE OR REPLACE FUNCTION pgstac.content_hydrate(_item items, _collection collections, fields jsonb DEFAULT '{}'::jsonb) + RETURNS jsonb + LANGUAGE plpgsql + STABLE PARALLEL SAFE +AS $function$ +DECLARE + geom jsonb; + output jsonb; + content jsonb; +BEGIN + IF include_field('geometry', fields) THEN + geom := ST_ASGeoJson(_item.geometry, 20)::jsonb; + END IF; + + IF _item.fragment_id IS NOT NULL THEN + -- Preferred path: reconstruct item from split columns. + -- fragment_id IS NOT NULL is the canonical indicator that split columns + -- are populated; checking a nullable bigint is cheaper than a JSONB equality. + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, + 'collection', _item.collection, + 'type', 'Feature' + ); + IF _item.bbox IS NOT NULL THEN + content := content || jsonb_build_object('bbox', _item.bbox); + END IF; + IF _item.links IS NOT NULL THEN + content := content || jsonb_build_object('links', _item.links); + END IF; + IF _item.assets IS NOT NULL THEN + content := content || jsonb_build_object('assets', _item.assets); + END IF; + IF _item.properties IS NOT NULL THEN + content := content || jsonb_build_object('properties', _item.properties); + END IF; + IF _item.extra IS NOT NULL THEN + content := content || _item.extra; + END IF; + ELSE + -- Legacy fallback: reconstruct from the content column (pre-v0.10 rows). + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, + 'collection', _item.collection, + 'type', 'Feature' + ) || _item.content; + END IF; + + output := content_hydrate(content, _collection.base_item, fields); + RETURN output; +END; +$function$ +; + +CREATE OR REPLACE FUNCTION pgstac.items_staging_triggerfunc() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +DECLARE + p record; + _partitions text[]; + part text; + ts timestamptz := clock_timestamp(); + nrows int; +BEGIN + RAISE NOTICE 'Creating Partitions. %', clock_timestamp() - ts; + + FOR part IN WITH t AS ( + SELECT + n.content->>'collection' as collection, + stac_daterange(n.content->'properties') as dtr, + partition_trunc + FROM newdata n JOIN collections ON (n.content->>'collection'=collections.id) + ), p AS ( + SELECT + collection, + COALESCE(date_trunc(partition_trunc::text, lower(dtr)),'-infinity') as d, + tstzrange(min(lower(dtr)),max(lower(dtr)),'[]') as dtrange, + tstzrange(min(upper(dtr)),max(upper(dtr)),'[]') as edtrange + FROM t + GROUP BY 1,2 + ) SELECT check_partition(collection, dtrange, edtrange) FROM p LOOP + RAISE NOTICE 'Partition %', part; + END LOOP; + + RAISE NOTICE 'Creating temp table with data to be added. %', clock_timestamp() - ts; + DROP TABLE IF EXISTS tmpdata; + CREATE TEMP TABLE tmpdata ON COMMIT DROP AS + SELECT + (content_dehydrate(content)).* + FROM newdata; + GET DIAGNOSTICS nrows = ROW_COUNT; + RAISE NOTICE 'Added % rows to tmpdata. %', nrows, clock_timestamp() - ts; + + -- Batch fragment dedup: insert all unique fragments in one statement rather than + -- calling get_or_create_fragment() per row (which is O(N) round-trips). + -- pgstac_hash_fragment(content) is computed twice (once for insert, once for the + -- join update) but both calls are IMMUTABLE so the planner can CSE them; the net + -- cost is far lower than N individual PL/pgSQL function round-trips. + -- Concurrent inserts of identical fragments are safe: ON CONFLICT DO NOTHING means + -- both sides succeed with the same row; the join below finds it for either winner. + RAISE NOTICE 'Batch inserting fragments. %', clock_timestamp() - ts; + INSERT INTO item_fragments (collection, hash, content) + SELECT DISTINCT ON (collection, pgstac_hash_fragment(content)) + collection, + pgstac_hash_fragment(content) AS hash, + content + FROM tmpdata + WHERE content IS NOT NULL AND content != '{}'::jsonb + ON CONFLICT (collection, hash) DO NOTHING; + + RAISE NOTICE 'Assigning fragment_id. %', clock_timestamp() - ts; + UPDATE tmpdata t + SET fragment_id = f.id + FROM item_fragments f + WHERE f.collection = t.collection + AND f.hash = pgstac_hash_fragment(t.content) + AND t.content IS NOT NULL AND t.content != '{}'::jsonb; + + -- Queue registry sampling per collection (async via run_or_queue so it does not + -- block the ingest transaction). One queued call per distinct collection in the batch. + PERFORM run_or_queue(format('SELECT update_field_registry_from_items(%L);', c)) + FROM (SELECT DISTINCT collection FROM tmpdata) AS cte(c); + + RAISE NOTICE 'Doing the insert. %', clock_timestamp() - ts; + IF TG_TABLE_NAME = 'items_staging' THEN + INSERT INTO items + SELECT * FROM tmpdata; + GET DIAGNOSTICS nrows = ROW_COUNT; + RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts; + ELSIF TG_TABLE_NAME = 'items_staging_ignore' THEN + INSERT INTO items + SELECT * FROM tmpdata + ON CONFLICT DO NOTHING; + GET DIAGNOSTICS nrows = ROW_COUNT; + RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts; + ELSIF TG_TABLE_NAME = 'items_staging_upsert' THEN + DELETE FROM items i USING tmpdata s + WHERE + i.id = s.id + AND i.collection = s.collection + AND i IS DISTINCT FROM s + ; + GET DIAGNOSTICS nrows = ROW_COUNT; + RAISE NOTICE 'Deleted % rows from items. %', nrows, clock_timestamp() - ts; + INSERT INTO items AS t + SELECT * FROM tmpdata + ON CONFLICT DO NOTHING; + GET DIAGNOSTICS nrows = ROW_COUNT; + RAISE NOTICE 'Inserted % rows to items. %', nrows, clock_timestamp() - ts; + END IF; + + RAISE NOTICE 'Deleting data from staging table. %', clock_timestamp() - ts; + -- Use TG_TABLE_NAME so the correct staging table is cleared. + -- The previous hard-coded 'DELETE FROM items_staging' was a bug that left + -- items_staging_ignore and items_staging_upsert un-cleared after processing. + EXECUTE format('DELETE FROM %I', TG_TABLE_NAME); + RAISE NOTICE 'Done. %', clock_timestamp() - ts; + + RETURN NULL; + +END; +$function$ +; + CREATE OR REPLACE FUNCTION pgstac.search(_search jsonb DEFAULT '{}'::jsonb) RETURNS jsonb LANGUAGE plpgsql diff --git a/src/pgstac/migrations/pgstac--unreleased.sql b/src/pgstac/migrations/pgstac--unreleased.sql index 522595a7..64cd5fa3 100644 --- a/src/pgstac/migrations/pgstac--unreleased.sql +++ b/src/pgstac/migrations/pgstac--unreleased.sql @@ -2064,6 +2064,17 @@ $$ LANGUAGE PLPGSQL STABLE STRICT; -- END FRAGMENT: 002b_cql.sql -- BEGIN FRAGMENT: 003a_items.sql +-- Item fragments: deduplicated part of item content (shared across items in a collection) +CREATE TABLE IF NOT EXISTS item_fragments ( + id bigserial PRIMARY KEY, + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + hash text NOT NULL, + content jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (collection, hash) +); +CREATE INDEX IF NOT EXISTS item_fragments_collection_idx ON item_fragments (collection); + CREATE TABLE items ( id text NOT NULL, geometry geometry NOT NULL, @@ -2073,7 +2084,21 @@ CREATE TABLE items ( pgstac_updated_at timestamptz NOT NULL DEFAULT now(), content_hash text NOT NULL DEFAULT '', content JSONB NOT NULL, - private jsonb + private jsonb, + -- Split columns (populated from v0.10+; item_fragments must exist first) + fragment_id bigint REFERENCES item_fragments(id), + bbox jsonb, + links jsonb DEFAULT '[]', + assets jsonb DEFAULT '{}', + properties jsonb DEFAULT '{}', + extra jsonb, + -- Promoted queryable columns (redundant copies for index-only scans) + eo_cloud_cover float8, + eo_snow_cover float8, + gsd float8, + view_off_nadir float8, + view_sun_azimuth float8, + view_sun_elevation float8 ) PARTITION BY LIST (collection) ; @@ -2090,6 +2115,18 @@ CREATE TABLE IF NOT EXISTS items_deleted_log ( ); CREATE INDEX IF NOT EXISTS items_deleted_log_deleted_at_idx ON items_deleted_log (deleted_at); +-- Field registry: tracks which JSON paths exist in each collection (for queryables) +CREATE TABLE IF NOT EXISTS item_field_registry ( + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + path text NOT NULL, + is_leaf boolean DEFAULT true, + value_kinds text[] DEFAULT '{}', + first_seen timestamptz NOT NULL DEFAULT now(), + last_seen timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (collection, path) +); +CREATE INDEX IF NOT EXISTS item_field_registry_path_idx ON item_field_registry (path); + CREATE INDEX "datetime_idx" ON items USING BTREE (datetime DESC, end_datetime ASC); CREATE INDEX "geometry_idx" ON items USING GIST (geometry); @@ -2145,9 +2182,19 @@ $$ LANGUAGE PLPGSQL SECURITY DEFINER; DROP TRIGGER IF EXISTS items_before_upsert_trigger ON items; DROP TRIGGER IF EXISTS items_before_update_trigger ON items; +-- WHEN guard: skip the expensive content_hydrate hash recomputation when only +-- non-content fields change (e.g. fragment_id assignment, pgstac_updated_at). CREATE TRIGGER items_before_update_trigger BEFORE UPDATE ON items FOR EACH ROW +WHEN ( + OLD.content IS DISTINCT FROM NEW.content + OR OLD.bbox IS DISTINCT FROM NEW.bbox + OR OLD.links IS DISTINCT FROM NEW.links + OR OLD.assets IS DISTINCT FROM NEW.assets + OR OLD.properties IS DISTINCT FROM NEW.properties + OR OLD.extra IS DISTINCT FROM NEW.extra +) EXECUTE FUNCTION items_touch_triggerfunc(); CREATE OR REPLACE FUNCTION items_delete_log_trigger() RETURNS TRIGGER AS $$ @@ -2183,6 +2230,8 @@ CREATE TRIGGER items_delete_log_after_delete_trigger CREATE OR REPLACE FUNCTION content_dehydrate(content jsonb) RETURNS items AS $$ DECLARE out items; + props jsonb; + base_item jsonb; BEGIN out.id := content->>'id'; out.geometry := stac_geom(content); @@ -2191,10 +2240,39 @@ BEGIN out.end_datetime := stac_end_datetime(content); out.pgstac_updated_at := now(); out.content_hash := encode(sha256(content::text::bytea), 'hex'); + + base_item := collection_base_item(content->>'collection'); + props := content->'properties'; + + -- Split columns: dedicated storage for standard top-level STAC fields. + -- These enable index-only scans on promoted queryables and avoid JSONB parse + -- on the hot SELECT path once the legacy content column is retired. + out.bbox := content->'bbox'; + out.links := COALESCE(content->'links', '[]'::jsonb); + out.assets := COALESCE(content->'assets', '{}'::jsonb); + out.properties := COALESCE(props, '{}'::jsonb); + -- extra: non-standard top-level fields not in id/geometry/collection/type/bbox/links/assets/properties + out.extra := content - '{id,geometry,collection,type,bbox,links,assets,properties}'::text[]; + + -- Promoted queryable columns: direct float8 storage avoids JSONB parse on range queries. + out.eo_cloud_cover := (props->>'eo:cloud_cover')::float8; + out.eo_snow_cover := (props->>'eo:snow_cover')::float8; + out.gsd := (props->>'gsd')::float8; + out.view_off_nadir := (props->>'view:off_nadir')::float8; + out.view_sun_azimuth := (props->>'view:sun_azimuth')::float8; + out.view_sun_elevation := (props->>'view:sun_elevation')::float8; + + -- Legacy content column: kept for backwards compatibility with clients that + -- read items.content directly. Contains all fields except id/geometry/collection/type, + -- with base_item fields stripped out for dedup storage. + -- NOTE: content_hash above hashes the raw incoming JSONB (pre-strip), which is + -- intentional for change detection; it differs from the hash produced by + -- items_touch_triggerfunc (which hashes the hydrated form on UPDATE). out.content := strip_jsonb( content - '{id,geometry,collection,type}'::text[], - collection_base_item(content->>'collection') + base_item ) - '{id,geometry,collection,type}'::text[]; + out.private := null; RETURN out; END; @@ -2254,25 +2332,49 @@ $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; CREATE OR REPLACE FUNCTION content_hydrate(_item items, _collection collections, fields jsonb DEFAULT '{}'::jsonb) RETURNS jsonb AS $$ DECLARE geom jsonb; - bbox jsonb; output jsonb; content jsonb; - base_item jsonb := _collection.base_item; BEGIN IF include_field('geometry', fields) THEN geom := ST_ASGeoJson(_item.geometry, 20)::jsonb; END IF; - output := content_hydrate( - jsonb_build_object( - 'id', _item.id, - 'geometry', geom, + + IF _item.fragment_id IS NOT NULL THEN + -- Preferred path: reconstruct item from split columns. + -- fragment_id IS NOT NULL is the canonical indicator that split columns + -- are populated; checking a nullable bigint is cheaper than a JSONB equality. + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, 'collection', _item.collection, - 'type', 'Feature' - ) || _item.content, - _collection.base_item, - fields - ); + 'type', 'Feature' + ); + IF _item.bbox IS NOT NULL THEN + content := content || jsonb_build_object('bbox', _item.bbox); + END IF; + IF _item.links IS NOT NULL THEN + content := content || jsonb_build_object('links', _item.links); + END IF; + IF _item.assets IS NOT NULL THEN + content := content || jsonb_build_object('assets', _item.assets); + END IF; + IF _item.properties IS NOT NULL THEN + content := content || jsonb_build_object('properties', _item.properties); + END IF; + IF _item.extra IS NOT NULL THEN + content := content || _item.extra; + END IF; + ELSE + -- Legacy fallback: reconstruct from the content column (pre-v0.10 rows). + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, + 'collection', _item.collection, + 'type', 'Feature' + ) || _item.content; + END IF; + output := content_hydrate(content, _collection.base_item, fields); RETURN output; END; $$ LANGUAGE PLPGSQL STABLE PARALLEL SAFE; @@ -2355,6 +2457,36 @@ BEGIN GET DIAGNOSTICS nrows = ROW_COUNT; RAISE NOTICE 'Added % rows to tmpdata. %', nrows, clock_timestamp() - ts; + -- Batch fragment dedup: insert all unique fragments in one statement rather than + -- calling get_or_create_fragment() per row (which is O(N) round-trips). + -- pgstac_hash_fragment(content) is computed twice (once for insert, once for the + -- join update) but both calls are IMMUTABLE so the planner can CSE them; the net + -- cost is far lower than N individual PL/pgSQL function round-trips. + -- Concurrent inserts of identical fragments are safe: ON CONFLICT DO NOTHING means + -- both sides succeed with the same row; the join below finds it for either winner. + RAISE NOTICE 'Batch inserting fragments. %', clock_timestamp() - ts; + INSERT INTO item_fragments (collection, hash, content) + SELECT DISTINCT ON (collection, pgstac_hash_fragment(content)) + collection, + pgstac_hash_fragment(content) AS hash, + content + FROM tmpdata + WHERE content IS NOT NULL AND content != '{}'::jsonb + ON CONFLICT (collection, hash) DO NOTHING; + + RAISE NOTICE 'Assigning fragment_id. %', clock_timestamp() - ts; + UPDATE tmpdata t + SET fragment_id = f.id + FROM item_fragments f + WHERE f.collection = t.collection + AND f.hash = pgstac_hash_fragment(t.content) + AND t.content IS NOT NULL AND t.content != '{}'::jsonb; + + -- Queue registry sampling per collection (async via run_or_queue so it does not + -- block the ingest transaction). One queued call per distinct collection in the batch. + PERFORM run_or_queue(format('SELECT update_field_registry_from_items(%L);', c)) + FROM (SELECT DISTINCT collection FROM tmpdata) AS cte(c); + RAISE NOTICE 'Doing the insert. %', clock_timestamp() - ts; IF TG_TABLE_NAME = 'items_staging' THEN INSERT INTO items @@ -2384,7 +2516,10 @@ BEGIN END IF; RAISE NOTICE 'Deleting data from staging table. %', clock_timestamp() - ts; - DELETE FROM items_staging; + -- Use TG_TABLE_NAME so the correct staging table is cleared. + -- The previous hard-coded 'DELETE FROM items_staging' was a bug that left + -- items_staging_ignore and items_staging_upsert un-cleared after processing. + EXECUTE format('DELETE FROM %I', TG_TABLE_NAME); RAISE NOTICE 'Done. %', clock_timestamp() - ts; RETURN NULL; @@ -2479,6 +2614,266 @@ UPDATE collections ) ; $$ LANGUAGE SQL; + +-- --------------------------------------------------------------------------- +-- Field Registry: walks JSONB item content to track which paths exist in each +-- collection. Used to auto-populate queryables and support schema inference. +-- --------------------------------------------------------------------------- + +-- jsonb_field_rows: Recursively walk a JSONB document and emit one row per field path. +-- max_depth guards against runaway recursion on pathologically nested documents. +CREATE OR REPLACE FUNCTION jsonb_field_rows( + data jsonb, + parent_path text DEFAULT '', + max_depth int DEFAULT 20 +) RETURNS TABLE (path text, is_leaf boolean, value_kind text) AS $$ +DECLARE + k text; + v jsonb; + current_path text; + jtype text; +BEGIN + IF data IS NULL OR max_depth <= 0 THEN + RETURN; + END IF; + jtype := jsonb_typeof(data); + IF jtype = 'object' THEN + FOR k, v IN SELECT * FROM jsonb_each(data) LOOP + current_path := CASE WHEN parent_path = '' THEN k ELSE parent_path || '.' || k END; + IF jsonb_typeof(v) IN ('object', 'array') THEN + RETURN QUERY SELECT current_path, FALSE, jsonb_typeof(v); + RETURN QUERY SELECT * FROM jsonb_field_rows(v, current_path, max_depth - 1); + ELSE + RETURN QUERY SELECT current_path, TRUE, jsonb_typeof(v); + END IF; + END LOOP; + ELSIF jtype = 'array' THEN + -- Walk array elements (e.g. arrays of nested objects); arrays of scalars + -- are already handled as leaves in the object branch above. + FOR v IN SELECT jsonb_array_elements(data) LOOP + IF jsonb_typeof(v) = 'object' THEN + RETURN QUERY SELECT * FROM jsonb_field_rows(v, parent_path, max_depth - 1); + END IF; + END LOOP; + END IF; +END; +$$ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE; + +-- update_field_registry_from_sample: UPSERT registry rows from a pre-selected array of +-- raw item content JSONBs. Callers supply the sample to decouple sampling strategy +-- from the registry write; merge value_kinds to accumulate observed types over time. +CREATE OR REPLACE FUNCTION update_field_registry_from_sample( + _collection text, + item_contents jsonb[] +) RETURNS void AS $$ + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), + now() + FROM unnest(item_contents) AS item(content) + CROSS JOIN LATERAL jsonb_field_rows(item.content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + ; +$$ LANGUAGE SQL VOLATILE; + +-- update_field_registry_from_items: Sample a live collection and UPSERT registry rows. +-- Uses TABLESAMPLE BERNOULLI(5) for large collections (>10k rows by pg_class estimate) +-- and LIMIT 1000 for smaller ones to avoid a full seq-scan for tiny collections. +-- pg_class.reltuples is an estimate (may be stale); its only role is threshold selection. +-- Returns (registered_paths, rows_processed) for observability. +CREATE OR REPLACE FUNCTION update_field_registry_from_items( + _collection text +) RETURNS TABLE (registered_paths int, rows_processed int) AS $$ +DECLARE + est_rows bigint; + nrows int; + npaths int; +BEGIN + -- Sum reltuples across all partitions for this collection. + -- reltuples can be -1 (never analyzed); treat negative values as zero. + SELECT COALESCE(sum(GREATEST(c.reltuples::bigint, 0)), 0) INTO est_rows + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'pgstac' + AND c.relkind = 'r' + AND c.relname LIKE '_items_%' + AND c.relname LIKE '%' || regexp_replace(_collection, '[^a-zA-Z0-9_-]', '', 'g') || '%'; + + IF est_rows > 10000 THEN + -- Large collection: use statistical sampling to avoid full seq-scan. + WITH sampled AS ( + SELECT content FROM items TABLESAMPLE BERNOULLI(5) WHERE collection = _collection + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + ELSE + -- Small collection: process up to 1000 rows to avoid BERNOULLI returning 0 rows. + WITH sampled AS ( + SELECT content FROM items WHERE collection = _collection LIMIT 1000 + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + END IF; + + RETURN QUERY SELECT npaths, nrows; +END; +$$ LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER; + +-- refresh_field_registry: Expire stale registry entries that haven't been seen recently. +-- Intended for scheduled maintenance (e.g. pg_cron daily job). +-- Returns (collection, expired_paths) for each collection affected. +CREATE OR REPLACE FUNCTION refresh_field_registry( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE (collection_id text, expired_paths int) AS $$ + WITH deleted AS ( + DELETE FROM item_field_registry + WHERE (_collection IS NULL OR collection = _collection) + AND last_seen < now() - retention_interval + RETURNING collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE; + +-- Item Fragment Management functions +-- extract_fragment: Strip the per-item keys from content to get the dedup-eligible portion. +-- Pure SQL so PostgreSQL can inline and constant-fold it; avoid PLPGSQL wrapper overhead. +CREATE OR REPLACE FUNCTION extract_fragment( + content jsonb, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS jsonb AS $$ + SELECT content - COALESCE(excluded_keys, '{id,geometry,collection,type}'::text[]); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- pgstac_hash_fragment: Hash a fragment content for dedup +CREATE OR REPLACE FUNCTION pgstac_hash_fragment(fragment jsonb) RETURNS text AS $$ +SELECT pgstac_hash(fragment::text); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- get_or_create_fragment: Look up or insert a fragment, returning its id. +-- Uses INSERT … ON CONFLICT … RETURNING to avoid a redundant pre-check SELECT; +-- only falls back to a SELECT when the conflict path suppresses the RETURNING row. +-- This is safe under concurrent inserts: two transactions racing to create the same +-- fragment both see ON CONFLICT DO NOTHING; the loser's RETURNING is empty so it +-- falls through to the SELECT which finds the winner's row. +CREATE OR REPLACE FUNCTION get_or_create_fragment( + content jsonb, + _collection text, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS bigint AS $$ +DECLARE + frag_content jsonb; + frag_hash text; + frag_id bigint; +BEGIN + IF content IS NULL OR _collection IS NULL THEN + RETURN NULL; + END IF; + + frag_content := extract_fragment(content, excluded_keys); + frag_hash := pgstac_hash_fragment(frag_content); + + -- Insert-first: one round trip when the fragment is new. + WITH ins AS ( + INSERT INTO item_fragments (collection, hash, content) + VALUES (_collection, frag_hash, frag_content) + ON CONFLICT (collection, hash) DO NOTHING + RETURNING id + ) + SELECT id INTO frag_id FROM ins; + + -- Fallback SELECT: one extra round trip only on the conflict path. + IF frag_id IS NULL THEN + SELECT id INTO frag_id + FROM item_fragments + WHERE collection = _collection AND hash = frag_hash; + END IF; + + RETURN frag_id; +END; +$$ LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE; + +-- gc_fragments: Garbage collect orphaned fragments using a single set-based DELETE. +-- Replaces the previous per-collection FOR LOOP with a single statement that lets +-- the planner choose the optimal join/anti-join strategy across all collections. +-- The NOT EXISTS sub-select is evaluated per fragment; with an index on items.fragment_id +-- this is an efficient anti-join rather than a full seq-scan. +CREATE OR REPLACE FUNCTION gc_fragments( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE ( + collection_id text, + fragments_removed int +) AS $$ + WITH deleted AS ( + DELETE FROM item_fragments f + WHERE + (_collection IS NULL OR f.collection = _collection) + AND f.created_at < now() - retention_interval + AND NOT EXISTS (SELECT 1 FROM items i WHERE i.fragment_id = f.id) + RETURNING f.collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE; -- END FRAGMENT: 003a_items.sql -- BEGIN FRAGMENT: 003b_partitions.sql diff --git a/src/pgstac/pgstac.sql b/src/pgstac/pgstac.sql index 522595a7..64cd5fa3 100644 --- a/src/pgstac/pgstac.sql +++ b/src/pgstac/pgstac.sql @@ -2064,6 +2064,17 @@ $$ LANGUAGE PLPGSQL STABLE STRICT; -- END FRAGMENT: 002b_cql.sql -- BEGIN FRAGMENT: 003a_items.sql +-- Item fragments: deduplicated part of item content (shared across items in a collection) +CREATE TABLE IF NOT EXISTS item_fragments ( + id bigserial PRIMARY KEY, + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + hash text NOT NULL, + content jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (collection, hash) +); +CREATE INDEX IF NOT EXISTS item_fragments_collection_idx ON item_fragments (collection); + CREATE TABLE items ( id text NOT NULL, geometry geometry NOT NULL, @@ -2073,7 +2084,21 @@ CREATE TABLE items ( pgstac_updated_at timestamptz NOT NULL DEFAULT now(), content_hash text NOT NULL DEFAULT '', content JSONB NOT NULL, - private jsonb + private jsonb, + -- Split columns (populated from v0.10+; item_fragments must exist first) + fragment_id bigint REFERENCES item_fragments(id), + bbox jsonb, + links jsonb DEFAULT '[]', + assets jsonb DEFAULT '{}', + properties jsonb DEFAULT '{}', + extra jsonb, + -- Promoted queryable columns (redundant copies for index-only scans) + eo_cloud_cover float8, + eo_snow_cover float8, + gsd float8, + view_off_nadir float8, + view_sun_azimuth float8, + view_sun_elevation float8 ) PARTITION BY LIST (collection) ; @@ -2090,6 +2115,18 @@ CREATE TABLE IF NOT EXISTS items_deleted_log ( ); CREATE INDEX IF NOT EXISTS items_deleted_log_deleted_at_idx ON items_deleted_log (deleted_at); +-- Field registry: tracks which JSON paths exist in each collection (for queryables) +CREATE TABLE IF NOT EXISTS item_field_registry ( + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + path text NOT NULL, + is_leaf boolean DEFAULT true, + value_kinds text[] DEFAULT '{}', + first_seen timestamptz NOT NULL DEFAULT now(), + last_seen timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (collection, path) +); +CREATE INDEX IF NOT EXISTS item_field_registry_path_idx ON item_field_registry (path); + CREATE INDEX "datetime_idx" ON items USING BTREE (datetime DESC, end_datetime ASC); CREATE INDEX "geometry_idx" ON items USING GIST (geometry); @@ -2145,9 +2182,19 @@ $$ LANGUAGE PLPGSQL SECURITY DEFINER; DROP TRIGGER IF EXISTS items_before_upsert_trigger ON items; DROP TRIGGER IF EXISTS items_before_update_trigger ON items; +-- WHEN guard: skip the expensive content_hydrate hash recomputation when only +-- non-content fields change (e.g. fragment_id assignment, pgstac_updated_at). CREATE TRIGGER items_before_update_trigger BEFORE UPDATE ON items FOR EACH ROW +WHEN ( + OLD.content IS DISTINCT FROM NEW.content + OR OLD.bbox IS DISTINCT FROM NEW.bbox + OR OLD.links IS DISTINCT FROM NEW.links + OR OLD.assets IS DISTINCT FROM NEW.assets + OR OLD.properties IS DISTINCT FROM NEW.properties + OR OLD.extra IS DISTINCT FROM NEW.extra +) EXECUTE FUNCTION items_touch_triggerfunc(); CREATE OR REPLACE FUNCTION items_delete_log_trigger() RETURNS TRIGGER AS $$ @@ -2183,6 +2230,8 @@ CREATE TRIGGER items_delete_log_after_delete_trigger CREATE OR REPLACE FUNCTION content_dehydrate(content jsonb) RETURNS items AS $$ DECLARE out items; + props jsonb; + base_item jsonb; BEGIN out.id := content->>'id'; out.geometry := stac_geom(content); @@ -2191,10 +2240,39 @@ BEGIN out.end_datetime := stac_end_datetime(content); out.pgstac_updated_at := now(); out.content_hash := encode(sha256(content::text::bytea), 'hex'); + + base_item := collection_base_item(content->>'collection'); + props := content->'properties'; + + -- Split columns: dedicated storage for standard top-level STAC fields. + -- These enable index-only scans on promoted queryables and avoid JSONB parse + -- on the hot SELECT path once the legacy content column is retired. + out.bbox := content->'bbox'; + out.links := COALESCE(content->'links', '[]'::jsonb); + out.assets := COALESCE(content->'assets', '{}'::jsonb); + out.properties := COALESCE(props, '{}'::jsonb); + -- extra: non-standard top-level fields not in id/geometry/collection/type/bbox/links/assets/properties + out.extra := content - '{id,geometry,collection,type,bbox,links,assets,properties}'::text[]; + + -- Promoted queryable columns: direct float8 storage avoids JSONB parse on range queries. + out.eo_cloud_cover := (props->>'eo:cloud_cover')::float8; + out.eo_snow_cover := (props->>'eo:snow_cover')::float8; + out.gsd := (props->>'gsd')::float8; + out.view_off_nadir := (props->>'view:off_nadir')::float8; + out.view_sun_azimuth := (props->>'view:sun_azimuth')::float8; + out.view_sun_elevation := (props->>'view:sun_elevation')::float8; + + -- Legacy content column: kept for backwards compatibility with clients that + -- read items.content directly. Contains all fields except id/geometry/collection/type, + -- with base_item fields stripped out for dedup storage. + -- NOTE: content_hash above hashes the raw incoming JSONB (pre-strip), which is + -- intentional for change detection; it differs from the hash produced by + -- items_touch_triggerfunc (which hashes the hydrated form on UPDATE). out.content := strip_jsonb( content - '{id,geometry,collection,type}'::text[], - collection_base_item(content->>'collection') + base_item ) - '{id,geometry,collection,type}'::text[]; + out.private := null; RETURN out; END; @@ -2254,25 +2332,49 @@ $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; CREATE OR REPLACE FUNCTION content_hydrate(_item items, _collection collections, fields jsonb DEFAULT '{}'::jsonb) RETURNS jsonb AS $$ DECLARE geom jsonb; - bbox jsonb; output jsonb; content jsonb; - base_item jsonb := _collection.base_item; BEGIN IF include_field('geometry', fields) THEN geom := ST_ASGeoJson(_item.geometry, 20)::jsonb; END IF; - output := content_hydrate( - jsonb_build_object( - 'id', _item.id, - 'geometry', geom, + + IF _item.fragment_id IS NOT NULL THEN + -- Preferred path: reconstruct item from split columns. + -- fragment_id IS NOT NULL is the canonical indicator that split columns + -- are populated; checking a nullable bigint is cheaper than a JSONB equality. + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, 'collection', _item.collection, - 'type', 'Feature' - ) || _item.content, - _collection.base_item, - fields - ); + 'type', 'Feature' + ); + IF _item.bbox IS NOT NULL THEN + content := content || jsonb_build_object('bbox', _item.bbox); + END IF; + IF _item.links IS NOT NULL THEN + content := content || jsonb_build_object('links', _item.links); + END IF; + IF _item.assets IS NOT NULL THEN + content := content || jsonb_build_object('assets', _item.assets); + END IF; + IF _item.properties IS NOT NULL THEN + content := content || jsonb_build_object('properties', _item.properties); + END IF; + IF _item.extra IS NOT NULL THEN + content := content || _item.extra; + END IF; + ELSE + -- Legacy fallback: reconstruct from the content column (pre-v0.10 rows). + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, + 'collection', _item.collection, + 'type', 'Feature' + ) || _item.content; + END IF; + output := content_hydrate(content, _collection.base_item, fields); RETURN output; END; $$ LANGUAGE PLPGSQL STABLE PARALLEL SAFE; @@ -2355,6 +2457,36 @@ BEGIN GET DIAGNOSTICS nrows = ROW_COUNT; RAISE NOTICE 'Added % rows to tmpdata. %', nrows, clock_timestamp() - ts; + -- Batch fragment dedup: insert all unique fragments in one statement rather than + -- calling get_or_create_fragment() per row (which is O(N) round-trips). + -- pgstac_hash_fragment(content) is computed twice (once for insert, once for the + -- join update) but both calls are IMMUTABLE so the planner can CSE them; the net + -- cost is far lower than N individual PL/pgSQL function round-trips. + -- Concurrent inserts of identical fragments are safe: ON CONFLICT DO NOTHING means + -- both sides succeed with the same row; the join below finds it for either winner. + RAISE NOTICE 'Batch inserting fragments. %', clock_timestamp() - ts; + INSERT INTO item_fragments (collection, hash, content) + SELECT DISTINCT ON (collection, pgstac_hash_fragment(content)) + collection, + pgstac_hash_fragment(content) AS hash, + content + FROM tmpdata + WHERE content IS NOT NULL AND content != '{}'::jsonb + ON CONFLICT (collection, hash) DO NOTHING; + + RAISE NOTICE 'Assigning fragment_id. %', clock_timestamp() - ts; + UPDATE tmpdata t + SET fragment_id = f.id + FROM item_fragments f + WHERE f.collection = t.collection + AND f.hash = pgstac_hash_fragment(t.content) + AND t.content IS NOT NULL AND t.content != '{}'::jsonb; + + -- Queue registry sampling per collection (async via run_or_queue so it does not + -- block the ingest transaction). One queued call per distinct collection in the batch. + PERFORM run_or_queue(format('SELECT update_field_registry_from_items(%L);', c)) + FROM (SELECT DISTINCT collection FROM tmpdata) AS cte(c); + RAISE NOTICE 'Doing the insert. %', clock_timestamp() - ts; IF TG_TABLE_NAME = 'items_staging' THEN INSERT INTO items @@ -2384,7 +2516,10 @@ BEGIN END IF; RAISE NOTICE 'Deleting data from staging table. %', clock_timestamp() - ts; - DELETE FROM items_staging; + -- Use TG_TABLE_NAME so the correct staging table is cleared. + -- The previous hard-coded 'DELETE FROM items_staging' was a bug that left + -- items_staging_ignore and items_staging_upsert un-cleared after processing. + EXECUTE format('DELETE FROM %I', TG_TABLE_NAME); RAISE NOTICE 'Done. %', clock_timestamp() - ts; RETURN NULL; @@ -2479,6 +2614,266 @@ UPDATE collections ) ; $$ LANGUAGE SQL; + +-- --------------------------------------------------------------------------- +-- Field Registry: walks JSONB item content to track which paths exist in each +-- collection. Used to auto-populate queryables and support schema inference. +-- --------------------------------------------------------------------------- + +-- jsonb_field_rows: Recursively walk a JSONB document and emit one row per field path. +-- max_depth guards against runaway recursion on pathologically nested documents. +CREATE OR REPLACE FUNCTION jsonb_field_rows( + data jsonb, + parent_path text DEFAULT '', + max_depth int DEFAULT 20 +) RETURNS TABLE (path text, is_leaf boolean, value_kind text) AS $$ +DECLARE + k text; + v jsonb; + current_path text; + jtype text; +BEGIN + IF data IS NULL OR max_depth <= 0 THEN + RETURN; + END IF; + jtype := jsonb_typeof(data); + IF jtype = 'object' THEN + FOR k, v IN SELECT * FROM jsonb_each(data) LOOP + current_path := CASE WHEN parent_path = '' THEN k ELSE parent_path || '.' || k END; + IF jsonb_typeof(v) IN ('object', 'array') THEN + RETURN QUERY SELECT current_path, FALSE, jsonb_typeof(v); + RETURN QUERY SELECT * FROM jsonb_field_rows(v, current_path, max_depth - 1); + ELSE + RETURN QUERY SELECT current_path, TRUE, jsonb_typeof(v); + END IF; + END LOOP; + ELSIF jtype = 'array' THEN + -- Walk array elements (e.g. arrays of nested objects); arrays of scalars + -- are already handled as leaves in the object branch above. + FOR v IN SELECT jsonb_array_elements(data) LOOP + IF jsonb_typeof(v) = 'object' THEN + RETURN QUERY SELECT * FROM jsonb_field_rows(v, parent_path, max_depth - 1); + END IF; + END LOOP; + END IF; +END; +$$ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE; + +-- update_field_registry_from_sample: UPSERT registry rows from a pre-selected array of +-- raw item content JSONBs. Callers supply the sample to decouple sampling strategy +-- from the registry write; merge value_kinds to accumulate observed types over time. +CREATE OR REPLACE FUNCTION update_field_registry_from_sample( + _collection text, + item_contents jsonb[] +) RETURNS void AS $$ + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), + now() + FROM unnest(item_contents) AS item(content) + CROSS JOIN LATERAL jsonb_field_rows(item.content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + ; +$$ LANGUAGE SQL VOLATILE; + +-- update_field_registry_from_items: Sample a live collection and UPSERT registry rows. +-- Uses TABLESAMPLE BERNOULLI(5) for large collections (>10k rows by pg_class estimate) +-- and LIMIT 1000 for smaller ones to avoid a full seq-scan for tiny collections. +-- pg_class.reltuples is an estimate (may be stale); its only role is threshold selection. +-- Returns (registered_paths, rows_processed) for observability. +CREATE OR REPLACE FUNCTION update_field_registry_from_items( + _collection text +) RETURNS TABLE (registered_paths int, rows_processed int) AS $$ +DECLARE + est_rows bigint; + nrows int; + npaths int; +BEGIN + -- Sum reltuples across all partitions for this collection. + -- reltuples can be -1 (never analyzed); treat negative values as zero. + SELECT COALESCE(sum(GREATEST(c.reltuples::bigint, 0)), 0) INTO est_rows + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'pgstac' + AND c.relkind = 'r' + AND c.relname LIKE '_items_%' + AND c.relname LIKE '%' || regexp_replace(_collection, '[^a-zA-Z0-9_-]', '', 'g') || '%'; + + IF est_rows > 10000 THEN + -- Large collection: use statistical sampling to avoid full seq-scan. + WITH sampled AS ( + SELECT content FROM items TABLESAMPLE BERNOULLI(5) WHERE collection = _collection + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + ELSE + -- Small collection: process up to 1000 rows to avoid BERNOULLI returning 0 rows. + WITH sampled AS ( + SELECT content FROM items WHERE collection = _collection LIMIT 1000 + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + END IF; + + RETURN QUERY SELECT npaths, nrows; +END; +$$ LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER; + +-- refresh_field_registry: Expire stale registry entries that haven't been seen recently. +-- Intended for scheduled maintenance (e.g. pg_cron daily job). +-- Returns (collection, expired_paths) for each collection affected. +CREATE OR REPLACE FUNCTION refresh_field_registry( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE (collection_id text, expired_paths int) AS $$ + WITH deleted AS ( + DELETE FROM item_field_registry + WHERE (_collection IS NULL OR collection = _collection) + AND last_seen < now() - retention_interval + RETURNING collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE; + +-- Item Fragment Management functions +-- extract_fragment: Strip the per-item keys from content to get the dedup-eligible portion. +-- Pure SQL so PostgreSQL can inline and constant-fold it; avoid PLPGSQL wrapper overhead. +CREATE OR REPLACE FUNCTION extract_fragment( + content jsonb, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS jsonb AS $$ + SELECT content - COALESCE(excluded_keys, '{id,geometry,collection,type}'::text[]); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- pgstac_hash_fragment: Hash a fragment content for dedup +CREATE OR REPLACE FUNCTION pgstac_hash_fragment(fragment jsonb) RETURNS text AS $$ +SELECT pgstac_hash(fragment::text); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- get_or_create_fragment: Look up or insert a fragment, returning its id. +-- Uses INSERT … ON CONFLICT … RETURNING to avoid a redundant pre-check SELECT; +-- only falls back to a SELECT when the conflict path suppresses the RETURNING row. +-- This is safe under concurrent inserts: two transactions racing to create the same +-- fragment both see ON CONFLICT DO NOTHING; the loser's RETURNING is empty so it +-- falls through to the SELECT which finds the winner's row. +CREATE OR REPLACE FUNCTION get_or_create_fragment( + content jsonb, + _collection text, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS bigint AS $$ +DECLARE + frag_content jsonb; + frag_hash text; + frag_id bigint; +BEGIN + IF content IS NULL OR _collection IS NULL THEN + RETURN NULL; + END IF; + + frag_content := extract_fragment(content, excluded_keys); + frag_hash := pgstac_hash_fragment(frag_content); + + -- Insert-first: one round trip when the fragment is new. + WITH ins AS ( + INSERT INTO item_fragments (collection, hash, content) + VALUES (_collection, frag_hash, frag_content) + ON CONFLICT (collection, hash) DO NOTHING + RETURNING id + ) + SELECT id INTO frag_id FROM ins; + + -- Fallback SELECT: one extra round trip only on the conflict path. + IF frag_id IS NULL THEN + SELECT id INTO frag_id + FROM item_fragments + WHERE collection = _collection AND hash = frag_hash; + END IF; + + RETURN frag_id; +END; +$$ LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE; + +-- gc_fragments: Garbage collect orphaned fragments using a single set-based DELETE. +-- Replaces the previous per-collection FOR LOOP with a single statement that lets +-- the planner choose the optimal join/anti-join strategy across all collections. +-- The NOT EXISTS sub-select is evaluated per fragment; with an index on items.fragment_id +-- this is an efficient anti-join rather than a full seq-scan. +CREATE OR REPLACE FUNCTION gc_fragments( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE ( + collection_id text, + fragments_removed int +) AS $$ + WITH deleted AS ( + DELETE FROM item_fragments f + WHERE + (_collection IS NULL OR f.collection = _collection) + AND f.created_at < now() - retention_interval + AND NOT EXISTS (SELECT 1 FROM items i WHERE i.fragment_id = f.id) + RETURNING f.collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE; -- END FRAGMENT: 003a_items.sql -- BEGIN FRAGMENT: 003b_partitions.sql diff --git a/src/pgstac/sql/003a_items.sql b/src/pgstac/sql/003a_items.sql index b252e9cf..db79add2 100644 --- a/src/pgstac/sql/003a_items.sql +++ b/src/pgstac/sql/003a_items.sql @@ -1,3 +1,14 @@ +-- Item fragments: deduplicated part of item content (shared across items in a collection) +CREATE TABLE IF NOT EXISTS item_fragments ( + id bigserial PRIMARY KEY, + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + hash text NOT NULL, + content jsonb NOT NULL, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (collection, hash) +); +CREATE INDEX IF NOT EXISTS item_fragments_collection_idx ON item_fragments (collection); + CREATE TABLE items ( id text NOT NULL, geometry geometry NOT NULL, @@ -7,7 +18,21 @@ CREATE TABLE items ( pgstac_updated_at timestamptz NOT NULL DEFAULT now(), content_hash text NOT NULL DEFAULT '', content JSONB NOT NULL, - private jsonb + private jsonb, + -- Split columns (populated from v0.10+; item_fragments must exist first) + fragment_id bigint REFERENCES item_fragments(id), + bbox jsonb, + links jsonb DEFAULT '[]', + assets jsonb DEFAULT '{}', + properties jsonb DEFAULT '{}', + extra jsonb, + -- Promoted queryable columns (redundant copies for index-only scans) + eo_cloud_cover float8, + eo_snow_cover float8, + gsd float8, + view_off_nadir float8, + view_sun_azimuth float8, + view_sun_elevation float8 ) PARTITION BY LIST (collection) ; @@ -24,6 +49,18 @@ CREATE TABLE IF NOT EXISTS items_deleted_log ( ); CREATE INDEX IF NOT EXISTS items_deleted_log_deleted_at_idx ON items_deleted_log (deleted_at); +-- Field registry: tracks which JSON paths exist in each collection (for queryables) +CREATE TABLE IF NOT EXISTS item_field_registry ( + collection text NOT NULL REFERENCES collections(id) ON DELETE CASCADE, + path text NOT NULL, + is_leaf boolean DEFAULT true, + value_kinds text[] DEFAULT '{}', + first_seen timestamptz NOT NULL DEFAULT now(), + last_seen timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY (collection, path) +); +CREATE INDEX IF NOT EXISTS item_field_registry_path_idx ON item_field_registry (path); + CREATE INDEX "datetime_idx" ON items USING BTREE (datetime DESC, end_datetime ASC); CREATE INDEX "geometry_idx" ON items USING GIST (geometry); @@ -79,9 +116,19 @@ $$ LANGUAGE PLPGSQL SECURITY DEFINER; DROP TRIGGER IF EXISTS items_before_upsert_trigger ON items; DROP TRIGGER IF EXISTS items_before_update_trigger ON items; +-- WHEN guard: skip the expensive content_hydrate hash recomputation when only +-- non-content fields change (e.g. fragment_id assignment, pgstac_updated_at). CREATE TRIGGER items_before_update_trigger BEFORE UPDATE ON items FOR EACH ROW +WHEN ( + OLD.content IS DISTINCT FROM NEW.content + OR OLD.bbox IS DISTINCT FROM NEW.bbox + OR OLD.links IS DISTINCT FROM NEW.links + OR OLD.assets IS DISTINCT FROM NEW.assets + OR OLD.properties IS DISTINCT FROM NEW.properties + OR OLD.extra IS DISTINCT FROM NEW.extra +) EXECUTE FUNCTION items_touch_triggerfunc(); CREATE OR REPLACE FUNCTION items_delete_log_trigger() RETURNS TRIGGER AS $$ @@ -117,6 +164,8 @@ CREATE TRIGGER items_delete_log_after_delete_trigger CREATE OR REPLACE FUNCTION content_dehydrate(content jsonb) RETURNS items AS $$ DECLARE out items; + props jsonb; + base_item jsonb; BEGIN out.id := content->>'id'; out.geometry := stac_geom(content); @@ -125,10 +174,39 @@ BEGIN out.end_datetime := stac_end_datetime(content); out.pgstac_updated_at := now(); out.content_hash := encode(sha256(content::text::bytea), 'hex'); + + base_item := collection_base_item(content->>'collection'); + props := content->'properties'; + + -- Split columns: dedicated storage for standard top-level STAC fields. + -- These enable index-only scans on promoted queryables and avoid JSONB parse + -- on the hot SELECT path once the legacy content column is retired. + out.bbox := content->'bbox'; + out.links := COALESCE(content->'links', '[]'::jsonb); + out.assets := COALESCE(content->'assets', '{}'::jsonb); + out.properties := COALESCE(props, '{}'::jsonb); + -- extra: non-standard top-level fields not in id/geometry/collection/type/bbox/links/assets/properties + out.extra := content - '{id,geometry,collection,type,bbox,links,assets,properties}'::text[]; + + -- Promoted queryable columns: direct float8 storage avoids JSONB parse on range queries. + out.eo_cloud_cover := (props->>'eo:cloud_cover')::float8; + out.eo_snow_cover := (props->>'eo:snow_cover')::float8; + out.gsd := (props->>'gsd')::float8; + out.view_off_nadir := (props->>'view:off_nadir')::float8; + out.view_sun_azimuth := (props->>'view:sun_azimuth')::float8; + out.view_sun_elevation := (props->>'view:sun_elevation')::float8; + + -- Legacy content column: kept for backwards compatibility with clients that + -- read items.content directly. Contains all fields except id/geometry/collection/type, + -- with base_item fields stripped out for dedup storage. + -- NOTE: content_hash above hashes the raw incoming JSONB (pre-strip), which is + -- intentional for change detection; it differs from the hash produced by + -- items_touch_triggerfunc (which hashes the hydrated form on UPDATE). out.content := strip_jsonb( content - '{id,geometry,collection,type}'::text[], - collection_base_item(content->>'collection') + base_item ) - '{id,geometry,collection,type}'::text[]; + out.private := null; RETURN out; END; @@ -188,25 +266,49 @@ $$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; CREATE OR REPLACE FUNCTION content_hydrate(_item items, _collection collections, fields jsonb DEFAULT '{}'::jsonb) RETURNS jsonb AS $$ DECLARE geom jsonb; - bbox jsonb; output jsonb; content jsonb; - base_item jsonb := _collection.base_item; BEGIN IF include_field('geometry', fields) THEN geom := ST_ASGeoJson(_item.geometry, 20)::jsonb; END IF; - output := content_hydrate( - jsonb_build_object( - 'id', _item.id, - 'geometry', geom, + + IF _item.fragment_id IS NOT NULL THEN + -- Preferred path: reconstruct item from split columns. + -- fragment_id IS NOT NULL is the canonical indicator that split columns + -- are populated; checking a nullable bigint is cheaper than a JSONB equality. + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, 'collection', _item.collection, - 'type', 'Feature' - ) || _item.content, - _collection.base_item, - fields - ); + 'type', 'Feature' + ); + IF _item.bbox IS NOT NULL THEN + content := content || jsonb_build_object('bbox', _item.bbox); + END IF; + IF _item.links IS NOT NULL THEN + content := content || jsonb_build_object('links', _item.links); + END IF; + IF _item.assets IS NOT NULL THEN + content := content || jsonb_build_object('assets', _item.assets); + END IF; + IF _item.properties IS NOT NULL THEN + content := content || jsonb_build_object('properties', _item.properties); + END IF; + IF _item.extra IS NOT NULL THEN + content := content || _item.extra; + END IF; + ELSE + -- Legacy fallback: reconstruct from the content column (pre-v0.10 rows). + content := jsonb_build_object( + 'id', _item.id, + 'geometry', geom, + 'collection', _item.collection, + 'type', 'Feature' + ) || _item.content; + END IF; + output := content_hydrate(content, _collection.base_item, fields); RETURN output; END; $$ LANGUAGE PLPGSQL STABLE PARALLEL SAFE; @@ -289,6 +391,36 @@ BEGIN GET DIAGNOSTICS nrows = ROW_COUNT; RAISE NOTICE 'Added % rows to tmpdata. %', nrows, clock_timestamp() - ts; + -- Batch fragment dedup: insert all unique fragments in one statement rather than + -- calling get_or_create_fragment() per row (which is O(N) round-trips). + -- pgstac_hash_fragment(content) is computed twice (once for insert, once for the + -- join update) but both calls are IMMUTABLE so the planner can CSE them; the net + -- cost is far lower than N individual PL/pgSQL function round-trips. + -- Concurrent inserts of identical fragments are safe: ON CONFLICT DO NOTHING means + -- both sides succeed with the same row; the join below finds it for either winner. + RAISE NOTICE 'Batch inserting fragments. %', clock_timestamp() - ts; + INSERT INTO item_fragments (collection, hash, content) + SELECT DISTINCT ON (collection, pgstac_hash_fragment(content)) + collection, + pgstac_hash_fragment(content) AS hash, + content + FROM tmpdata + WHERE content IS NOT NULL AND content != '{}'::jsonb + ON CONFLICT (collection, hash) DO NOTHING; + + RAISE NOTICE 'Assigning fragment_id. %', clock_timestamp() - ts; + UPDATE tmpdata t + SET fragment_id = f.id + FROM item_fragments f + WHERE f.collection = t.collection + AND f.hash = pgstac_hash_fragment(t.content) + AND t.content IS NOT NULL AND t.content != '{}'::jsonb; + + -- Queue registry sampling per collection (async via run_or_queue so it does not + -- block the ingest transaction). One queued call per distinct collection in the batch. + PERFORM run_or_queue(format('SELECT update_field_registry_from_items(%L);', c)) + FROM (SELECT DISTINCT collection FROM tmpdata) AS cte(c); + RAISE NOTICE 'Doing the insert. %', clock_timestamp() - ts; IF TG_TABLE_NAME = 'items_staging' THEN INSERT INTO items @@ -318,7 +450,10 @@ BEGIN END IF; RAISE NOTICE 'Deleting data from staging table. %', clock_timestamp() - ts; - DELETE FROM items_staging; + -- Use TG_TABLE_NAME so the correct staging table is cleared. + -- The previous hard-coded 'DELETE FROM items_staging' was a bug that left + -- items_staging_ignore and items_staging_upsert un-cleared after processing. + EXECUTE format('DELETE FROM %I', TG_TABLE_NAME); RAISE NOTICE 'Done. %', clock_timestamp() - ts; RETURN NULL; @@ -413,3 +548,263 @@ UPDATE collections ) ; $$ LANGUAGE SQL; + +-- --------------------------------------------------------------------------- +-- Field Registry: walks JSONB item content to track which paths exist in each +-- collection. Used to auto-populate queryables and support schema inference. +-- --------------------------------------------------------------------------- + +-- jsonb_field_rows: Recursively walk a JSONB document and emit one row per field path. +-- max_depth guards against runaway recursion on pathologically nested documents. +CREATE OR REPLACE FUNCTION jsonb_field_rows( + data jsonb, + parent_path text DEFAULT '', + max_depth int DEFAULT 20 +) RETURNS TABLE (path text, is_leaf boolean, value_kind text) AS $$ +DECLARE + k text; + v jsonb; + current_path text; + jtype text; +BEGIN + IF data IS NULL OR max_depth <= 0 THEN + RETURN; + END IF; + jtype := jsonb_typeof(data); + IF jtype = 'object' THEN + FOR k, v IN SELECT * FROM jsonb_each(data) LOOP + current_path := CASE WHEN parent_path = '' THEN k ELSE parent_path || '.' || k END; + IF jsonb_typeof(v) IN ('object', 'array') THEN + RETURN QUERY SELECT current_path, FALSE, jsonb_typeof(v); + RETURN QUERY SELECT * FROM jsonb_field_rows(v, current_path, max_depth - 1); + ELSE + RETURN QUERY SELECT current_path, TRUE, jsonb_typeof(v); + END IF; + END LOOP; + ELSIF jtype = 'array' THEN + -- Walk array elements (e.g. arrays of nested objects); arrays of scalars + -- are already handled as leaves in the object branch above. + FOR v IN SELECT jsonb_array_elements(data) LOOP + IF jsonb_typeof(v) = 'object' THEN + RETURN QUERY SELECT * FROM jsonb_field_rows(v, parent_path, max_depth - 1); + END IF; + END LOOP; + END IF; +END; +$$ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE; + +-- update_field_registry_from_sample: UPSERT registry rows from a pre-selected array of +-- raw item content JSONBs. Callers supply the sample to decouple sampling strategy +-- from the registry write; merge value_kinds to accumulate observed types over time. +CREATE OR REPLACE FUNCTION update_field_registry_from_sample( + _collection text, + item_contents jsonb[] +) RETURNS void AS $$ + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), + now() + FROM unnest(item_contents) AS item(content) + CROSS JOIN LATERAL jsonb_field_rows(item.content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + ; +$$ LANGUAGE SQL VOLATILE; + +-- update_field_registry_from_items: Sample a live collection and UPSERT registry rows. +-- Uses TABLESAMPLE BERNOULLI(5) for large collections (>10k rows by pg_class estimate) +-- and LIMIT 1000 for smaller ones to avoid a full seq-scan for tiny collections. +-- pg_class.reltuples is an estimate (may be stale); its only role is threshold selection. +-- Returns (registered_paths, rows_processed) for observability. +CREATE OR REPLACE FUNCTION update_field_registry_from_items( + _collection text +) RETURNS TABLE (registered_paths int, rows_processed int) AS $$ +DECLARE + est_rows bigint; + nrows int; + npaths int; +BEGIN + -- Sum reltuples across all partitions for this collection. + -- reltuples can be -1 (never analyzed); treat negative values as zero. + SELECT COALESCE(sum(GREATEST(c.reltuples::bigint, 0)), 0) INTO est_rows + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'pgstac' + AND c.relkind = 'r' + AND c.relname LIKE '_items_%' + AND c.relname LIKE '%' || regexp_replace(_collection, '[^a-zA-Z0-9_-]', '', 'g') || '%'; + + IF est_rows > 10000 THEN + -- Large collection: use statistical sampling to avoid full seq-scan. + WITH sampled AS ( + SELECT content FROM items TABLESAMPLE BERNOULLI(5) WHERE collection = _collection + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + ELSE + -- Small collection: process up to 1000 rows to avoid BERNOULLI returning 0 rows. + WITH sampled AS ( + SELECT content FROM items WHERE collection = _collection LIMIT 1000 + ), + upserted AS ( + INSERT INTO item_field_registry (collection, path, is_leaf, value_kinds, first_seen, last_seen) + SELECT + _collection, + r.path, + bool_and(r.is_leaf) AS is_leaf, + array_agg(DISTINCT r.value_kind) FILTER (WHERE r.value_kind IS NOT NULL) AS value_kinds, + now(), now() + FROM sampled + CROSS JOIN LATERAL jsonb_field_rows(content) AS r(path, is_leaf, value_kind) + GROUP BY r.path + ON CONFLICT (collection, path) DO UPDATE SET + is_leaf = EXCLUDED.is_leaf, + value_kinds = ( + SELECT array_agg(DISTINCT v) + FROM unnest(item_field_registry.value_kinds || EXCLUDED.value_kinds) t(v) + ), + last_seen = now() + RETURNING 1 + ) + SELECT + (SELECT count(*)::int FROM upserted), + (SELECT count(*)::int FROM sampled) + INTO npaths, nrows; + END IF; + + RETURN QUERY SELECT npaths, nrows; +END; +$$ LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER; + +-- refresh_field_registry: Expire stale registry entries that haven't been seen recently. +-- Intended for scheduled maintenance (e.g. pg_cron daily job). +-- Returns (collection, expired_paths) for each collection affected. +CREATE OR REPLACE FUNCTION refresh_field_registry( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE (collection_id text, expired_paths int) AS $$ + WITH deleted AS ( + DELETE FROM item_field_registry + WHERE (_collection IS NULL OR collection = _collection) + AND last_seen < now() - retention_interval + RETURNING collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE; + +-- Item Fragment Management functions +-- extract_fragment: Strip the per-item keys from content to get the dedup-eligible portion. +-- Pure SQL so PostgreSQL can inline and constant-fold it; avoid PLPGSQL wrapper overhead. +CREATE OR REPLACE FUNCTION extract_fragment( + content jsonb, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS jsonb AS $$ + SELECT content - COALESCE(excluded_keys, '{id,geometry,collection,type}'::text[]); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- pgstac_hash_fragment: Hash a fragment content for dedup +CREATE OR REPLACE FUNCTION pgstac_hash_fragment(fragment jsonb) RETURNS text AS $$ +SELECT pgstac_hash(fragment::text); +$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE; + +-- get_or_create_fragment: Look up or insert a fragment, returning its id. +-- Uses INSERT … ON CONFLICT … RETURNING to avoid a redundant pre-check SELECT; +-- only falls back to a SELECT when the conflict path suppresses the RETURNING row. +-- This is safe under concurrent inserts: two transactions racing to create the same +-- fragment both see ON CONFLICT DO NOTHING; the loser's RETURNING is empty so it +-- falls through to the SELECT which finds the winner's row. +CREATE OR REPLACE FUNCTION get_or_create_fragment( + content jsonb, + _collection text, + excluded_keys text[] DEFAULT '{id,geometry,collection,type}'::text[] +) RETURNS bigint AS $$ +DECLARE + frag_content jsonb; + frag_hash text; + frag_id bigint; +BEGIN + IF content IS NULL OR _collection IS NULL THEN + RETURN NULL; + END IF; + + frag_content := extract_fragment(content, excluded_keys); + frag_hash := pgstac_hash_fragment(frag_content); + + -- Insert-first: one round trip when the fragment is new. + WITH ins AS ( + INSERT INTO item_fragments (collection, hash, content) + VALUES (_collection, frag_hash, frag_content) + ON CONFLICT (collection, hash) DO NOTHING + RETURNING id + ) + SELECT id INTO frag_id FROM ins; + + -- Fallback SELECT: one extra round trip only on the conflict path. + IF frag_id IS NULL THEN + SELECT id INTO frag_id + FROM item_fragments + WHERE collection = _collection AND hash = frag_hash; + END IF; + + RETURN frag_id; +END; +$$ LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE; + +-- gc_fragments: Garbage collect orphaned fragments using a single set-based DELETE. +-- Replaces the previous per-collection FOR LOOP with a single statement that lets +-- the planner choose the optimal join/anti-join strategy across all collections. +-- The NOT EXISTS sub-select is evaluated per fragment; with an index on items.fragment_id +-- this is an efficient anti-join rather than a full seq-scan. +CREATE OR REPLACE FUNCTION gc_fragments( + _collection text DEFAULT NULL, + retention_interval interval DEFAULT '90 days' +) RETURNS TABLE ( + collection_id text, + fragments_removed int +) AS $$ + WITH deleted AS ( + DELETE FROM item_fragments f + WHERE + (_collection IS NULL OR f.collection = _collection) + AND f.created_at < now() - retention_interval + AND NOT EXISTS (SELECT 1 FROM items i WHERE i.fragment_id = f.id) + RETURNING f.collection + ) + SELECT collection, count(*)::int + FROM deleted + GROUP BY collection; +$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE; diff --git a/src/pgstac/tests/pgtap/003_items.sql b/src/pgstac/tests/pgtap/003_items.sql index 8412f18b..d1f6e07f 100644 --- a/src/pgstac/tests/pgtap/003_items.sql +++ b/src/pgstac/tests/pgtap/003_items.sql @@ -1,4 +1,6 @@ SELECT has_table('pgstac'::name, 'items'::name); +SELECT has_table('pgstac'::name, 'item_fragments'::name); +SELECT has_table('pgstac'::name, 'item_field_registry'::name); SELECT has_table('pgstac'::name, 'items_deleted_log'::name); @@ -14,6 +16,12 @@ SELECT has_function('pgstac'::name, 'update_item', ARRAY['jsonb']); SELECT has_function('pgstac'::name, 'upsert_item', ARRAY['jsonb']); SELECT has_function('pgstac'::name, 'create_items', ARRAY['jsonb']); SELECT has_function('pgstac'::name, 'upsert_items', ARRAY['jsonb']); +SELECT has_function('pgstac'::name, 'extract_fragment', ARRAY['jsonb', 'text[]']); +SELECT has_function('pgstac'::name, 'get_or_create_fragment', ARRAY['jsonb', 'text', 'text[]']); +SELECT has_function('pgstac'::name, 'gc_fragments', ARRAY['text', 'interval']); +SELECT has_function('pgstac'::name, 'update_field_registry_from_sample', ARRAY['text', 'jsonb[]']); +SELECT has_function('pgstac'::name, 'update_field_registry_from_items', ARRAY['text']); +SELECT has_function('pgstac'::name, 'refresh_field_registry', ARRAY['text', 'interval']); SELECT has_function('pgstac'::name, 'gc_deleted_items_log', ARRAY['interval']); SELECT has_function('pgstac'::name, 'gc_deleted_items_log', ARRAY['interval', 'integer']); SELECT has_function('pgstac'::name, 'gc_deleted_items_log_batch', ARRAY['interval', 'integer']); @@ -138,3 +146,200 @@ SELECT results_eq($$ $$, 'gc_deleted_items_log(interval, integer) removes aged tombstones in batches' ); + +SELECT create_item('{ + "id": "pgstac-test-item-0004", + "bbox": [-85.379245, 30.933949, -85.308201, 31.003555], + "type": "Feature", + "links": [], + "assets": {"image": {"href": "https://example.com/a.tif", "type": "image/tiff"}}, + "geometry": {"type": "Point", "coordinates": [-85.309412, 30.933949]}, + "collection": "pgstac-test-collection", + "properties": {"datetime": "2011-08-25T00:00:00Z", "eo:cloud_cover": 31, "gsd": 5}, + "stac_version": "1.0.0" +}'); + +SELECT create_item('{ + "id": "pgstac-test-item-0005", + "bbox": [-85.379245, 30.933949, -85.308201, 31.003555], + "type": "Feature", + "links": [], + "assets": {"image": {"href": "https://example.com/a.tif", "type": "image/tiff"}}, + "geometry": {"type": "Point", "coordinates": [-85.309500, 30.934000]}, + "collection": "pgstac-test-collection", + "properties": {"datetime": "2011-08-25T00:00:00Z", "eo:cloud_cover": 31, "gsd": 5}, + "stac_version": "1.0.0" +}'); + +SELECT ok( + (SELECT fragment_id IS NOT NULL FROM items WHERE id='pgstac-test-item-0004' AND collection='pgstac-test-collection'), + 'create_item assigns fragment_id for split-storage rows' +); +SELECT ok( + (SELECT bbox = '[-85.379245, 30.933949, -85.308201, 31.003555]'::jsonb FROM items WHERE id='pgstac-test-item-0004' AND collection='pgstac-test-collection'), + 'create_item stores bbox in split column' +); +SELECT ok( + (SELECT links = '[]'::jsonb FROM items WHERE id='pgstac-test-item-0004' AND collection='pgstac-test-collection'), + 'create_item stores links in split column' +); +SELECT ok( + (SELECT assets ? 'image' FROM items WHERE id='pgstac-test-item-0004' AND collection='pgstac-test-collection'), + 'create_item stores assets in split column' +); +SELECT ok( + (SELECT eo_cloud_cover = 31 FROM items WHERE id='pgstac-test-item-0004' AND collection='pgstac-test-collection'), + 'create_item stores promoted columns' +); +SELECT results_eq($$ + SELECT count(DISTINCT fragment_id)::bigint + FROM items + WHERE id IN ('pgstac-test-item-0004', 'pgstac-test-item-0005') + AND collection='pgstac-test-collection'; + $$,$$ + SELECT 1::bigint; + $$, + 'identical split-storage items share one fragment_id' +); +SELECT results_eq($$ + SELECT get_item('pgstac-test-item-0004', 'pgstac-test-collection')->'properties'->>'eo:cloud_cover'; + $$,$$ + SELECT '31'; + $$, + 'get_item hydrates split-storage properties' +); +SELECT results_eq($$ + SELECT get_item('pgstac-test-item-0004', 'pgstac-test-collection')->'bbox'; + $$,$$ + SELECT '[-85.379245, 30.933949, -85.308201, 31.003555]'::jsonb; + $$, + 'get_item hydrates split-storage bbox' +); + +SELECT lives_ok($$ + DELETE FROM item_field_registry + WHERE collection='pgstac-test-collection' + AND path='registry_probe'; +$$, 'clear registry probe row'); + +SELECT lives_ok($$ + SELECT update_field_registry_from_sample( + 'pgstac-test-collection', + ARRAY['{"registry_probe":1}'::jsonb] + ); +$$, 'register numeric field kind from explicit sample'); + +SELECT lives_ok($$ + SELECT update_field_registry_from_sample( + 'pgstac-test-collection', + ARRAY['{"registry_probe":"one"}'::jsonb] + ); +$$, 'merge string field kind from a second explicit sample'); + +SELECT results_eq($$ + SELECT string_agg(v, ',' ORDER BY v) + FROM item_field_registry r, + unnest(r.value_kinds) AS v + WHERE r.collection='pgstac-test-collection' + AND r.path='registry_probe'; + $$,$$ + SELECT 'number,string'; + $$, + 'update_field_registry_from_sample merges fresh value kinds without throttling away updates' +); + +SELECT results_eq($$ + SELECT registered_paths > 1 + FROM update_field_registry_from_items('pgstac-test-collection'); + $$,$$ + SELECT TRUE; + $$, + 'update_field_registry_from_items returns the true registered path count' +); +SELECT ok( + EXISTS ( + SELECT 1 + FROM item_field_registry + WHERE collection='pgstac-test-collection' + AND path='properties.eo:cloud_cover' + ), + 'update_field_registry_from_items records nested property paths' +); + +SELECT lives_ok($$ + UPDATE item_fragments + SET created_at = now() - '100 days'::interval + WHERE id IN ( + SELECT DISTINCT fragment_id + FROM items + WHERE id IN ('pgstac-test-item-0004', 'pgstac-test-item-0005') + AND collection='pgstac-test-collection' + ); +$$, 'age active fragment rows for gc_fragments test'); + +SELECT delete_item('pgstac-test-item-0004', 'pgstac-test-collection'); +SELECT delete_item('pgstac-test-item-0005', 'pgstac-test-collection'); + +SELECT results_eq($$ + SELECT COALESCE(sum(fragments_removed), 0) > 0 + FROM gc_fragments('pgstac-test-collection', '90 days'::interval); + $$,$$ + SELECT TRUE; + $$, + 'gc_fragments removes orphaned dedup rows' +); + +SELECT lives_ok($$ + WITH raw AS ( + SELECT '{ + "id": "pgstac-test-item-legacy", + "bbox": [-85.0, 30.0, -84.0, 31.0], + "type": "Feature", + "links": [], + "assets": {"image": {"href": "https://example.com/legacy.tif", "type": "image/tiff"}}, + "geometry": {"type": "Point", "coordinates": [-85.0, 30.0]}, + "collection": "pgstac-test-collection", + "properties": {"datetime": "2012-01-01T00:00:00Z", "eo:cloud_cover": 44}, + "stac_version": "1.0.0" + }'::jsonb AS content + ), + dehydrated AS ( + SELECT content_dehydrate(content) AS item FROM raw + ) + INSERT INTO items ( + id, + geometry, + collection, + datetime, + end_datetime, + pgstac_updated_at, + content_hash, + content, + private + ) + SELECT + (item).id, + (item).geometry, + (item).collection, + (item).datetime, + (item).end_datetime, + (item).pgstac_updated_at, + (item).content_hash, + (item).content, + (item).private + FROM dehydrated; +$$, 'insert a legacy-style row without split columns'); + +SELECT ok( + (SELECT fragment_id IS NULL FROM items WHERE id='pgstac-test-item-legacy' AND collection='pgstac-test-collection'), + 'legacy rows keep a NULL fragment_id' +); +SELECT results_eq($$ + SELECT get_item('pgstac-test-item-legacy', 'pgstac-test-collection')->'properties'->>'eo:cloud_cover'; + $$,$$ + SELECT '44'; + $$, + 'legacy rows still hydrate through the content fallback path' +); + +SELECT delete_item('pgstac-test-item-legacy', 'pgstac-test-collection');