Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
668 changes: 668 additions & 0 deletions .context/k8s-ha-report.md

Large diffs are not rendered by default.

107 changes: 58 additions & 49 deletions apps/mesh/src/database/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,63 +153,72 @@ async function runPluginMigrations(db: Kysely<Database>): Promise<number> {
return 0;
}

// Note: plugin_migrations table and old record migration are handled
// in runKyselyMigrations() before Kysely's migrator runs

// Get already executed migrations
const executed = await sql<{ plugin_id: string; name: string }>`
SELECT plugin_id, name FROM plugin_migrations
`.execute(db);
const executedSet = new Set(
executed.rows.map((r) => `${r.plugin_id}/${r.name}`),
);

// Group migrations by plugin
const migrationsByPlugin = new Map<
string,
Array<{
name: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
up: (db: any) => Promise<void>;
}>
>();

for (const { pluginId, migration } of pluginMigrations) {
if (!migrationsByPlugin.has(pluginId)) {
migrationsByPlugin.set(pluginId, []);
// Use a transaction-scoped advisory lock to prevent concurrent execution.
// db.transaction() both pins the connection AND wraps in a transaction,
// so pg_advisory_xact_lock holds until the transaction commits.
// Lock ID 73649281 is a fixed constant for plugin migrations.
return await db.transaction().execute(async (trx) => {
await sql`SELECT pg_advisory_xact_lock(73649281)`.execute(trx);

// Note: plugin_migrations table and old record migration are handled
// in runKyselyMigrations() before Kysely's migrator runs

// Get already executed migrations
const executed = await sql<{ plugin_id: string; name: string }>`
SELECT plugin_id, name FROM plugin_migrations
`.execute(trx);
const executedSet = new Set(
executed.rows.map((r) => `${r.plugin_id}/${r.name}`),
);

// Group migrations by plugin
const migrationsByPlugin = new Map<
string,
Array<{
name: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
up: (db: any) => Promise<void>;
}>
>();

for (const { pluginId, migration } of pluginMigrations) {
if (!migrationsByPlugin.has(pluginId)) {
migrationsByPlugin.set(pluginId, []);
}
migrationsByPlugin.get(pluginId)!.push({
name: migration.name,
up: migration.up,
});
}
migrationsByPlugin.get(pluginId)!.push({
name: migration.name,
up: migration.up,
});
}

// Run pending migrations for each plugin
let totalPending = 0;
// Run pending migrations for each plugin
let totalPending = 0;

for (const [pluginId, pluginMigrationList] of migrationsByPlugin) {
// Sort by name to ensure consistent order
pluginMigrationList.sort((a, b) => a.name.localeCompare(b.name));
for (const [pluginId, pluginMigrationList] of migrationsByPlugin) {
// Sort by name to ensure consistent order
pluginMigrationList.sort((a, b) => a.name.localeCompare(b.name));

for (const migration of pluginMigrationList) {
const key = `${pluginId}/${migration.name}`;
if (executedSet.has(key)) {
continue; // Already executed
}
for (const migration of pluginMigrationList) {
const key = `${pluginId}/${migration.name}`;
if (executedSet.has(key)) {
continue; // Already executed
}

totalPending++;
await migration.up(db);
totalPending++;
await migration.up(trx);

// Record as executed
const timestamp = new Date().toISOString();
await sql`
INSERT INTO plugin_migrations (plugin_id, name, timestamp)
VALUES (${pluginId}, ${migration.name}, ${timestamp})
`.execute(db);
// Record as executed
const timestamp = new Date().toISOString();
await sql`
INSERT INTO plugin_migrations (plugin_id, name, timestamp)
VALUES (${pluginId}, ${migration.name}, ${timestamp})
`.execute(trx);
}
}
}

return totalPending;
return totalPending;
});
// Advisory lock is automatically released when the connection returns to pool.
}

// ============================================================================
Expand Down
91 changes: 91 additions & 0 deletions apps/mesh/src/event-bus/nats-notify.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { describe, expect, mock, test } from "bun:test";
import type { NatsConnection, Subscription } from "nats";
import { NatsNotifyStrategy } from "./nats-notify";

function createMockConnection(): {
nc: NatsConnection;
subs: Subscription[];
} {
const subs: Subscription[] = [];
const nc = {
subscribe: mock((_subject: string) => {
let resolveIterator: (() => void) | null = null;
const sub: Subscription = {
unsubscribe: mock(() => {
resolveIterator?.();
}),
drain: mock(() => Promise.resolve()),
isClosed: false,
[Symbol.asyncIterator]: () => ({
next: () =>
new Promise<IteratorResult<unknown>>((resolve) => {
resolveIterator = () => resolve({ done: true, value: undefined });
}),
return: () => Promise.resolve({ done: true, value: undefined }),
throw: () => Promise.resolve({ done: true, value: undefined }),
}),
} as unknown as Subscription;
subs.push(sub);
return sub;
}),
publish: mock(() => {}),
isClosed: () => false,
isDraining: () => false,
} as unknown as NatsConnection;

return { nc, subs };
}

describe("NatsNotifyStrategy", () => {
test("start() creates subscription", async () => {
const { nc } = createMockConnection();
const strategy = new NatsNotifyStrategy({ getConnection: () => nc });

await strategy.start(() => {});

expect(nc.subscribe).toHaveBeenCalledTimes(1);
});

test("start() re-subscribes when called again (reconnect scenario)", async () => {
const { nc, subs } = createMockConnection();
const strategy = new NatsNotifyStrategy({ getConnection: () => nc });

await strategy.start(() => {});
expect(nc.subscribe).toHaveBeenCalledTimes(1);

// Simulate reconnect: start() is called again
await strategy.start();
expect(nc.subscribe).toHaveBeenCalledTimes(2);
// Old subscription should have been unsubscribed
expect(subs[0]!.unsubscribe).toHaveBeenCalledTimes(1);
});

test("stop() cleans up subscription", async () => {
const { nc, subs } = createMockConnection();
const strategy = new NatsNotifyStrategy({ getConnection: () => nc });

await strategy.start(() => {});
await strategy.stop();

expect(subs[0]!.unsubscribe).toHaveBeenCalledTimes(1);
});

test("notify() publishes to correct subject", async () => {
const { nc } = createMockConnection();
const strategy = new NatsNotifyStrategy({ getConnection: () => nc });

await strategy.notify("event-123");

expect(nc.publish).toHaveBeenCalledWith(
"mesh.events.notify",
expect.any(Uint8Array),
);
});

test("notify() silently succeeds when NATS is disconnected", async () => {
const strategy = new NatsNotifyStrategy({ getConnection: () => null });

// Should not throw
await strategy.notify("event-123");
});
});
12 changes: 11 additions & 1 deletion apps/mesh/src/event-bus/nats-notify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,20 @@ export class NatsNotifyStrategy implements NotifyStrategy {
constructor(private readonly options: NatsNotifyStrategyOptions) {}

async start(onNotify?: () => void): Promise<void> {
if (this.sub) return;
if (onNotify) this.onNotify = onNotify;
if (!this.onNotify) return;

// Clean up stale subscription from previous connection (reconnect scenario).
// After NATS reconnects, the old Subscription object is dead but non-null.
if (this.sub) {
try {
this.sub.unsubscribe();
} catch {
// ignore — connection may already be closed
}
this.sub = null;
}

const nc = this.options.getConnection();
if (!nc) return; // NATS not ready — polling strategy is safety net

Expand Down
4 changes: 2 additions & 2 deletions apps/mesh/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ async function gracefulShutdown(signal: string) {
console.log(`\n[shutdown] Received ${signal}, shutting down gracefully...`);

const forceExitTimer = setTimeout(() => {
console.error("[shutdown] Timed out after 55s, forcing exit.");
console.error("[shutdown] Timed out after 58s, forcing exit.");
process.exit(1);
}, 55_000);
}, 58_000);
forceExitTimer.unref?.();

let exitCode = 0;
Expand Down
10 changes: 9 additions & 1 deletion apps/mesh/src/nats/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,15 @@ function defaultConnect(opts: {
reconnect: boolean;
maxReconnectAttempts: number;
}): Promise<NatsConnection> {
return connect(opts);
return connect({
...opts,
pingInterval: 20_000,
maxPingOut: 3,
reconnectTimeWait: 1_000,
reconnectJitter: 500,
reconnectJitterTLS: 1_000,
name: "mesh-app",
});
}

function sleep(ms: number): Promise<void> {
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ spec:
readinessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.startupProbe }}
startupProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.lifecycle }}
lifecycle:
{{- toYaml . | nindent 12 }}
Expand Down
4 changes: 4 additions & 0 deletions deploy/helm/templates/hpa.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,8 @@ spec:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
{{- with .Values.autoscaling.behavior }}
behavior:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}
41 changes: 41 additions & 0 deletions deploy/helm/templates/ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{{- if .Values.ingress.enabled -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ include "chart-deco-studio.fullname" . }}
labels:
{{- include "chart-deco-studio.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if .Values.ingress.className }}
ingressClassName: {{ .Values.ingress.className }}
{{- end }}
{{- if .Values.ingress.tls }}
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
pathType: {{ .pathType }}
backend:
service:
name: {{ include "chart-deco-studio.fullname" $ }}
port:
number: {{ $.Values.service.port }}
{{- end }}
{{- end }}
{{- end }}
13 changes: 13 additions & 0 deletions deploy/helm/templates/pdb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{{- if or .Values.autoscaling.enabled (gt (int (default 1 .Values.replicaCount)) 1) }}
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ include "chart-deco-studio.fullname" . }}
labels:
{{- include "chart-deco-studio.labels" . | nindent 4 }}
spec:
maxUnavailable: 1
selector:
matchLabels:
{{- include "chart-deco-studio.selectorLabels" . | nindent 6 }}
{{- end }}
Loading
Loading