Skip to content
Draft
17 changes: 17 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,26 @@ POSTHOG_PROJECT_KEY=
# DEPOT_TOKEN=<Depot org token>
# DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://0.0.0.0:4318"
# These are needed for the object store (for handling large payloads/outputs)
# Default provider (backward compatible - no protocol prefix)
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
# OBJECT_STORE_ACCESS_KEY_ID=
# OBJECT_STORE_SECRET_ACCESS_KEY=
# OBJECT_STORE_REGION=auto
# OBJECT_STORE_SERVICE=s3
# OBJECT_STORE_DEFAULT_PROTOCOL=s3 # Optional: protocol to use for new uploads (e.g., "s3", "r2")
#
# Named providers (protocol-prefixed data) - optional for multi-provider support
# OBJECT_STORE_S3_BASE_URL=https://s3.amazonaws.com
# OBJECT_STORE_S3_ACCESS_KEY_ID=
# OBJECT_STORE_S3_SECRET_ACCESS_KEY=
# OBJECT_STORE_S3_REGION=us-east-1
# OBJECT_STORE_S3_SERVICE=s3
#
# OBJECT_STORE_R2_BASE_URL=https://{bucket}.{accountId}.r2.cloudflarestorage.com
# OBJECT_STORE_R2_ACCESS_KEY_ID=
# OBJECT_STORE_R2_SECRET_ACCESS_KEY=
# OBJECT_STORE_R2_REGION=auto
# OBJECT_STORE_R2_SERVICE=s3
# CHECKPOINT_THRESHOLD_IN_MS=10000

# These control the server-side internal telemetry
Expand Down
6 changes: 6 additions & 0 deletions .server-changes/multi-provider-object-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Multi-provider object storage with protocol-based routing for zero-downtime migration
6 changes: 6 additions & 0 deletions .server-changes/object-store-iam-auth.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add IAM role-based auth support for object stores (no access keys required).
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,18 @@ const EnvironmentSchema = z
.default(60 * 1000 * 15), // 15 minutes

OBJECT_STORE_BASE_URL: z.string().optional(),
OBJECT_STORE_BUCKET: z.string().optional(),
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
OBJECT_STORE_REGION: z.string().optional(),
OBJECT_STORE_SERVICE: z.string().default("s3"),

// Protocol to use for new uploads (e.g., "s3", "r2"). Data without protocol uses default provider above.
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
ARTIFACTS_OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

const ParamsSchema = z.object({
"*": z.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { basename } from "node:path";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";
import { generatePresignedRequest } from "~/v3/r2.server";
import { generatePresignedRequest } from "~/v3/objectStore.server";

const ParamSchema = z.object({
environmentId: z.string(),
Expand Down
28 changes: 17 additions & 11 deletions apps/webapp/app/runEngine/concerns/batchPayloads.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { startActiveSpan } from "~/v3/tracer.server";

export type BatchPayloadProcessResult = {
/** The processed payload - either the original or an R2 path */
Expand Down Expand Up @@ -31,7 +31,7 @@ export class BatchPayloadProcessor {
* If not available, large payloads will be stored inline (which may fail for very large payloads).
*/
isObjectStoreAvailable(): boolean {
return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
return hasObjectStoreClient();
}

/**
Expand Down Expand Up @@ -103,11 +103,17 @@ export class BatchPayloadProcessor {
};
}

// Upload to R2
// Upload to object store
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(
filename,
packet.data,
packet.dataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
)
);

if (uploadError) {
Expand All @@ -125,18 +131,18 @@ export class BatchPayloadProcessor {
);
}

logger.debug("Batch item payload offloaded to R2", {
logger.debug("Batch item payload offloaded to object store", {
batchId,
itemIndex,
filename,
filename: uploadedFilename,
size,
});

span.setAttribute("wasOffloaded", true);
span.setAttribute("offloadPath", filename);
span.setAttribute("offloadPath", uploadedFilename);

return {
payload: filename,
payload: uploadedFilename!,
payloadType: "application/store",
wasOffloaded: true,
size,
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/common.server";

export class DefaultPayloadProcessor implements PayloadProcessor {
Expand Down Expand Up @@ -31,16 +31,16 @@ export class DefaultPayloadProcessor implements PayloadProcessor {

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment, env.OBJECT_STORE_DEFAULT_PROTOCOL)
);

if (uploadError) {
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
}

return {
data: filename,
data: uploadedFilename!,
dataType: "application/store",
};
});
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/objectStore.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { TriggerTaskService } from "../../v3/services/triggerTask.server";
import { startActiveSpan } from "../../v3/tracer.server";
Expand Down Expand Up @@ -716,10 +716,10 @@ export class RunEngineBatchTriggerService extends WithRunEngine {

const filename = `${pathPrefix}/payload.json`;

await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment);
const uploadedFilename = await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment);

return {
data: filename,
data: uploadedFilename,
dataType: "application/store",
};
});
Expand Down
Loading
Loading