diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts index e0d5924fcc2..9d0b49f8693 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, expect, it } from "@effect/vitest"; import { vi } from "vite-plus/test"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; @@ -193,6 +193,47 @@ describe("CloudManagedEndpointRuntime", () => { }), ); + it.effect("deduplicates Cloudflare connector configs with absent optional tunnel fields", () => + Effect.gen(function* () { + const spawned: Array = []; + const killed: Array = []; + const spawner = ChildProcessSpawner.make(() => + Effect.gen(function* () { + const pid = 250 + spawned.length; + spawned.push(pid); + const handle = makeHandle({ + pid, + onKill: () => { + killed.push(pid); + }, + }); + yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore)); + return handle; + }), + ); + const runtime = yield* buildCloudManagedEndpointRuntime(spawner); + + const first = yield* runtime.applyConfig({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + }); + const second = yield* runtime.applyConfig({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + }); + yield* runtime.applyConfig(null); + + assert.deepStrictEqual(spawned, [250]); + assert.deepStrictEqual(killed, [250]); + assert.deepStrictEqual(first, { + status: "running", + providerKind: "cloudflare_tunnel", + pid: 250, + }); + assert.deepStrictEqual(second, first); + }), + ); + it.effect("restarts the connector when the active process has exited", () => Effect.gen(function* () { const spawned: Array = []; diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index a1d7112a929..b27df8d3b2e 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -1,4 +1,7 @@ -import type { RelayManagedEndpointRuntimeConfig } from "@t3tools/contracts/relay"; +import { + RelayManagedEndpointProviderKind, + type RelayManagedEndpointRuntimeConfig, +} from "@t3tools/contracts/relay"; import * as RelayClient from "@t3tools/shared/relayClient"; import * as Context from "effect/Context"; import * as Effect from "effect/Effect"; @@ -7,6 +10,7 @@ import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Ref from "effect/Ref"; import * as Result from "effect/Result"; +import * as Schema from "effect/Schema"; import * as Semaphore from "effect/Semaphore"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; @@ -20,13 +24,22 @@ function bytesToString(bytes: Uint8Array): string { return new TextDecoder().decode(bytes); } +const RuntimeConfigKeyPayload = Schema.Struct({ + providerKind: RelayManagedEndpointProviderKind, + connectorToken: Schema.String, + tunnelId: Schema.NullOr(Schema.String), + tunnelName: Schema.NullOr(Schema.String), +}); +const RuntimeConfigKeyPayloadJson = Schema.fromJsonString(RuntimeConfigKeyPayload); +const encodeRuntimeConfigKey = Schema.encodeSync(RuntimeConfigKeyPayloadJson); + const readRuntimeConfig = Effect.gen(function* () { const secrets = yield* ServerSecretStore.ServerSecretStore; const bytes = yield* secrets.get(CLOUD_ENDPOINT_RUNTIME_CONFIG); if (Option.isNone(bytes)) { - return null; + return Option.none(); } - return Option.getOrNull(decodeRuntimeConfig(bytesToString(bytes.value))); + return decodeRuntimeConfig(bytesToString(bytes.value)); }); export type CloudManagedEndpointRuntimeStatus = @@ -76,7 +89,7 @@ export function classifyRelayClientOutput(line: string): "connected" | "warning" } function runtimeConfigKey(config: RelayManagedEndpointRuntimeConfig): string { - return JSON.stringify({ + return encodeRuntimeConfigKey({ providerKind: config.providerKind, connectorToken: config.connectorToken, tunnelId: config.tunnelId ?? null, @@ -307,11 +320,11 @@ export const make = Effect.gen(function* () { const initialConfig = yield* readRuntimeConfig.pipe( Effect.catch((cause) => Effect.logWarning("Failed to read managed endpoint runtime config", { cause }).pipe( - Effect.as(null), + Effect.as(Option.none()), ), ), ); - yield* runtime.applyConfig(initialConfig); + yield* runtime.applyConfig(Option.getOrNull(initialConfig)); yield* Effect.addFinalizer(() => runtime.applyConfig(null)); return runtime; }); diff --git a/packages/client-runtime/src/relay/managedRelayState.test.ts b/packages/client-runtime/src/relay/managedRelayState.test.ts index 49400d32aef..04eb83f1fcb 100644 --- a/packages/client-runtime/src/relay/managedRelayState.test.ts +++ b/packages/client-runtime/src/relay/managedRelayState.test.ts @@ -4,7 +4,7 @@ import type { RelayClientEnvironmentRecord, RelayEnvironmentStatusResponse, } from "@t3tools/contracts/relay"; -import { describe, expect, it } from "@effect/vitest"; +import { assert, describe, expect, it } from "@effect/vitest"; import * as Effect from "effect/Effect"; import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; @@ -334,6 +334,27 @@ describe("createManagedRelayQueryManager", () => { ); }); + it("refreshes environment status atoms keyed by schema-encoded relay environments", async () => { + const getEnvironmentStatus = vi.fn(() => + Effect.succeed({ + environmentId: environment.environmentId, + endpoint: environment.endpoint, + status: "online" as const, + checkedAt: "2026-06-01T00:00:00.000Z", + }), + ); + const manager = createManager({ getEnvironmentStatus }); + setSession(); + const atom = manager.environmentStatusAtom({ accountId: "account-1", environment }); + + registry.get(atom); + await vi.waitFor(() => assert.equal(getEnvironmentStatus.mock.calls.length, 1)); + + manager.refreshEnvironmentStatus(registry, { accountId: "account-1", environment }); + await vi.waitFor(() => assert.equal(getEnvironmentStatus.mock.calls.length, 2)); + assert.equal(readManagedRelaySnapshotState(registry.get(atom)).data?.status, "online"); + }); + it("rejects status responses for a different environment", async () => { const mismatchedStatus = { environmentId: EnvironmentId.make("environment-2"), diff --git a/packages/client-runtime/src/relay/managedRelayState.ts b/packages/client-runtime/src/relay/managedRelayState.ts index ec6a0710dd1..1f27aa12bb5 100644 --- a/packages/client-runtime/src/relay/managedRelayState.ts +++ b/packages/client-runtime/src/relay/managedRelayState.ts @@ -1,8 +1,9 @@ import type { - RelayClientEnvironmentRecord, + RelayClientEnvironmentRecord as RelayClientEnvironmentRecordType, RelayEnvironmentStatusResponse, } from "@t3tools/contracts/relay"; import { + RelayClientEnvironmentRecord, RelayEnvironmentConnectScope, RelayEnvironmentStatusScope, } from "@t3tools/contracts/relay"; @@ -12,6 +13,7 @@ import * as Clock from "effect/Clock"; import * as Data from "effect/Data"; import * as Effect from "effect/Effect"; import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; import * as Stream from "effect/Stream"; import { AsyncResult, Atom, AtomRegistry } from "effect/unstable/reactivity"; @@ -21,6 +23,14 @@ import * as ManagedRelay from "./managedRelay.ts"; const DEFAULT_STALE_TIME_MS = 15_000; const DEFAULT_IDLE_TTL_MS = 5 * 60_000; const CLERK_TOKEN_EXPIRY_SKEW_MS = 5_000; +const ManagedRelayStatusKeyPayload = Schema.Struct({ + accountId: Schema.String, + environment: RelayClientEnvironmentRecord, +}); +type ManagedRelayStatusKeyPayload = typeof ManagedRelayStatusKeyPayload.Type; +const ManagedRelayStatusKeyPayloadJson = Schema.fromJsonString(ManagedRelayStatusKeyPayload); +const encodeManagedRelayStatusKey = Schema.encodeSync(ManagedRelayStatusKeyPayloadJson); +const decodeManagedRelayStatusKey = Schema.decodeUnknownSync(ManagedRelayStatusKeyPayloadJson); export interface ManagedRelaySession { readonly accountId: string; @@ -235,24 +245,18 @@ function requireClerkToken( function statusKey(input: { readonly accountId: string; - readonly environment: RelayClientEnvironmentRecord; + readonly environment: RelayClientEnvironmentRecordType; }): string { - return JSON.stringify(input); + return encodeManagedRelayStatusKey(input); } -function parseStatusKey(key: string): { - readonly accountId: string; - readonly environment: RelayClientEnvironmentRecord; -} { - return JSON.parse(key) as { - readonly accountId: string; - readonly environment: RelayClientEnvironmentRecord; - }; +function parseStatusKey(key: string): ManagedRelayStatusKeyPayload { + return decodeManagedRelayStatusKey(key); } function endpointMatches( - left: RelayClientEnvironmentRecord["endpoint"], - right: RelayClientEnvironmentRecord["endpoint"], + left: RelayClientEnvironmentRecordType["endpoint"], + right: RelayClientEnvironmentRecordType["endpoint"], ): boolean { return ( left.httpBaseUrl === right.httpBaseUrl && @@ -262,7 +266,7 @@ function endpointMatches( } function validateEnvironmentStatus( - environment: RelayClientEnvironmentRecord, + environment: RelayClientEnvironmentRecordType, status: RelayEnvironmentStatusResponse, ): Effect.Effect { if (status.environmentId !== environment.environmentId) { @@ -429,7 +433,7 @@ export function createManagedRelayQueryManager( devicesAtom, environmentStatusAtom: (input: { readonly accountId: string; - readonly environment: RelayClientEnvironmentRecord; + readonly environment: RelayClientEnvironmentRecordType; }) => environmentStatusAtom(statusKey(input)), refreshEnvironments(registry: AtomRegistry.AtomRegistry, accountId: string): void { registry.refresh(environmentsAtom(accountId)); @@ -441,7 +445,7 @@ export function createManagedRelayQueryManager( registry: AtomRegistry.AtomRegistry, input: { readonly accountId: string; - readonly environment: RelayClientEnvironmentRecord; + readonly environment: RelayClientEnvironmentRecordType; }, ): void { registry.refresh(environmentStatusAtom(statusKey(input)));