Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 71 additions & 51 deletions apps/server/src/provider/Layers/ProviderRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ import * as Path from "effect/Path";
import * as PubSub from "effect/PubSub";
import * as Ref from "effect/Ref";
import * as Stream from "effect/Stream";
import * as Duration from "effect/Duration";
import * as Option from "effect/Option";
import * as Semaphore from "effect/Semaphore";

import { ServerConfig } from "../../config.ts";
import { ProviderInstanceRegistry } from "../Services/ProviderInstanceRegistry.ts";
import { ProviderRegistry, type ProviderRegistryShape } from "../Services/ProviderRegistry.ts";
import {
hydrateCachedProvider,
isCachedProviderCorrelated,
orderProviderSnapshots,
readProviderStatusCache,
resolveProviderStatusCachePath,
Expand All @@ -69,6 +70,8 @@ const loadProviders = (
},
);

// Maximum time the boot path waits for the live `loadProviders` fallback
// snapshots. It is passed to `Effect.timeoutOption`; when the budget elapses
// the fallback list is treated as empty instead of delaying layer
// construction any further.
const BOOT_SNAPSHOT_FALLBACK_BUDGET = Duration.millis(100);

const makeManualProviderMaintenanceCapabilities = (provider: ProviderDriverKind) =>
makeManualOnlyProviderMaintenanceCapabilities({
provider,
Expand Down Expand Up @@ -184,6 +187,21 @@ const buildSnapshotSource = (instance: ProviderInstance): ProviderSnapshotSource
streamChanges: instance.snapshot.streamChanges,
});

/**
 * Creates the placeholder snapshot for a provider instance that has no
 * usable status yet.
 *
 * Identity fields (`instanceId`, `driver`, `displayName`, `enabled`) are
 * copied from the instance. Everything else is a fixed "nothing known yet"
 * value: not installed, no version, unknown auth, empty capability lists,
 * and `status: "pending"`.
 *
 * @param instance - The provider instance to describe.
 * @returns A `ServerProvider` snapshot in the `"pending"` state.
 */
const buildPendingSnapshot = (instance: ProviderInstance): ServerProvider => {
  const { instanceId, driverKind, displayName, enabled } = instance;

  const placeholder: ServerProvider = {
    instanceId,
    driver: driverKind,
    displayName,
    enabled,
    installed: false,
    version: null,
    status: "pending",
    auth: { status: "unknown" },
    // Unix epoch is used as the fixed "never checked" timestamp.
    checkedAt: "1970-01-01T00:00:00.000Z",
    models: [],
    slashCommands: [],
    skills: [],
  };

  return placeholder;
};

