Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ATPROTO_BASE_URL=http://127.0.0.1:3000
# ATSTORE_IDENTIFIER=
# ATSTORE_APP_PASSWORD=
# ATSTORE_SERVICE=https://bsky.social
# Optional: DID of the store publisher (avoids an extra login when checking product-claim eligibility).
# Optional: DID of the store publisher (avoids Bluesky login on listing detail, snapshot refresh, and claim checks).
# ATSTORE_REPO_DID=did:plc:...
# ATSTORE_PROFILE_DISPLAY_NAME=AT Store
# ATSTORE_WEBSITE_URL=https://at.store
Expand Down
31 changes: 31 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,34 @@
services:
tap:
image: ghcr.io/bluesky-social/indigo/tap:latest
ports:
- "2480:2480"
environment:
TAP_DATABASE_URL: sqlite:///data/tap.db
TAP_NO_REPLAY: "true"
TAP_SIGNAL_COLLECTION: fyi.atstore.listing.detail
TAP_COLLECTION_FILTERS: >-
fyi.atstore.listing.detail,
fyi.atstore.listing.review,
fyi.atstore.listing.reviewReply,
fyi.atstore.listing.favorite,
site.standard.publication,
site.standard.document,
com.germnetwork.declaration,
fund.at.actor.declaration,
fund.at.funding.contribute,
fund.at.funding.channel,
fund.at.funding.plan,
fund.at.graph.dependency
volumes:
- tap_data:/data
healthcheck:
test: ["CMD-SHELL", "wget -q -O- http://localhost:2480/health || exit 1"]
interval: 10s
timeout: 5s
retries: 5
start_period: 15s

postgres:
image: pgvector/pgvector:pg17
environment:
Expand All @@ -17,3 +47,4 @@ services:

volumes:
postgres_data:
tap_data:
8 changes: 8 additions & 0 deletions drizzle/0041_store_listing_page_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE "store_listing_page_snapshots" (
"store_listing_id" uuid PRIMARY KEY NOT NULL,
"payload" jsonb NOT NULL,
"payload_version" integer DEFAULT 1 NOT NULL,
"refreshed_at" timestamp with time zone NOT NULL
);
--> statement-breakpoint
ALTER TABLE "store_listing_page_snapshots" ADD CONSTRAINT "store_listing_page_snapshots_store_listing_id_store_listings_id_fk" FOREIGN KEY ("store_listing_id") REFERENCES "public"."store_listings"("id") ON DELETE cascade ON UPDATE no action;
12 changes: 12 additions & 0 deletions drizzle/0042_ensure_store_listing_page_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS "store_listing_page_snapshots" (
"store_listing_id" uuid PRIMARY KEY NOT NULL,
"payload" jsonb NOT NULL,
"payload_version" integer DEFAULT 1 NOT NULL,
"refreshed_at" timestamp with time zone NOT NULL
);
--> statement-breakpoint
DO $$ BEGIN
ALTER TABLE "store_listing_page_snapshots" ADD CONSTRAINT "store_listing_page_snapshots_store_listing_id_store_listings_id_fk" FOREIGN KEY ("store_listing_id") REFERENCES "public"."store_listings"("id") ON DELETE cascade ON UPDATE no action;
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
14 changes: 14 additions & 0 deletions drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,20 @@
"when": 1779100000000,
"tag": "0040_store_listing_oauth_discovery",
"breakpoints": true
},
{
"idx": 41,
"version": "7",
"when": 1779200000000,
"tag": "0041_store_listing_page_snapshots",
"breakpoints": true
},
{
"idx": 42,
"version": "7",
"when": 1779300000000,
"tag": "0042_ensure_store_listing_page_snapshots",
"breakpoints": true
}
]
}
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
"listing:add": "tsx scripts/add-manual-directory-listing.ts",
"listing:publish-store": "tsx -r dotenv/config scripts/publish-store-listing-to-atproto.ts",
"listing:rehydrate-from-at-uri": "tsx -r dotenv/config scripts/rehydrate-store-listing-from-at-uri.ts",
"listing:backfill-atstore": "tsx -r dotenv/config scripts/atstore-listings-backfill.ts",
"listing:restore-from-backup-db": "tsx -r dotenv/config scripts/restore-store-listing-from-backup-db.ts",
"oauth:detect-scopes": "tsx scripts/detect-listing-oauth-scopes.ts",
"listing:oauth-probes-sync": "tsx -r dotenv/config scripts/sync-listing-oauth-probes.ts",
"listing:oauth-lexicon-hub-refresh": "tsx -r dotenv/config scripts/refresh-oauth-lexicon-hub.ts",
"listing:page-snapshots-refresh": "tsx -r dotenv/config scripts/refresh-listing-page-snapshots.ts",
"listing:oauth-lexicon-keys-backfill": "tsx -r dotenv/config scripts/backfill-oauth-lexicon-keys.ts",
"listing:oauth-discover-metadata": "tsx -r dotenv/config scripts/discover-listing-oauth-metadata.ts",
"db:generate": "drizzle-kit generate",
Expand Down
109 changes: 109 additions & 0 deletions scripts/atstore-listings-backfill.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env node
/**
* Bulk backfill `store_listings` from every `fyi.atstore.listing.detail` on the @store repo.
*
* Local Tap with `TAP_SIGNAL_COLLECTION` only discovers owner repos over the firehose; it does
* not reliably mirror the full @store publisher catalog. Run this after `pnpm tap:consumer` setup
* when Postgres is missing most directory rows.
*
* pnpm listing:backfill-atstore
*
* Requires `DATABASE_URL`. Uses `ATSTORE_IDENTIFIER` + `ATSTORE_APP_PASSWORD` (or
* `ATSTORE_REPO_DID`) like `listing:rehydrate-from-at-uri`.
*/
import "dotenv/config";
import {
paginateListRecords,
rkeyFromCollectionAtUri,
} from "#/lib/atproto/list-records";
import { COLLECTION } from "#/lib/atproto/nsids";
import { getAtstoreRepoDid } from "#/lib/atproto/publish-directory-listing";
import { resolveAtprotoPdsBaseUrl } from "#/lib/atproto/resolve-atproto-pds";
import {
tryParseListingDetailRecord,
upsertDirectoryListingFromTap,
} from "#/lib/atproto/tap-listing-sync";

