Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ jsg::Promise<void> Container::setInactivityTimeout(jsg::Lock& js, int64_t durati
return IoContext::current().awaitIo(js, req.sendIgnoringResult());
}

void Container::setEgressTcp(jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);

// Get a channel token for RPC usage, the container runtime can use this
// token later to redeem a Fetcher.
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);

auto req = rpcClient->setEgressTcpRequest();
req.setAddr(addr);
req.setChannelToken(token);
ioctx.addTask(req.sendIgnoringResult());
}

jsg::Promise<void> Container::monitor(jsg::Lock& js) {
JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");

Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Container: public jsg::Object {
void signal(jsg::Lock& js, int signo);
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);
void setEgressTcp(jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, should we call this API setEgressHttp() considering that we're going to interpret it as HTTP?

In the future when we support connect handlers then we could actually have an option to support raw TCP, but it would have to be a separate method since we need the app to tell us whether to try parsing the input as HTTP or not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another method name idea: interceptOutboundHttp

Copy link
Contributor Author

@gabivlj gabivlj Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea. We could also instead have it be at a 'domain'/'hostname' level instead of IP port.


// TODO(containers): listenTcp()

Expand All @@ -73,6 +74,10 @@ class Container: public jsg::Object {
JSG_METHOD(signal);
JSG_METHOD(getTcpPort);
JSG_METHOD(setInactivityTimeout);

if (flags.getWorkerdExperimental()) {
JSG_METHOD(setEgressTcp);
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/container.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,8 @@ interface Container @0x9aaceefc06523bca {
# Note that if there is an open connection to the container, the runtime must not shutdown the container.
# If there is no activity timeout duration configured and no container connection, it's up to the runtime
# to decide when to signal the container to exit.

setEgressTcp @8 (addr :Text, channelToken :Data);
# Configures egress TCP routing for the container. When the container attempts to connect to the
# specified address, the connection should be routed back to the Workers runtime using the channel token.
}
16 changes: 14 additions & 2 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,29 @@ wd_cc_library(
],
)

wd_cc_library(
name = "channel-token",
srcs = ["channel-token.c++"],
hdrs = ["channel-token.h"],
deps = [
":channel-token_capnp",
"//src/workerd/io",
"//src/workerd/util:entropy",
],
)

wd_cc_library(
name = "server",
srcs = [
"channel-token.c++",
"server.c++",
],
hdrs = [
"channel-token.h",
"server.h",
],
deps = [
":actor-id-impl",
":alarm-scheduler",
":channel-token",
":channel-token_capnp",
":container-client",
":facet-tree-index",
Expand Down Expand Up @@ -257,7 +267,9 @@ wd_cc_library(
hdrs = ["container-client.h"],
visibility = ["//visibility:public"],
deps = [
":channel-token",
":docker-api_capnp",
"//src/workerd/io",
"//src/workerd/io:container_capnp",
"//src/workerd/jsg",
"@capnp-cpp//src/capnp/compat:http-over-capnp",
Expand Down
27 changes: 25 additions & 2 deletions src/workerd/server/container-client.c++
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,17 @@ ContainerClient::ContainerClient(capnp::ByteStreamFactory& byteStreamFactory,
kj::String containerName,
kj::String imageName,
kj::TaskSet& waitUntilTasks,
kj::Function<void()> cleanupCallback)
kj::Function<void()> cleanupCallback,
ChannelTokenHandler& channelTokenHandler)
: byteStreamFactory(byteStreamFactory),
timer(timer),
network(network),
dockerPath(kj::mv(dockerPath)),
containerName(kj::encodeUriComponent(kj::mv(containerName))),
imageName(kj::mv(imageName)),
waitUntilTasks(waitUntilTasks),
cleanupCallback(kj::mv(cleanupCallback)) {}
cleanupCallback(kj::mv(cleanupCallback)),
channelTokenHandler(channelTokenHandler) {}

ContainerClient::~ContainerClient() noexcept(false) {
// Call the cleanup callback to remove this client from the ActorNamespace map
Expand Down Expand Up @@ -466,6 +468,27 @@ kj::Promise<void> ContainerClient::listenTcp(ListenTcpContext context) {
KJ_UNIMPLEMENTED("listenTcp not implemented for Docker containers - use port mapping instead");
}

kj::Promise<void> ContainerClient::setEgressTcp(SetEgressTcpContext context) {
auto params = context.getParams();
auto addr = kj::str(params.getAddr());
auto tokenBytes = params.getChannelToken();

// Redeem the channel token to get a SubrequestChannel
auto subrequestChannel = channelTokenHandler.decodeSubrequestChannelToken(
workerd::IoChannelFactory::ChannelTokenUsage::RPC, tokenBytes);

// Store the mapping
egressMappings.upsert(kj::mv(addr), kj::mv(subrequestChannel),
[](auto& existing, auto&& newValue) { existing = kj::mv(newValue); });

// TODO: At some point we need to figure out how to make it so
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to fully implement this for local testing now. If it's not available in local testing then nobody will be able to code against this feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Responding to both comments about local dev here (@danlapid):

  1. I agree for the feature local dev shouldn't feel like an after-thought.
  2. That's the intent of the experimental flag, I think we can experiment the feature on our end and see if it's working as intended, and iterate on both production and local dev in the meantime.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you had local dev support you could iterate super fast 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Independently of local-dev support being a must for this feature to be publicly available, I think it's not really equivalent the path of connectivity to workers in local dev and in prod.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really see the benefit of rushing to prod before implementing local dev.

It's not really in doubt whether or not it will work in prod. But it'll be hard to write code to test it in prod if we don't also have local dev.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be left as "at some point".
Local dev can't be an afterthought.

// in local development we are able to actually map to an egress mapping.
// For now, just fake it for testing purposes the decoding of the
// subrequest channel token.

co_return;
}

kj::Own<ContainerClient> ContainerClient::addRef() {
return kj::addRef(*this);
}
Expand Down
12 changes: 11 additions & 1 deletion src/workerd/server/container-client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#pragma once

#include <workerd/io/container.capnp.h>
#include <workerd/io/io-channels.h>
#include <workerd/server/channel-token.h>

#include <capnp/compat/byte-stream.h>
#include <capnp/list.h>
Expand Down Expand Up @@ -33,7 +35,8 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::String containerName,
kj::String imageName,
kj::TaskSet& waitUntilTasks,
kj::Function<void()> cleanupCallback);
kj::Function<void()> cleanupCallback,
ChannelTokenHandler& channelTokenHandler);

~ContainerClient() noexcept(false);

Expand All @@ -46,6 +49,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte
kj::Promise<void> getTcpPort(GetTcpPortContext context) override;
kj::Promise<void> listenTcp(ListenTcpContext context) override;
kj::Promise<void> setInactivityTimeout(SetInactivityTimeoutContext context) override;
kj::Promise<void> setEgressTcp(SetEgressTcpContext context) override;

kj::Own<ContainerClient> addRef();

Expand Down Expand Up @@ -93,6 +97,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte

// Cleanup callback to remove from ActorNamespace map when destroyed
kj::Function<void()> cleanupCallback;

// For redeeming channel tokens received via setEgressTcp
ChannelTokenHandler& channelTokenHandler;

// Egress TCP mappings: address -> SubrequestChannel
kj::HashMap<kj::String, kj::Own<workerd::IoChannelFactory::SubrequestChannel>> egressMappings;
};

} // namespace workerd::server
11 changes: 7 additions & 4 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1929,9 +1929,9 @@ class Server::WorkerService final: public Service,
}

auto actorClass = kj::refcounted<ActorClassImpl>(*this, entry.key, Frankenvalue());
auto ns =
kj::heap<ActorNamespace>(kj::mv(actorClass), entry.value, threadContext.getUnsafeTimer(),
threadContext.getByteStreamFactory(), network, dockerPath, waitUntilTasks);
auto ns = kj::heap<ActorNamespace>(kj::mv(actorClass), entry.value,
threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), channelTokenHandler,
network, dockerPath, waitUntilTasks);
actorNamespaces.insert(entry.key, kj::mv(ns));
}
}
Expand Down Expand Up @@ -2195,13 +2195,15 @@ class Server::WorkerService final: public Service,
const ActorConfig& config,
kj::Timer& timer,
capnp::ByteStreamFactory& byteStreamFactory,
ChannelTokenHandler& channelTokenHandler,
kj::Network& dockerNetwork,
kj::Maybe<kj::StringPtr> dockerPath,
kj::TaskSet& waitUntilTasks)
: actorClass(kj::mv(actorClass)),
config(config),
timer(timer),
byteStreamFactory(byteStreamFactory),
channelTokenHandler(channelTokenHandler),
dockerNetwork(dockerNetwork),
dockerPath(dockerPath),
waitUntilTasks(waitUntilTasks) {}
Expand Down Expand Up @@ -2856,7 +2858,7 @@ class Server::WorkerService final: public Service,

