Skip to content
This repository was archived by the owner on Nov 21, 2025. It is now read-only.

Commit 9deb80e

Browse files
committed
refactor: better error handling, abort controler, hooks & store
1 parent 977981a commit 9deb80e

10 files changed

Lines changed: 370 additions & 169 deletions

File tree

packages/core/src/env/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ export function parseEnv<T extends "web" | "server" = "server">(
1818
// @ts-expect-error Property env doesn't exist in import.meta
1919
const envSource = type === "web" ? import.meta.env : process.env;
2020
return envSchema.parse(envSource) as any;
21-
} catch (error) {
22-
if (error instanceof ZodError) {
21+
} catch (err) {
22+
if (err instanceof ZodError) {
2323
// eslint-disable-next-line @typescript-eslint/no-unused-vars
24-
const { _errors, ...invalidEnvVars } = error.format();
24+
const { _errors, ...invalidEnvVars } = err.format();
2525
console.error(`\nMissing or invalid environment variables:\n\n ${Object.keys(invalidEnvVars).join("\n ")}\n`);
2626
process.exit(1);
2727
}
28-
throw error;
28+
throw err;
2929
}
3030
}
3131

packages/server/src/app/router.ts

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { initTRPC } from "@trpc/server";
1+
import { initTRPC, TRPCError } from "@trpc/server";
22
import { CreateFastifyContextOptions } from "@trpc/server/adapters/fastify";
33
import { observable } from "@trpc/server/observable";
44
import { z } from "zod";
@@ -14,6 +14,8 @@ export type AppContext = {
1414
res: CreateFastifyContextOptions["res"];
1515
};
1616

