Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/sync-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
],
"dependencies": {
"pg": "^8.16.3",
"pg-node-migrations": "0.0.8",
"yesql": "^7.0.0"
},
"peerDependencies": {
Expand Down
129 changes: 100 additions & 29 deletions packages/sync-engine/src/database/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,137 @@
import { Client } from 'pg'
import { migrate } from 'pg-node-migrations'
import fs from 'node:fs'
import pino from 'pino'
import path from 'node:path'
import type { Logger } from 'pino'
import type { ConnectionOptions } from 'node:tls'

type MigrationConfig = {
schema: string
databaseUrl: string
ssl?: ConnectionOptions
logger?: pino.Logger
logger?: Logger
}

async function connectAndMigrate(
client: Client,
migrationsDirectory: string,
config: MigrationConfig,
logOnError = false
) {
if (!fs.existsSync(migrationsDirectory)) {
config.logger?.info(`Migrations directory ${migrationsDirectory} not found, skipping`)
return
}
const MIGRATION_LOCK_ID = 72987329
const MIGRATIONS_DIR = path.join(__dirname, 'migrations')

const optionalConfig = {
schemaName: config.schema,
tableName: 'migrations',
}
function parseFileName(fileName: string): { id: number; name: string } | null {
const match = /^(\d+)[-_](.*)\.sql$/i.exec(fileName)
if (!match) return null
return { id: parseInt(match[1], 10), name: match[2] }
}

export type Migration = { id: number; name: string; sql: string }

/**
* Returns all migrations with schema placeholders replaced.
* Useful for inspecting migrations or running them manually with psql.
*/
export function getMigrations(schema: string = 'stripe'): Migration[] {
if (!fs.existsSync(MIGRATIONS_DIR)) return []

return fs
.readdirSync(MIGRATIONS_DIR)
.filter((f) => f.endsWith('.sql'))
.sort()
.map((fileName) => {
const parsed = parseFileName(fileName)
if (!parsed) return null

const raw = fs.readFileSync(path.join(MIGRATIONS_DIR, fileName), 'utf8')
const sql = raw.replace(/\{\{schema\}\}/g, schema)

return { id: parsed.id, name: parsed.name, sql }
})
.filter((m): m is Migration => m !== null)
}

/**
* Applies a single migration file within a transaction.
* Supports disabling transactions via `-- postgres-migrations disable-transaction` comment.
*/
async function applyMigration(
client: Client,
tableName: string,
migration: { id: number; name: string; sql: string }
): Promise<void> {
const useTransaction = !migration.sql.includes('-- postgres-migrations disable-transaction')

try {
await migrate({ client }, migrationsDirectory, optionalConfig)
} catch (error) {
if (logOnError && error instanceof Error) {
config.logger?.error(error, 'Migration error:')
} else {
throw error
if (useTransaction) await client.query('START TRANSACTION')
await client.query(migration.sql)
await client.query(`INSERT INTO ${tableName} (id, name) VALUES ($1, $2)`, [
migration.id,
migration.name,
])
if (useTransaction) await client.query('COMMIT')
} catch (err) {
if (useTransaction) {
try {
await client.query('ROLLBACK')
} catch {
// Connection may already be broken
}
}
throw new Error(
`Migration ${migration.id} (${migration.name}) failed: ${err instanceof Error ? err.message : String(err)}`
)
}
}

export async function runMigrations(config: MigrationConfig): Promise<void> {
// Init DB
const client = new Client({
connectionString: config.databaseUrl,
ssl: config.ssl,
connectionTimeoutMillis: 10_000,
})

const tableName = `"${config.schema}"."migrations"`
const migrations = getMigrations(config.schema)

try {
// Run migrations
await client.connect()

// Ensure schema exists, not doing it via migration to not break current migration checksums
await client.query(`CREATE SCHEMA IF NOT EXISTS ${config.schema};`)
if (migrations.length === 0) {
config.logger?.info(`No migrations found, skipping`)
return
}

config.logger?.info('Running migrations')

await connectAndMigrate(client, path.resolve(__dirname, './migrations'), config)
await client.query(`SELECT pg_advisory_lock(${MIGRATION_LOCK_ID})`)

try {
await client.query(`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`)

await client.query(`
CREATE TABLE IF NOT EXISTS ${tableName} (
id integer PRIMARY KEY,
name varchar(100) UNIQUE NOT NULL,
executed_at timestamp DEFAULT current_timestamp
)
`)

// Remove legacy hash column from pg-node-migrations (checksums no longer validated)
await client.query(`ALTER TABLE ${tableName} DROP COLUMN IF EXISTS hash`)

const { rows: applied } = await client.query<{ id: number }>(`SELECT id FROM ${tableName}`)
const appliedIds = new Set(applied.map((r) => r.id))

for (const migration of migrations) {
if (appliedIds.has(migration.id)) continue

config.logger?.info(`Applying migration ${migration.id}: ${migration.name}`)
await applyMigration(client, tableName, migration)
}
} finally {
await client.query(`SELECT pg_advisory_unlock(${MIGRATION_LOCK_ID})`)
}

config.logger?.info('Finished migrations')
} catch (err) {
config.logger?.error(err, 'Error running migrations')
throw err
} finally {
await client.end()
config.logger?.info('Finished migrations')
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe"."products" (
create table if not exists "{{schema}}"."products" (
"id" text primary key,
"object" text,
"active" boolean,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe"."customers" (
create table if not exists "{{schema}}"."customers" (
"id" text primary key,
"object" text,
"address" jsonb,
Expand Down
25 changes: 16 additions & 9 deletions packages/sync-engine/src/database/migrations/0003_prices.sql
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'pricing_type') THEN
create type "stripe"."pricing_type" as enum ('one_time', 'recurring');
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_namespace n ON t.typnamespace = n.oid
WHERE t.typname = 'pricing_type' AND n.nspname = '{{schema}}'
) THEN
create type "{{schema}}"."pricing_type" as enum ('one_time', 'recurring');
END IF;
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'pricing_tiers') THEN
create type "stripe"."pricing_tiers" as enum ('graduated', 'volume');
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_namespace n ON t.typnamespace = n.oid
WHERE t.typname = 'pricing_tiers' AND n.nspname = '{{schema}}'
) THEN
create type "{{schema}}"."pricing_tiers" as enum ('graduated', 'volume');
END IF;
--more types here...
END
$$;


create table if not exists "stripe"."prices" (
create table if not exists "{{schema}}"."prices" (
"id" text primary key,
"object" text,
"active" boolean,
"currency" text,
"metadata" jsonb,
"nickname" text,
"recurring" jsonb,
"type" stripe.pricing_type,
"type" "{{schema}}"."pricing_type",
"unit_amount" integer,
"billing_scheme" text,
"created" integer,
"livemode" boolean,
"lookup_key" text,
"tiers_mode" stripe.pricing_tiers,
"tiers_mode" "{{schema}}"."pricing_tiers",
"transform_quantity" jsonb,
"unit_amount_decimal" text,

"product" text references stripe.products
"product" text references "{{schema}}"."products"
);

Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'subscription_status') THEN
create type "stripe"."subscription_status" as enum (
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_namespace n ON t.typnamespace = n.oid
WHERE t.typname = 'subscription_status' AND n.nspname = '{{schema}}'
) THEN
create type "{{schema}}"."subscription_status" as enum (
'trialing',
'active',
'canceled',
Expand All @@ -15,7 +19,7 @@ BEGIN
END
$$;

create table if not exists "stripe"."subscriptions" (
create table if not exists "{{schema}}"."subscriptions" (
"id" text primary key,
"object" text,
"cancel_at_period_end" boolean,
Expand All @@ -26,7 +30,7 @@ create table if not exists "stripe"."subscriptions" (
"metadata" jsonb,
"pending_setup_intent" text,
"pending_update" jsonb,
"status" "stripe"."subscription_status",
"status" "{{schema}}"."subscription_status",
"application_fee_percent" double precision,
"billing_cycle_anchor" integer,
"billing_thresholds" jsonb,
Expand All @@ -49,7 +53,7 @@ create table if not exists "stripe"."subscriptions" (
"trial_start" jsonb,

"schedule" text,
"customer" text references "stripe"."customers",
"customer" text references "{{schema}}"."customers",
"latest_invoice" text, -- not yet joined
"plan" text -- not yet joined
);
Expand Down
16 changes: 10 additions & 6 deletions packages/sync-engine/src/database/migrations/0005_invoices.sql
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@

DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'invoice_status') THEN
create type "stripe"."invoice_status" as enum ('draft', 'open', 'paid', 'uncollectible', 'void');
IF NOT EXISTS (
SELECT 1 FROM pg_type t
JOIN pg_namespace n ON t.typnamespace = n.oid
WHERE t.typname = 'invoice_status' AND n.nspname = '{{schema}}'
) THEN
create type "{{schema}}"."invoice_status" as enum ('draft', 'open', 'paid', 'uncollectible', 'void');
END IF;
END
$$;


create table if not exists "stripe"."invoices" (
create table if not exists "{{schema}}"."invoices" (
"id" text primary key,
"object" text,
"auto_advance" boolean,
Expand All @@ -20,7 +24,7 @@ create table if not exists "stripe"."invoices" (
"metadata" jsonb,
"period_end" integer,
"period_start" integer,
"status" "stripe"."invoice_status",
"status" "{{schema}}"."invoice_status",
"total" bigint,
"account_country" text,
"account_name" text,
Expand Down Expand Up @@ -67,8 +71,8 @@ create table if not exists "stripe"."invoices" (
"transfer_data" jsonb,
"webhooks_delivered_at" integer,

"customer" text references "stripe"."customers",
"subscription" text references "stripe"."subscriptions",
"customer" text references "{{schema}}"."customers",
"subscription" text references "{{schema}}"."subscriptions",
"payment_intent" text, -- not yet implemented
"default_payment_method" text, -- not yet implemented
"default_source" text, -- not yet implemented
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

create table if not exists "stripe".charges (
create table if not exists "{{schema}}"."charges" (
id text primary key,
object text,
card jsonb,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe".coupons (
create table if not exists "{{schema}}"."coupons" (
id text primary key,
object text,
name text,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe".disputes (
create table if not exists "{{schema}}"."disputes" (
id text primary key,
object text,
amount bigint,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe".events (
create table if not exists "{{schema}}"."events" (
id text primary key,
object text,
data jsonb,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe".payouts (
create table if not exists "{{schema}}"."payouts" (
id text primary key,
object text,
date text,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create table if not exists "stripe"."plans" (
create table if not exists "{{schema}}"."plans" (
id text primary key,
object text,
name text,
Expand Down
Loading