import { db, dbClient } from "../src/db/index.server";

async function main() {
if (!process.env.DATABASE_URL?.trim()) {
console.error("[atstore-listings-backfill] DATABASE_URL is required");
process.exit(1);
}

const did = await getAtstoreRepoDid();
const pds = await resolveAtprotoPdsBaseUrl(did);
if (!pds) {
console.error(
`[atstore-listings-backfill] no PDS for ${did}; cannot listRecords`,
);
process.exit(1);
}

console.log(
`[atstore-listings-backfill] listing.detail on ${did} via ${pds}…`,
);

let ok = 0;
let failed = 0;
let skipped = 0;

for await (const row of paginateListRecords(
pds,
did,
COLLECTION.listingDetail,
)) {
const rkey = rkeyFromCollectionAtUri(row.uri, COLLECTION.listingDetail);
if (!rkey) {
skipped++;
continue;
}
const body = row.value as Record<string, unknown> | null | undefined;
if (!body || typeof body !== "object") {
skipped++;
continue;
}
const parsed = tryParseListingDetailRecord(body);
if (!parsed.ok) {
console.warn(
`[atstore-listings-backfill] skip rkey=${rkey}: ${parsed.stage} ${parsed.reason}`,
);
skipped++;
continue;
}
try {
await upsertDirectoryListingFromTap({
db,
did,
rkey,
record: parsed.record,
trustedPublisher: true,
});
ok++;
if (ok % 50 === 0) {
console.log(`[atstore-listings-backfill] … ${String(ok)} upserted`);
}
} catch (error) {
failed++;
console.error(
`[atstore-listings-backfill] failed rkey=${rkey} slug=${parsed.record.slug}`,
error,
);
}
}

console.log(
`[atstore-listings-backfill] done ok=${String(ok)} failed=${String(failed)} skipped=${String(skipped)}`,
);
if (failed > 0) {
process.exit(1);
}
}

main()
.catch((error) => {
console.error(error);
process.exit(1);
})
.finally(() => dbClient.end({ timeout: 5 }));
113 changes: 113 additions & 0 deletions scripts/refresh-listing-page-snapshots.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env node
/**
* Rebuild `store_listing_page_snapshots` for every public listing (slow: OAuth, funding, reviews).
*
* pnpm listing:page-snapshots-refresh
* pnpm listing:page-snapshots-refresh -- --slug=murmul
*/
import "dotenv/config";
import { refreshListingPageSnapshot } from "#/lib/listing-page-snapshot";
import { and, asc, eq, isNotNull } from "drizzle-orm";

function ts(): string {
return new Date().toISOString();
}

function slugArg(): string | null {
const raw = process.argv.find((a) => a.startsWith("--slug="));
if (!raw) return null;
const value = raw.slice("--slug=".length).trim();
return value.length > 0 ? value : null;
}