17+
let activeSubscriptions = 0;
18+
1719
/**
1820
* Creates and configures the main tRPC router with all API endpoints.
1921
*
@@ -74,8 +76,8 @@ export function createAppRouter() {
7476
contractAddress: z.string(),
7577
}) satisfies z.ZodType<ExplainContractInput>,
7678
)
77-
.mutation(({ ctx, input }) => {
78-
return ctx.service.explainContract(input);
79+
.mutation(async ({ ctx, input }) => {
80+
return await ctx.service.explainContract(input);
7981
}),
8082

8183
// TODO: Use protectedProcedure when we can handle cookies with websockets
@@ -87,12 +89,68 @@ export function createAppRouter() {
8789
}) satisfies z.ZodType<ExplainContractInput>,
8890
)
8991
.subscription(({ ctx, input }) => {
92+
console.log("Starting subscription", { chainId: input.chainId, contractAddress: input.contractAddress });
93+
activeSubscriptions++;
94+
console.log("Active subscriptions:", activeSubscriptions);
95+
9096
return observable<Partial<ExplainContractOutput>>((emit) => {
91-
const onCompletion = (obj: Partial<ExplainContractOutput>) => {
92-
emit.next(obj);
97+
// Track if the subscription is still active
98+
let isActive = true;
99+
let cleanupFn: (() => void) | null = null;
100+
101+
// Create a cleanup function that will be called when the client disconnects
102+
const cleanup = () => {
103+
console.log("Cleaning up subscription");
104+
isActive = false;
93105
};
94106

95-
ctx.service.explainContractStream(input, onCompletion);
107+
// Start the streaming process
108+
ctx.service
109+
.explainContractStream(input, {
110+
onCompletion: (obj: Partial<ExplainContractOutput>) => {
111+
if (isActive) emit.next(obj);
112+
},
113+
onFinish: () => {
114+
if (isActive) emit.complete();
115+
},
116+
onError: (err: Error) => {
117+
if (isActive) {
118+
console.error("Subscription error:", err);
119+
emit.error(
120+
err instanceof TRPCError
121+
? err
122+
: new TRPCError({
123+
code: "INTERNAL_SERVER_ERROR",
124+
message: err.message,
125+
}),
126+
);
127+
}
128+
},
129+
})
130+
.then((cleanup) => {
131+
// Store the cleanup function for later use
132+
cleanupFn = cleanup;
133+
})
134+
.catch((err) => {
135+
console.error("Failed to start stream:", err);
136+
if (isActive) {
137+
emit.error(
138+
err instanceof TRPCError
139+
? err
140+
: new TRPCError({
141+
code: "INTERNAL_SERVER_ERROR",
142+
message: err.message,
143+
}),
144+
);
145+
}
146+
});
147+
148+
// Return a cleanup function that will be called when the client disconnects
149+
return () => {
150+
console.log("Client disconnected");
151+
cleanup();
152+
if (cleanupFn) cleanupFn();
153+
};
96154
});
97155
}),
98156
});

packages/server/src/service/index.ts

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { ExplainContractInput, ExplainContractOutput, ExplainEventInput, ExplainEventOutput } from "@core/llm/types";
1+
import {
2+
ExplainContractInput,
3+
ExplainContractOutput,
4+
ExplainEventInput,
5+
ExplainEventOutput,
6+
GetContractOutput,
7+
} from "@core/llm/types";
28
import { debug } from "@server/app/debug";
39
import { AuthService, AuthServiceOptions } from "@server/service/auth";
410
import { CacheService, CacheServiceOptions } from "@server/service/cache";
@@ -60,58 +66,102 @@ export class Service {
6066
// Create a cache key
6167
const cacheKey = `contract:${input.chainId}:${input.contractAddress}`;
6268

69+
// Try to get from cache first
70+
const cached = await this.cache.get<ExplainContractOutput>(cacheKey);
71+
if (cached) {
72+
debug("Cache hit for key:", cacheKey);
73+
return cached;
74+
}
75+
76+
// If not in cache, get from LLM
77+
debug("Cache miss for key:", cacheKey);
78+
let contractDetails: GetContractOutput;
6379
try {
64-
// Try to get from cache first
65-
const cached = await this.cache.get<ExplainContractOutput>(cacheKey);
66-
if (cached) {
67-
debug("Cache hit for key:", cacheKey);
68-
return cached;
69-
}
80+
contractDetails = await this.whatsabi.getContract(input);
81+
} catch (err) {
82+
debug("Error in getContract:", err);
83+
throw err;
84+
}
7085

71-
// If not in cache, get from LLM
72-
debug("Cache miss for key:", cacheKey);
73-
const contractDetails = await this.whatsabi.getContract(input);
86+
try {
7487
const result = await this.llm.explainContract(contractDetails);
7588

7689
// Store in cache
7790
await this.cache.set(cacheKey, result);
7891

7992
return result;
80-
} catch (error) {
81-
debug("Error in explainContract:", error);
82-
throw error;
93+
} catch (err) {
94+
debug("Error in explainContract:", err);
95+
throw err;
8396
}
8497
}
8598

86-
// TODO: experimenting
8799
async explainContractStream(
88100
input: ExplainContractInput,
89-
onCompletion: (obj: Partial<ExplainContractOutput>) => void,
90-
): Promise<void> {
101+
cb: {
102+
onCompletion: (obj: Partial<ExplainContractOutput>) => void;
103+
onFinish: () => void;
104+
onError: (err: Error) => void;
105+
},
106+
): Promise<() => void> {
107+
const { onCompletion, onFinish, onError } = cb;
91108
const cacheKey = `contract:${input.chainId}:${input.contractAddress}`;
109+
let llmCleanup: (() => void) | null = null;
92110

93111
try {
94112
const cached = await this.cache.get<ExplainContractOutput>(cacheKey);
95113
if (cached) {
96114
debug("Cache hit for key:", cacheKey);
97115
onCompletion(cached);
98-
return;
116+
onFinish();
117+
return () => {}; // No-op cleanup for cache hits
99118
}
100-
} catch (error) {
101-
debug("Error in explainContractStream:", error);
119+
} catch (err) {
120+
debug("Error in explainContractStream:", err);
121+
onError(err instanceof Error ? err : new Error(String(err)));
122+
onFinish();
123+
return () => {}; // No-op cleanup for errors
102124
}
103125

104-
const contractDetails = await this.whatsabi.getContract(input);
105-
await this.llm.explainContractStream(contractDetails, onCompletion, (obj) => {
106-
this.cache.set(cacheKey, obj);
107-
});
126+
let contractDetails: GetContractOutput;
127+
try {
128+
contractDetails = await this.whatsabi.getContract(input);
129+
} catch (err) {
130+
debug("Error in getContract:", err);
131+
onError(err instanceof Error ? err : new Error(String(err)));
132+
onFinish();
133+
return () => {}; // No-op cleanup for errors
134+
}
135+
136+
try {
137+
// Get the cleanup function from the LLM service
138+
llmCleanup = await this.llm.explainContractStream(contractDetails, onCompletion, (obj) => {
139+
this.cache.set(cacheKey, obj);
140+
onFinish();
141+
});
142+
143+
// Return a cleanup function that will call the LLM cleanup
144+
return () => {
145+
if (llmCleanup) llmCleanup();
146+
};
147+
} catch (err) {
148+
debug("Error in explainContractStream:", err);
149+
onError(err instanceof Error ? err : new Error(String(err)));
150+
onFinish();
151+
return () => {}; // No-op cleanup for errors
152+
}
108153
}
109154

110155
async explainEventStream(
111156
input: ExplainContractOutput & ExplainEventInput,
112157
onCompletion: (obj: Partial<ExplainEventOutput>) => void,
113158
): Promise<void> {
114-
await this.llm.explainEventStream(input, onCompletion);
159+
try {
160+
await this.llm.explainEventStream(input, onCompletion);
161+
} catch (err) {
162+
debug("Error in explainEventStream:", err);
163+
throw err;
164+
}
115165
}
116166

117167
// Add auth methods

packages/server/src/service/llm.ts

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ export class LLMService {
5353
input: GetContractOutput,
5454
onCompletion: (obj: Partial<ExplainContractOutput>) => void,
5555
onFinish: (obj: ExplainContractOutput) => void,
56-
): Promise<void> {
57-
await this.stream(
56+
): Promise<() => void> {
57+
return await this.stream(
5858
EXPLAIN_CONTRACT.systemPrompt,
5959
EXPLAIN_CONTRACT.outputSchema,
6060
JSON.stringify(input),
@@ -85,22 +85,42 @@ export class LLMService {
8585
input: string,
8686
onCompletion: (obj: any) => void,
8787
onFinish?: (obj: z.infer<S>) => void,
88-
): Promise<void> {
88+
): Promise<() => void> {
89+
const abortController = new AbortController();
90+
8991
try {
9092
const { partialObjectStream, object: objectPromise } = streamObject({
9193
model: this.openrouter(this.options.model),
9294
system: systemPrompt,
9395
prompt: input,
9496
schema,
97+
abortSignal: abortController.signal,
9598
});
9699

97-
for await (const obj of partialObjectStream) onCompletion(obj);
100+
// Process the stream in the background
101+
(async () => {
102+
try {
103+
for await (const obj of partialObjectStream) onCompletion(obj);
98104

99-
const object = await objectPromise;
100-
onFinish?.(object);
101-
} catch (e) {
102-
console.error(e);
103-
throw e;
105+
if (onFinish) {
106+
const object = await objectPromise;
107+
onFinish(object);
108+
}
109+
} catch (err) {
110+
if (abortController.signal.aborted) {
111+
console.log("Stream aborted");
112+
} else {
113+
console.error(err);
114+
}
115+
}
116+
})();
117+
118+
return () => {
119+
abortController.abort();
120+
};
121+
} catch (err) {
122+
console.error(err);
123+
throw err;
104124
}
105125
}
106126
}

packages/server/src/service/whatsabi.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ export class WhatsAbiService {
4949
abiLoader,
5050
followProxies: true,
5151
loadContractResult: true,
52-
onError: (error) => {
53-
console.error(error);
54-
throw error;
55-
},
5652
});
5753

5854
const sources = await result.contractResult?.getSources?.();
@@ -81,9 +77,9 @@ export class WhatsAbiService {
8177
name: result.contractResult?.name ?? undefined,
8278
sources: refinedSources,
8379
};
84-
} catch (error) {
85-
debug("Error in getContract", chainId, contractAddress, error);
86-
throw error;
80+
} catch (err) {
81+
debug("Error in getContract", chainId, contractAddress, err);
82+
throw err;
8783
}
8884
}
8985
}

0 commit comments

Comments
 (0)