Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 52 additions & 17 deletions workspace/data-proxy/src/proxy-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomUUID } from "node:crypto";
import { constants, type DataProxy } from "@seda-protocol/data-proxy-sdk";
import { tryAsync } from "@seda-protocol/utils";
import { Elysia } from "elysia";
import { Maybe } from "true-myth";
import { Maybe, Result } from "true-myth";
import { type Config, getHttpMethods } from "./config-parser";
import { DEFAULT_PROXY_ROUTE_GROUP, JSON_PATH_HEADER_KEY } from "./constants";
import logger from "./logger";
Expand Down Expand Up @@ -92,6 +92,7 @@ export function startProxyServer(

// A route can have multiple methods attach to it
const routeMethods = getHttpMethods(route.method);
const cache = new Map<string, Response>();
for (const routeMethod of routeMethods) {
app.route(
routeMethod,
Expand All @@ -111,6 +112,7 @@ export function startProxyServer(
const requestBody = Maybe.of(body as string | undefined);

// Verification with the SEDA chain that the overlay node is eligible
const upstreamHeaders = new Headers();
if (!serverOptions.disableProof) {
requestLogger.debug("Verifying proof");

Expand All @@ -134,6 +136,10 @@ export function startProxyServer(
requestLogger.error(message);
return createErrorResponse(message, 400);
}
upstreamHeaders.append(
constants.PROOF_HEADER_KEY,
proofHeader.value,
);

const decodedProof = dataProxy.decodeProof(proofHeader.value);

Expand Down Expand Up @@ -188,8 +194,6 @@ export function startProxyServer(
}
upstreamUrl = upstreamUrlResult.value.toString();

const upstreamHeaders = new Headers();

// Redirect all headers given by the requester
for (const [key, value] of Object.entries(headers)) {
if (!value || key === constants.PROOF_HEADER_KEY) {
Expand All @@ -213,26 +217,57 @@ export function startProxyServer(
{ headers: upstreamHeaders, body, upstreamUrl },
);

const upstreamResponse = await tryAsync(async () =>
fetch(upstreamUrl, {
method: routeMethod,
headers: upstreamHeaders,
body: body as string,
}),
);
let upstreamResponse: Response;
const cached = cache.get(upstreamUrl);

if (upstreamResponse.isErr) {
const message = `Proxying URL ${route.path} failed: ${upstreamResponse.error}`;
requestLogger.error(message, { error: upstreamResponse.error });
return createErrorResponse(message, 500);
const getUpstream = async () => {
const methodAllowsBody =
routeMethod !== "GET" && routeMethod !== "HEAD";
if (!methodAllowsBody) {
upstreamHeaders.delete("content-length");
upstreamHeaders.delete("transfer-encoding");
upstreamHeaders.delete("content-type");
}

const response = await tryAsync(async () =>
fetch(upstreamUrl, {
method: routeMethod,
headers: upstreamHeaders,
body: body as string,
}),
);

if (response.isErr) {
const message = `Proxying URL ${route.path} failed: ${response.error}`;
requestLogger.error(message, { error: response.error });
return createErrorResponse(message, 500);
}
const res = response.value;

cache.set(upstreamUrl, res.clone());
return res;
};

if (cached) {
upstreamResponse = cached.clone();

// send the request anyways to the background and update the cache
// background process this
getUpstream().catch((err) => {
requestLogger.error("Background cache update failed", {
error: err,
});
});
} else {
upstreamResponse = await getUpstream();
}

requestLogger.debug("Received upstream response", {
headers: upstreamResponse.value.headers,
headers: upstreamResponse.headers,
});

const upstreamTextResponse = await tryAsync(
async () => await upstreamResponse.value.text(),
async () => await upstreamResponse.text(),
);

if (upstreamTextResponse.isErr) {
Expand Down Expand Up @@ -316,7 +351,7 @@ export function startProxyServer(
// Forward all headers that are configured in the config.json
for (const forwardHeaderKey of route.forwardResponseHeaders) {
const forwardHeaderValue =
upstreamResponse.value.headers.get(forwardHeaderKey);
upstreamResponse.headers.get(forwardHeaderKey);

if (forwardHeaderValue) {
responseHeaders.append(forwardHeaderKey, forwardHeaderValue);
Expand Down