async function main() {
if (!process.env.DATABASE_URL?.trim()) {
console.error(
`[refresh-listing-page-snapshots] ${ts()} DATABASE_URL is required`,
);
process.exit(1);
}

const onlySlug = slugArg();
const { db, dbClient } = await import("#/db/index.server");
const schema = await import("#/db/schema");

const rows = await db
.select({ id: schema.storeListings.id, slug: schema.storeListings.slug })
.from(schema.storeListings)
.where(
onlySlug
? and(
eq(schema.storeListings.slug, onlySlug),
isNotNull(schema.storeListings.slug),
)
: isNotNull(schema.storeListings.slug),
)
.orderBy(asc(schema.storeListings.slug));

if (onlySlug && rows.length === 0) {
console.error(
`[refresh-listing-page-snapshots] ${ts()} no listing for slug=${onlySlug}`,
);
process.exit(1);
}

console.log(
`[refresh-listing-page-snapshots] ${ts()} refreshing ${String(rows.length)} listing(s)…`,
);

let ok = 0;
let failed = 0;
const startedAt = Date.now();

for (const row of rows) {
try {
await refreshListingPageSnapshot(db, row.id);
ok++;
if (ok % 25 === 0 || ok === rows.length) {
console.log(
`[refresh-listing-page-snapshots] ${ts()} progress ok=${String(ok)} failed=${String(failed)} slug=${row.slug ?? row.id}`,
);
}
} catch (error) {
failed++;
const cause =
error instanceof Error && "cause" in error && error.cause != null
? error.cause
: null;
console.warn(
`[refresh-listing-page-snapshots] ${ts()} failed id=${row.id} slug=${row.slug ?? "?"}`,
error instanceof Error ? (error.stack ?? error.message) : error,
cause == null ? undefined : { cause },
);
if (
failed === 1 &&
cause instanceof Error &&
/store_listing_page_snapshots/i.test(cause.message) &&
/does not exist|relation/i.test(cause.message)
) {
console.error(
`[refresh-listing-page-snapshots] ${ts()} table missing — run: pnpm db:migrate`,
);
}
}
}

const elapsedMs = Date.now() - startedAt;
console.log(
`[refresh-listing-page-snapshots] ${ts()} done ok=${String(ok)} failed=${String(failed)} elapsedMs=${String(elapsedMs)}`,
);

await dbClient.end({ timeout: 5 }).catch(() => {});
if (failed > 0) {
process.exit(1);
}
}

main().catch((error) => {
console.error(
`[refresh-listing-page-snapshots] fatal`,
error instanceof Error ? (error.stack ?? error.message) : error,
);
process.exit(1);
});
14 changes: 14 additions & 0 deletions scripts/sync-listing-oauth-probes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/
import "dotenv/config";
import * as schema from "#/db/schema";
import { refreshListingPageSnapshot } from "#/lib/listing-page-snapshot";
import { refreshOAuthLexiconHubSnapshot } from "#/lib/oauth-lexicon-hub-snapshot.server";
import { probeOAuthListingAuth } from "#/lib/oauth-listing-auth-probe";
import { extractOAuthLexiconKeysForStorefrontProbe } from "#/lib/oauth-scope-lexicon-keys";
Expand Down Expand Up @@ -240,6 +241,19 @@ async function main() {
target: schema.storeListingOAuthProbes.storeListingId,
set: omitPk(payload),
});

try {
await refreshListingPageSnapshot(db, listing.id);
} catch (refreshError) {
log("warn", "page_snapshot_refresh_failed", {
listingId: listing.id,
slug: listing.slug,
error:
refreshError instanceof Error
? refreshError.message
: String(refreshError),
});
}
}

async function persistError(
Expand Down
19 changes: 19 additions & 0 deletions src/db/schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { ListingLink } from "#/lib/atproto/listing-record";
import type { StoreListingPageSnapshotPayload } from "#/lib/listing-page-snapshot.types";
import type { DirectoryOAuthLexiconHubData } from "#/lib/oauth-lexicon-hub.types";
import type { OAuthAuthProbeReport } from "#/lib/oauth-listing-auth-probe";
import type { StoreListingOauthDiscoveryDetail } from "#/lib/oauth-listing-oauth-discovery.types";
Expand Down Expand Up @@ -766,6 +767,24 @@ export const oauthLexiconHubSnapshot = pgTable("oauth_lexicon_hub_snapshot", {
computedAt: timestamp("computed_at", { withTimezone: true }).notNull(),
});

/**
* Precomputed public product-page bundle (reviews/mentions/related/oauth/funding).
* Rebuilt on listing/review/mention/probe/fund changes and by backfill cron.
*/
export const storeListingPageSnapshots = pgTable(
"store_listing_page_snapshots",
{
storeListingId: uuid("store_listing_id")
.primaryKey()
.references(() => storeListings.id, { onDelete: "cascade" }),
payload: jsonb("payload")
.$type<StoreListingPageSnapshotPayload>()
.notNull(),
payloadVersion: integer("payload_version").notNull().default(1),
refreshedAt: timestamp("refreshed_at", { withTimezone: true }).notNull(),
},
);

/** Ordered homepage hero slots managed from admin. */
export const homePageHeroListings = pgTable(
"home_page_hero_listings",
Expand Down
Loading
Loading