Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion apps/server/src/cloud/ManagedEndpointRuntime.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, expect, it } from "@effect/vitest";
import { vi } from "vite-plus/test";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
Expand Down Expand Up @@ -193,6 +193,47 @@ describe("CloudManagedEndpointRuntime", () => {
}),
);

it.effect("deduplicates Cloudflare connector configs with absent optional tunnel fields", () =>
Effect.gen(function* () {
const spawned: Array<number> = [];
const killed: Array<number> = [];
const spawner = ChildProcessSpawner.make(() =>
Effect.gen(function* () {
const pid = 250 + spawned.length;
spawned.push(pid);
const handle = makeHandle({
pid,
onKill: () => {
killed.push(pid);
},
});
yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore));
return handle;
}),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const first = yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
connectorToken: "token",
});
const second = yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
connectorToken: "token",
});
yield* runtime.applyConfig(null);

assert.deepStrictEqual(spawned, [250]);
assert.deepStrictEqual(killed, [250]);
assert.deepStrictEqual(first, {
status: "running",
providerKind: "cloudflare_tunnel",
pid: 250,
});
assert.deepStrictEqual(second, first);
}),
);

it.effect("restarts the connector when the active process has exited", () =>
Effect.gen(function* () {
const spawned: Array<number> = [];
Expand Down
25 changes: 19 additions & 6 deletions apps/server/src/cloud/ManagedEndpointRuntime.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import type { RelayManagedEndpointRuntimeConfig } from "@t3tools/contracts/relay";
import {
RelayManagedEndpointProviderKind,
type RelayManagedEndpointRuntimeConfig,
} from "@t3tools/contracts/relay";
import * as RelayClient from "@t3tools/shared/relayClient";
import * as Context from "effect/Context";
import * as Effect from "effect/Effect";
Expand All @@ -7,6 +10,7 @@ import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Ref from "effect/Ref";
import * as Result from "effect/Result";
import * as Schema from "effect/Schema";
import * as Semaphore from "effect/Semaphore";
import * as Scope from "effect/Scope";
import * as Stream from "effect/Stream";
Expand All @@ -20,13 +24,22 @@ function bytesToString(bytes: Uint8Array): string {
return new TextDecoder().decode(bytes);
}

const RuntimeConfigKeyPayload = Schema.Struct({
providerKind: RelayManagedEndpointProviderKind,
connectorToken: Schema.String,
tunnelId: Schema.NullOr(Schema.String),
tunnelName: Schema.NullOr(Schema.String),
});
const RuntimeConfigKeyPayloadJson = Schema.fromJsonString(RuntimeConfigKeyPayload);
const encodeRuntimeConfigKey = Schema.encodeSync(RuntimeConfigKeyPayloadJson);

const readRuntimeConfig = Effect.gen(function* () {
const secrets = yield* ServerSecretStore.ServerSecretStore;
const bytes = yield* secrets.get(CLOUD_ENDPOINT_RUNTIME_CONFIG);
if (Option.isNone(bytes)) {
return null;
return Option.none();
}
return Option.getOrNull(decodeRuntimeConfig(bytesToString(bytes.value)));
return decodeRuntimeConfig(bytesToString(bytes.value));
});

export type CloudManagedEndpointRuntimeStatus =
Expand Down Expand Up @@ -76,7 +89,7 @@ export function classifyRelayClientOutput(line: string): "connected" | "warning"
}

function runtimeConfigKey(config: RelayManagedEndpointRuntimeConfig): string {
return JSON.stringify({
return encodeRuntimeConfigKey({
providerKind: config.providerKind,
connectorToken: config.connectorToken,
tunnelId: config.tunnelId ?? null,
Expand Down Expand Up @@ -307,11 +320,11 @@ export const make = Effect.gen(function* () {
const initialConfig = yield* readRuntimeConfig.pipe(
Effect.catch((cause) =>
Effect.logWarning("Failed to read managed endpoint runtime config", { cause }).pipe(
Effect.as(null),
Effect.as(Option.none()),
),
),
);
yield* runtime.applyConfig(initialConfig);
yield* runtime.applyConfig(Option.getOrNull(initialConfig));
yield* Effect.addFinalizer(() => runtime.applyConfig(null));
return runtime;
});
Expand Down
23 changes: 22 additions & 1 deletion packages/client-runtime/src/relay/managedRelayState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type {
RelayClientEnvironmentRecord,
RelayEnvironmentStatusResponse,
} from "@t3tools/contracts/relay";
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, expect, it } from "@effect/vitest";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
Expand Down Expand Up @@ -334,6 +334,27 @@ describe("createManagedRelayQueryManager", () => {
);
});

it("refreshes environment status atoms keyed by schema-encoded relay environments", async () => {
const getEnvironmentStatus = vi.fn(() =>
Effect.succeed({
environmentId: environment.environmentId,
endpoint: environment.endpoint,
status: "online" as const,
checkedAt: "2026-06-01T00:00:00.000Z",
}),
);
const manager = createManager({ getEnvironmentStatus });
setSession();
const atom = manager.environmentStatusAtom({ accountId: "account-1", environment });

registry.get(atom);
await vi.waitFor(() => assert.equal(getEnvironmentStatus.mock.calls.length, 1));

manager.refreshEnvironmentStatus(registry, { accountId: "account-1", environment });
await vi.waitFor(() => assert.equal(getEnvironmentStatus.mock.calls.length, 2));
assert.equal(readManagedRelaySnapshotState(registry.get(atom)).data?.status, "online");
});

it("rejects status responses for a different environment", async () => {
const mismatchedStatus = {
environmentId: EnvironmentId.make("environment-2"),
Expand Down
36 changes: 20 additions & 16 deletions packages/client-runtime/src/relay/managedRelayState.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type {
RelayClientEnvironmentRecord,
RelayClientEnvironmentRecord as RelayClientEnvironmentRecordType,
RelayEnvironmentStatusResponse,
} from "@t3tools/contracts/relay";
import {
RelayClientEnvironmentRecord,
RelayEnvironmentConnectScope,
RelayEnvironmentStatusScope,
} from "@t3tools/contracts/relay";
Expand All @@ -12,6 +13,7 @@ import * as Clock from "effect/Clock";
import * as Data from "effect/Data";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";
import * as Schema from "effect/Schema";
import * as Stream from "effect/Stream";
import { AsyncResult, Atom, AtomRegistry } from "effect/unstable/reactivity";

Expand All @@ -21,6 +23,14 @@ import * as ManagedRelay from "./managedRelay.ts";
const DEFAULT_STALE_TIME_MS = 15_000;
const DEFAULT_IDLE_TTL_MS = 5 * 60_000;
const CLERK_TOKEN_EXPIRY_SKEW_MS = 5_000;
const ManagedRelayStatusKeyPayload = Schema.Struct({
accountId: Schema.String,
environment: RelayClientEnvironmentRecord,
});
type ManagedRelayStatusKeyPayload = typeof ManagedRelayStatusKeyPayload.Type;
const ManagedRelayStatusKeyPayloadJson = Schema.fromJsonString(ManagedRelayStatusKeyPayload);
const encodeManagedRelayStatusKey = Schema.encodeSync(ManagedRelayStatusKeyPayloadJson);
const decodeManagedRelayStatusKey = Schema.decodeUnknownSync(ManagedRelayStatusKeyPayloadJson);

export interface ManagedRelaySession {
readonly accountId: string;
Expand Down Expand Up @@ -235,24 +245,18 @@ function requireClerkToken(

function statusKey(input: {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium relay/managedRelayState.ts:246

statusKey now calls encodeManagedRelayStatusKey, which runs Schema.encodeSync on the entire RelayClientEnvironmentRecord. An environment object with a runtime-invalid value in any field (e.g. an untrimmed label or linkedAt) throws an exception, preventing environmentStatusAtom and refreshEnvironmentStatus from even creating the atom key. Previously JSON.stringify serialized without validation, so only the fields actually used later (environmentId and endpoint) mattered. Consider narrowing the schema to only the fields consumed by parseStatusKey and the downstream query, so non-essential field validation doesn't block status lookups.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file @packages/client-runtime/src/relay/managedRelayState.ts around line 246:

`statusKey` now calls `encodeManagedRelayStatusKey`, which runs `Schema.encodeSync` on the entire `RelayClientEnvironmentRecord`. An environment object with a runtime-invalid value in any field (e.g. an untrimmed `label` or `linkedAt`) throws an exception, preventing `environmentStatusAtom` and `refreshEnvironmentStatus` from even creating the atom key. Previously `JSON.stringify` serialized without validation, so only the fields actually used later (`environmentId` and `endpoint`) mattered. Consider narrowing the schema to only the fields consumed by `parseStatusKey` and the downstream query, so non-essential field validation doesn't block status lookups.

readonly accountId: string;
readonly environment: RelayClientEnvironmentRecord;
readonly environment: RelayClientEnvironmentRecordType;
}): string {
return JSON.stringify(input);
return encodeManagedRelayStatusKey(input);
}

function parseStatusKey(key: string): {
readonly accountId: string;
readonly environment: RelayClientEnvironmentRecord;
} {
return JSON.parse(key) as {
readonly accountId: string;
readonly environment: RelayClientEnvironmentRecord;
};
function parseStatusKey(key: string): ManagedRelayStatusKeyPayload {
return decodeManagedRelayStatusKey(key);
}

function endpointMatches(
left: RelayClientEnvironmentRecord["endpoint"],
right: RelayClientEnvironmentRecord["endpoint"],
left: RelayClientEnvironmentRecordType["endpoint"],
right: RelayClientEnvironmentRecordType["endpoint"],
): boolean {
return (
left.httpBaseUrl === right.httpBaseUrl &&
Expand All @@ -262,7 +266,7 @@ function endpointMatches(
}

function validateEnvironmentStatus(
environment: RelayClientEnvironmentRecord,
environment: RelayClientEnvironmentRecordType,
status: RelayEnvironmentStatusResponse,
): Effect.Effect<RelayEnvironmentStatusResponse, ManagedRelaySnapshotError> {
if (status.environmentId !== environment.environmentId) {
Expand Down Expand Up @@ -429,7 +433,7 @@ export function createManagedRelayQueryManager(
devicesAtom,
environmentStatusAtom: (input: {
readonly accountId: string;
readonly environment: RelayClientEnvironmentRecord;
readonly environment: RelayClientEnvironmentRecordType;
}) => environmentStatusAtom(statusKey(input)),
refreshEnvironments(registry: AtomRegistry.AtomRegistry, accountId: string): void {
registry.refresh(environmentsAtom(accountId));
Expand All @@ -441,7 +445,7 @@ export function createManagedRelayQueryManager(
registry: AtomRegistry.AtomRegistry,
input: {
readonly accountId: string;
readonly environment: RelayClientEnvironmentRecord;
readonly environment: RelayClientEnvironmentRecordType;
},
): void {
registry.refresh(environmentStatusAtom(statusKey(input)));
Expand Down
Loading