auto client = kj::refcounted<ContainerClient>(byteStreamFactory, timer, dockerNetwork,
kj::str(dockerPathRef), kj::str(containerId), kj::str(imageName), waitUntilTasks,
kj::mv(cleanupCallback));
kj::mv(cleanupCallback), channelTokenHandler);

// Store raw pointer in map (does not own)
containerClients.insert(kj::str(containerId), client.get());
Expand Down Expand Up @@ -2901,6 +2903,7 @@ class Server::WorkerService final: public Service,
kj::Maybe<kj::Promise<void>> cleanupTask;
kj::Timer& timer;
capnp::ByteStreamFactory& byteStreamFactory;
ChannelTokenHandler& channelTokenHandler;
kj::Network& dockerNetwork;
kj::Maybe<kj::StringPtr> dockerPath;
kj::TaskSet& waitUntilTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = (
modules = [
(name = "worker", esModule = embed "test.js")
],
compatibilityFlags = ["nodejs_compat", "experimental"],
compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental"],
containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock")),
durableObjectNamespaces = [
( className = "DurableObjectExample",
Expand Down
40 changes: 38 additions & 2 deletions src/workerd/server/tests/container-client/test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { DurableObject } from 'cloudflare:workers';
import { DurableObject, WorkerEntrypoint } from 'cloudflare:workers';
import assert from 'node:assert';
import { scheduler } from 'node:timers/promises';

Expand Down Expand Up @@ -66,7 +66,7 @@ export class DurableObjectExample extends DurableObject {
{
let resp;
// The retry count here is arbitrary. Can increase it if necessary.
const maxRetries = 6;
const maxRetries = 15;
for (let i = 1; i <= maxRetries; i++) {
try {
resp = await container
Expand Down Expand Up @@ -260,6 +260,33 @@ export class DurableObjectExample extends DurableObject {
getStatus() {
return this.ctx.container.running;
}

async testSetEgressTcp() {
const container = this.ctx.container;
if (container.running) {
let monitor = container.monitor().catch((_err) => {});
await container.destroy();
await monitor;
}
assert.strictEqual(container.running, false);

// Start container
container.start();
assert.strictEqual(container.running, true);

// Set up egress TCP mapping to route requests to the binding
// This registers the binding's channel token with the container runtime
container.setEgressTcp(
'10.0.0.1:9999',
this.ctx.exports.TestService({ props: {} })
);
}
}

export class TestService extends WorkerEntrypoint {
fetch() {
return new Response('you have hit TestService');
}
}

export class DurableObjectExample2 extends DurableObjectExample {}
Expand Down Expand Up @@ -394,3 +421,12 @@ export const testSetInactivityTimeout = {
}
},
};

// Test setEgressTcp functionality - registers a binding's channel token with the container
export const testSetEgressTcp = {
async test(_ctrl, env) {
const id = env.MY_CONTAINER.idFromName('testSetEgressTcp');
const stub = env.MY_CONTAINER.get(id);
await stub.testSetEgressTcp();
},
};
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3829,6 +3829,7 @@ interface Container {
signal(signo: number): void;
getTcpPort(port: number): Fetcher;
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
setEgressTcp(addr: string, binding: Fetcher): void;
}
interface ContainerStartupOptions {
entrypoint?: string[];
Expand Down
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3840,6 +3840,7 @@ export interface Container {
signal(signo: number): void;
getTcpPort(port: number): Fetcher;
setInactivityTimeout(durationMs: number | bigint): Promise<void>;
setEgressTcp(addr: string, binding: Fetcher): void;
}
export interface ContainerStartupOptions {
entrypoint?: string[];
Expand Down