export const ProviderRegistryLive = Layer.effect(
ProviderRegistry,
Effect.gen(function* () {
Expand All @@ -204,57 +222,69 @@ export const ProviderRegistryLive = Layer.effect(
// Instances added post-boot skip this path; their first entry in
// `providersRef` comes from the reactive `syncLiveSources` pass
// below.
//
// The boot fallback is bounded so slow provider probes can't block
// HTTP readiness. Fast `getSnapshot` implementations (tests, cheap
// providers) still seed state within the budget; cache fills the
// gaps for everything else.
const bootInstances = yield* instanceRegistry.listInstances;
const bootSources = bootInstances.map(buildSnapshotSource);
const fallbackProviders = yield* loadProviders(bootSources);
const fallbackByInstance = new Map<ProviderInstanceId, ServerProvider>();
for (let index = 0; index < fallbackProviders.length; index++) {
const provider = fallbackProviders[index];
const source = bootSources[index];
if (provider === undefined || source === undefined) {
continue;
}
fallbackByInstance.set(source.instanceId, provider);
}
const bootInstanceByInstanceId = new Map(
bootInstances.map((instance) => [instance.instanceId, instance] as const),
);
const fallbackProviders = yield* loadProviders(bootSources).pipe(
Effect.timeoutOption(BOOT_SNAPSHOT_FALLBACK_BUDGET),
Effect.map(Option.getOrElse((): ReadonlyArray<ServerProvider> => [])),
);
const fallbackByInstance = new Map(
fallbackProviders.map((provider) => [provider.instanceId, provider] as const),
);

const cachedProviders = yield* Effect.forEach(
const hydratedBootProviders = yield* Effect.forEach(
bootSources,
(source) =>
Effect.gen(function* () {
// One cache file per configured instance. For the default
// instance of a built-in kind the path equals `<kind>.json` —
// identical to the legacy filename. We still require the cache
// payload to carry matching instance id + driver kind; old
// identity-less payloads are discarded and the awaited refresh
// below repopulates the cache.
// identity-less payloads are discarded and the background
// refresh repopulates the cache.
const filePath = yield* resolveProviderStatusCachePath({
cacheDir: config.providerStatusCacheDir,
instanceId: source.instanceId,
}).pipe(Effect.provideService(Path.Path, path));
const fallbackProvider = fallbackByInstance.get(source.instanceId);
if (fallbackProvider === undefined) {
return undefined;
}
const orPendingSnapshot = (
provider: ServerProvider | undefined,
): ServerProvider | undefined => {
if (provider !== undefined) return provider;
const instance = bootInstanceByInstanceId.get(source.instanceId);
return instance !== undefined ? buildPendingSnapshot(instance) : undefined;
};

return yield* readProviderStatusCache(filePath).pipe(
Effect.provideService(FileSystem.FileSystem, fileSystem),
Effect.flatMap((cachedProvider) => {
Effect.flatMap((cachedProvider): Effect.Effect<ServerProvider | undefined> => {
if (cachedProvider === undefined) {
return Effect.void.pipe(Effect.as(undefined as ServerProvider | undefined));
return Effect.succeed(orPendingSnapshot(fallbackProvider));
}
const correlation = {
cachedProvider,
fallbackProvider,
} as const;
if (!isCachedProviderCorrelated(correlation)) {
if (
cachedProvider.instanceId !== source.instanceId ||
cachedProvider.driver !== source.driverKind
) {
return Effect.logWarning("provider status cache identity mismatch, ignoring", {
path: filePath,
instanceId: source.instanceId,
cachedInstanceId: cachedProvider.instanceId ?? null,
driver: source.driverKind,
cachedDriver: cachedProvider.driver ?? null,
}).pipe(Effect.as(undefined as ServerProvider | undefined));
}).pipe(Effect.map(() => orPendingSnapshot(fallbackProvider)));
}
if (fallbackProvider !== undefined) {
return Effect.succeed(hydrateCachedProvider({ cachedProvider, fallbackProvider }));
}
return Effect.succeed(hydrateCachedProvider(correlation));
return Effect.succeed(cachedProvider);
}),
);
}),
Expand All @@ -266,7 +296,7 @@ export const ProviderRegistryLive = Layer.effect(
),
),
);
const providersRef = yield* Ref.make<ReadonlyArray<ServerProvider>>(cachedProviders);
const providersRef = yield* Ref.make<ReadonlyArray<ServerProvider>>(hydratedBootProviders);
const maintenanceActionStatesRef = yield* Ref.make<
ReadonlyMap<ProviderInstanceId, { readonly update?: ServerProviderUpdateState | undefined }>
>(new Map());
Expand Down Expand Up @@ -618,26 +648,15 @@ export const ProviderRegistryLive = Layer.effect(
}),
);

// Seed `providersRef` with the boot-time fallback snapshots so
// consumers calling `getProviders` immediately after layer build see
// a populated list — even before the first `syncLiveSources` refresh
// resolves. Cached snapshots (already in `providersRef`) merge with
// these via `upsertProviders` so on-disk state wins where present
// and pending fallbacks fill the gaps.
yield* upsertProviders(fallbackProviders, { publish: false });
// Subscribe to registry mutations BEFORE running the initial sync.
// `subscribeChanges` acquires the dequeue synchronously in this
// fibre; the subscription is active the instant this `yield*`
// returns. Forking the consumer loop later cannot lose a publish
// because no publish can reach a not-yet-subscribed dequeue.
//
// (Contrast with the pre-fix code that did
// `Stream.runForEach(instanceRegistry.streamChanges, …).pipe(Effect.forkScoped)`.
// `Stream.fromPubSub` defers `PubSub.subscribe` to stream start,
// and `forkScoped` only schedules the fibre — so a reconcile that
// published between "fibre scheduled" and "fibre starts running"
// was dropped, which made any settings change that replaced an
// instance never propagate to the aggregator's `providersRef`.)
// `providersRef` already contains boot-hydrated snapshots (cache +
// pending fallbacks). Seed unavailable instances eagerly so the UI
// sees ghost/fork-only drivers immediately, without waiting for the
// background refresh.
yield* upsertProviders(yield* instanceRegistry.listUnavailable, {
persist: false,
replace: true,
publish: false,
});
// Subscribe to registry mutations BEFORE running the initial sync.
// `subscribeChanges` acquires the `PubSub.Subscription` synchronously
// in this fibre; the subscription is registered with the PubSub the
Expand All @@ -654,10 +673,11 @@ export const ProviderRegistryLive = Layer.effect(
// was dropped, which made any settings change that replaced an
// instance never propagate to the aggregator's `providersRef`.)
const instanceChanges = yield* instanceRegistry.subscribeChanges;
// Initial sync: subscribe + kick off refreshes for every instance
// present at boot. Run synchronously so consumers pulling immediately
// after the layer build see the correct aggregator state.
yield* syncLiveSources;
// Initial sync: fork scoped so provider refresh runs purely in the
// background — no startup budget is consumed. The layer returns
// immediately after forking; provider state updates when refresh
// completes.
yield* syncLiveSourcesAndContinue.pipe(Effect.forkScoped);
// React to registry mutations — instance added / removed / rebuilt.
// `Stream.fromSubscription` builds a stream over the pre-acquired
// subscription rather than subscribing on stream start, which is
Expand Down
8 changes: 7 additions & 1 deletion packages/contracts/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ export type ServerConfigIssue = typeof ServerConfigIssue.Type;

const ServerConfigIssues = Schema.Array(ServerConfigIssue);

export const ServerProviderState = Schema.Literals(["ready", "warning", "error", "disabled"]);
/**
 * Schema for the set of state literals a provider can report.
 *
 * NOTE(review): `"pending"` appears to be the placeholder state for a
 * provider whose status has not been determined yet — confirm against the
 * code that produces it, and check that every consumer switching on this
 * union handles the extra literal.
 */
export const ServerProviderState = Schema.Literals([
"ready",
"warning",
"error",
"disabled",
"pending",
]);
/** Union of the literal values accepted by the `ServerProviderState` schema. */
export type ServerProviderState = typeof ServerProviderState.Type;

export const ServerProviderAuthStatus = Schema.Literals([
Expand Down
Loading