From 9dc0a02015d1d97770fecb92833a884311c035f8 Mon Sep 17 00:00:00 2001 From: Ashley Peacock Date: Tue, 23 Jun 2026 12:09:00 +0100 Subject: [PATCH] Add unsafe Durable Object eviction for tests Add test-only unsafe Durable Object eviction support for workerd. This introduces server-side eviction plumbing that reuses the existing idle eviction path, then exposes it through unsafe.evict(stub, options) and unsafe.evictAllDurableObjects(options). The eviction path waits for live requests to drain, preserves durable storage, and hibernates hibernatable WebSockets by default. Callers can pass options.webSockets = "close" to close hibernatable WebSockets instead while still preserving storage and allowing the actor to be rebuilt on the next request. The implementation respects preventEviction and keeps actor broken-monitoring intact when eviction is skipped because the actor still has active references. The bulk path walks actor facets recursively so evictAllDurableObjects() covers the currently-running actor tree, waits for all sibling evictions before reporting errors, and keeps each eviction coroutine pinned while it can be suspended. Targeted eviction rejects non-Durable Object stubs and rejects objects that have not been instantiated yet, while bulk eviction skips idle/non-running actors. Tests cover storage preservation, in-memory state reset, in-flight requests, concurrent evictions, preventEviction, non-DO inputs, invalid options, bulk eviction, facet recursion, WebSocket hibernation, and WebSocket close mode. --- src/workerd/api/unsafe.c++ | 41 ++ src/workerd/api/unsafe.h | 33 +- src/workerd/io/io-channels.h | 26 ++ src/workerd/io/io-context.h | 6 + src/workerd/io/request-tracker.h | 4 + src/workerd/server/server.c++ | 236 +++++++++--- .../tests/unsafe-module/unsafe-module-test.js | 351 ++++++++++++++++++ .../unsafe-module/unsafe-module-test.wd-test | 11 +- 8 files changed, 660 insertions(+), 48 deletions(-) diff --git a/src/workerd/api/unsafe.c++ b/src/workerd/api/unsafe.c++ index 95be2e36716..10e295ec311 100644 --- a/src/workerd/api/unsafe.c++ +++ b/src/workerd/api/unsafe.c++ @@ -1,5 +1,6 @@ #include "unsafe.h" +#include #include #include #include @@ -18,6 +19,21 @@ static constexpr auto ASYNC_FN_SUFFIX = "}"_kjc; inline kj::StringPtr getName(jsg::Optional& name, kj::StringPtr def) { return name.map([](kj::String& str) { return str.asPtr(); }).orDefault(def); } + +IoChannelFactory::EvictWebSocketMode parseEvictWebSocketMode( + jsg::Optional options) { + auto opts = kj::mv(options).orDefault({}); + KJ_IF_SOME(webSockets, opts.webSockets) { + if (webSockets == "hibernate") { + return IoChannelFactory::EvictWebSocketMode::HIBERNATE; + } + if (webSockets == "close") { + return IoChannelFactory::EvictWebSocketMode::CLOSE; + } + JSG_FAIL_REQUIRE(TypeError, "options.webSockets must be \"hibernate\" or \"close\"."); + } + return IoChannelFactory::EvictWebSocketMode::HIBERNATE; +} } // namespace #ifdef WORKERD_FUZZILLI @@ -213,6 +229,31 @@ jsg::Promise UnsafeModule::deleteAllDurableObjects(jsg::Lock& js) { return js.resolvedPromise(); } +jsg::Promise UnsafeModule::evict( + jsg::Lock& js, jsg::Ref stub, jsg::Optional options) { + auto& context = IoContext::current(); + + // Resolve the stub to its underlying channel. For a Durable Object stub this is an actor + // channel that supports evictForTest(); for any other Fetcher, evictForTest() throws. + // Use kj::evalNow() so that a synchronous throw (e.g. "not a Durable Object stub" or "not + // currently running") becomes a rejected promise rather than a synchronous exception. + auto channel = stub->getSubrequestChannel(context); + auto promise = kj::evalNow([&channel, options = kj::mv(options)]() mutable { + return channel->evictForTest(parseEvictWebSocketMode(kj::mv(options))); + }).attach(kj::mv(channel)); + return context.awaitIo(js, kj::mv(promise)); +} + +jsg::Promise UnsafeModule::evictAllDurableObjects( + jsg::Lock& js, jsg::Optional options) { + auto& context = IoContext::current(); + // Use kj::evalNow() so that a synchronous throw becomes a rejected promise rather than a + // synchronous exception, matching evict()'s behaviour. + return context.awaitIo(js, kj::evalNow([&context, options = kj::mv(options)]() mutable { + return context.evictAllActorsForTest(parseEvictWebSocketMode(kj::mv(options))); + })); +} + bool UnsafeModule::isTestAutogateEnabled() { return util::Autogate::isEnabled(util::AutogateKey::TEST_WORKERD); } diff --git a/src/workerd/api/unsafe.h b/src/workerd/api/unsafe.h index 9c3e0d9a6ed..828c1b38107 100644 --- a/src/workerd/api/unsafe.h +++ b/src/workerd/api/unsafe.h @@ -19,6 +19,8 @@ namespace workerd::api { +class Fetcher; + // A special binding object that allows for dynamic evaluation. class UnsafeEval: public jsg::Object { public: @@ -104,6 +106,29 @@ class UnsafeModule: public jsg::Object { // restart with clean state. Namespaces with preventEviction are not affected. jsg::Promise deleteAllDurableObjects(jsg::Lock& js); + struct EvictOptions { + jsg::Optional webSockets; + + JSG_STRUCT(webSockets); + JSG_STRUCT_TS_OVERRIDE({ + webSockets?: "hibernate" | "close"; + }); + }; + + // Test-only: gracefully evict the Durable Object referred to by `stub` from its isolate, + // simulating the runtime tearing it down when it goes idle. Durable storage is left intact, so + // the DO rebuilds (rerunning its constructor) on its next request. Hibernatable WebSockets are + // hibernated by default, or closed if options.webSockets is "close". Rejects if `stub` is not a + // Durable Object stub, or if the target DO is not currently running. + jsg::Promise evict( + jsg::Lock& js, jsg::Ref stub, jsg::Optional options); + + // Test-only: gracefully evict every currently-running Durable Object that this worker can + // address (in evictable namespaces). Unlike abortAllDurableObjects(), this preserves durable + // storage and hibernates hibernatable WebSockets by default, or closes them if options.webSockets + // is "close". Idle DOs are skipped (not an error). + jsg::Promise evictAllDurableObjects(jsg::Lock& js, jsg::Optional options); + // Returns true if the TEST_WORKERD autogate is enabled. // This is used to verify that the all-autogates test variant is working correctly. bool isTestAutogateEnabled(); @@ -111,6 +136,8 @@ class UnsafeModule: public jsg::Object { JSG_RESOURCE_TYPE(UnsafeModule) { JSG_METHOD(abortAllDurableObjects); JSG_METHOD(deleteAllDurableObjects); + JSG_METHOD(evict); + JSG_METHOD(evictAllDurableObjects); JSG_METHOD(isTestAutogateEnabled); } }; @@ -142,9 +169,11 @@ void registerUnsafeModule(Registry& registry) { } #ifdef WORKERD_FUZZILLI -#define EW_UNSAFE_ISOLATE_TYPES api::UnsafeEval, api::UnsafeModule, api::Stdin, api::Fuzzilli +#define EW_UNSAFE_ISOLATE_TYPES \ + api::UnsafeEval, api::UnsafeModule, api::UnsafeModule::EvictOptions, api::Stdin, api::Fuzzilli #else -#define EW_UNSAFE_ISOLATE_TYPES api::UnsafeEval, api::UnsafeModule, api::Stdin +#define EW_UNSAFE_ISOLATE_TYPES \ + api::UnsafeEval, api::UnsafeModule, api::UnsafeModule::EvictOptions, api::Stdin #endif template diff --git a/src/workerd/io/io-channels.h b/src/workerd/io/io-channels.h index d0b452cd29f..b9d6e99ffd6 100644 --- a/src/workerd/io/io-channels.h +++ b/src/workerd/io/io-channels.h @@ -99,6 +99,11 @@ struct DynamicWorkerSource; // anything in the world except for the client -- this is a useful property for sandboxing! class IoChannelFactory { public: + enum class EvictWebSocketMode { + HIBERNATE, + CLOSE, + }; + // Opaque, IoContext-independent handle that knows how to construct a channel token referring to // the current entrypoint ("self"). Used to implement `ctx.restore()`: the implementation of // `ctx.restore()` passes this back into `makeRestored*()` so that the resulting restored channel @@ -258,6 +263,18 @@ class IoChannelFactory { // Note that the caller is expected to keep the SubrequestChannel alive until it is done with // the returned WorkerInterface. virtual kj::Own startRequest(SubrequestMetadata metadata) = 0; + + // Test-only: forcibly evict the target of this channel from its isolate, simulating the + // runtime tearing it down when it goes idle. For a Durable Object stub this destroys the actor + // instance while durable storage survives, so the next request rebuilds it. Depending on + // webSocketMode, hibernatable WebSockets are either hibernated first or closed. Only channels + // that point at an actor support this; others throw. + // + // Throws if the target Durable Object is not currently running (never instantiated, or + // already evicted/hibernated). + virtual kj::Promise evictForTest(EvictWebSocketMode webSocketMode) { + JSG_FAIL_REQUIRE(Error, "evict() can only be used on a Durable Object stub."); + } }; // Obtain an object representing a particular subrequest channel. @@ -362,6 +379,15 @@ class IoChannelFactory { KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime"); } + // Test-only: gracefully evict every currently-running actor in the evictable namespaces this + // worker can address, simulating idle teardown. Unlike abortAllActors(), this leaves durable + // storage intact, so DOs rebuild on their next request. Depending on webSocketMode, hibernatable + // WebSockets are either hibernated first or closed. Actors that aren't currently running are + // skipped (no error). + virtual kj::Promise evictAllActorsForTest(EvictWebSocketMode webSocketMode) { + KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime"); + } + // In workerd, the handler aborts the process (unless used on a dynamic // worker). In the edge runtime it will condemn and terminate the current // isolate. diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 278c52fac55..338c574ac7a 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -971,6 +971,12 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler getIoChannelFactory().deleteAllActors(reason); } + // Test-only: gracefully evict all currently-running actors. See + // IoChannelFactory::evictAllActorsForTest(). + kj::Promise evictAllActorsForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + return getIoChannelFactory().evictAllActorsForTest(webSocketMode); + } + // Condemn and terminate JS isolate void abortIsolate(kj::StringPtr reason = nullptr); diff --git a/src/workerd/io/request-tracker.h b/src/workerd/io/request-tracker.h index 781c44d88ba..a922c0d3a44 100644 --- a/src/workerd/io/request-tracker.h +++ b/src/workerd/io/request-tracker.h @@ -54,6 +54,10 @@ class RequestTracker final: public kj::Refcounted { return kj::addRef(*this); } + bool isActive() const { + return activeRequests > 0; + } + private: void requestActive(); void requestInactive(); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 90b0ad4988f..871f4ebda87 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -374,6 +374,19 @@ class Server::ActorNamespace final { return config; } + bool isEvictable() const { + bool result = true; + KJ_SWITCH_ONEOF(config) { + KJ_CASE_ONEOF(c, Durable) { + result = c.isEvictable; + } + KJ_CASE_ONEOF(c, Ephemeral) { + result = c.isEvictable; + } + } + return result; + } + kj::Own getActorChannel(Worker::Actor::Id id) { KJ_IF_SOME(doId, id.tryGet>()) { KJ_IF_SOME(name, doId->getName()) { @@ -479,17 +492,12 @@ class Server::ActorNamespace final { } void inactive() override { - // Durable objects are evictable by default. - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns.config) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } + for (auto& fulfiller: inactiveFulfillers) { + fulfiller->fulfill(); } - if (isEvictable) { + inactiveFulfillers.clear(); + + if (ns.isEvictable()) { KJ_IF_SOME(a, actor) { KJ_IF_SOME(m, a->getHibernationManager()) { // The hibernation manager needs to survive actor eviction and be passed to the actor @@ -778,6 +786,7 @@ class Server::ActorNamespace final { kj::Maybe> shutdownTask; kj::Maybe> onBrokenTask; kj::Maybe brokenReason; + kj::Vector>> inactiveFulfillers; // Reference to the ContainerClient (if container is enabled for this actor) kj::Maybe> containerClient; @@ -977,9 +986,6 @@ class Server::ActorNamespace final { // JS WebSockets. // TODO(someday): We could make this timeout configurable to make testing less burdensome. co_await timer.afterDelay(10 * kj::SECONDS); - // Cancel the onBroken promise, since we're about to destroy the actor anyways and don't - // want to trigger it. - onBrokenTask = kj::none; KJ_IF_SOME(a, actor) { if (a->isShared()) { // Our ActiveRequest refcounting has broken somewhere. This is likely because we're @@ -994,28 +1000,150 @@ class Server::ActorNamespace final { co_return; } - KJ_IF_SOME(m, manager) { + } + co_await tryEvict("broken.dropped; Actor freed due to inactivity"_kj, + IoChannelFactory::EvictWebSocketMode::HIBERNATE); + } + + // Shared eviction body used by both the inactivity timer (handleShutdown) and the test-only + // evict hooks. Destroys the Worker::Actor while keeping durable storage alive, so the DO is + // rebuilt on its next request. Depending on webSocketMode, hibernatable WebSockets are either + // hibernated first or closed. `reason` is recorded as the actor's disconnect reason. + // + // Returns false without evicting if the actor has acquired strong references by the time we + // hold the isolate lock (i.e. a new request raced in). The inactivity-timer path + // (handleShutdown) is cancelled by active() when a request arrives, so it can ignore the + // result; the test-only evict path is not cancellable, so it relies on this re-check to avoid + // tearing down a live actor. For the same reason, we only cancel onBrokenTask once we're + // committed to the shutdown -- otherwise an early `false` return would leave the actor running + // with no broken-detection. + kj::Promise tryEvict( + kj::StringPtr reason, IoChannelFactory::EvictWebSocketMode webSocketMode) { + KJ_IF_SOME(a, actor) { + if (a->isShared()) { + co_return false; + } + + if (manager != kj::none && + webSocketMode == IoChannelFactory::EvictWebSocketMode::HIBERNATE) { auto& worker = a->getWorker(); auto workerStrongRef = kj::atomicAddRef(worker); // Take an async lock, we can't use `takeAsyncLock(RequestObserver&)` since we don't // have an `IncomingRequest` at this point. - // - // Note that we do not have a race here because this is part of the `shutdownTask` - // promise. If a new request comes in while we're waiting to get the lock then we will - // cancel this promise. - Worker::AsyncLock asyncLock = co_await worker.takeAsyncLockWithoutRequest(nullptr); - workerStrongRef->runInLockScope( - asyncLock, [&](Worker::Lock& lock) { m->hibernateWebSockets(lock); }); + auto asyncLock = co_await worker.takeAsyncLockWithoutRequest(nullptr); + + // Re-check liveness now that we've awaited the lock: a new request may have started and + // grabbed a strong reference while we waited. + KJ_IF_SOME(current, actor) { + if (current->isShared()) { + co_return false; + } + } else { + co_return true; + } + + KJ_IF_SOME(m, manager) { + workerStrongRef->runInLockScope( + asyncLock, [&](Worker::Lock& lock) { m->hibernateWebSockets(lock); }); + } + } + + // New requests cannot interleave between the applicable isShared() check and this point + // because there are no awaits in between. In hibernate mode, the applicable check is the + // post-lock re-check above; otherwise, it is the initial check before the hibernation block. + // + // Cancel the onBroken promise, since we're about to destroy the actor anyways and don't + // want to trigger it. + onBrokenTask = kj::none; + KJ_IF_SOME(current, actor) { + // Note: wrap `reason` in kj::str() so KJ_EXCEPTION doesn't prefix the description with + // "reason = " (it only omits the label for string literals and kj::str(...) args). + current->shutdown(0, KJ_EXCEPTION(DISCONNECTED, kj::str(reason))); } - a->shutdown(0, KJ_EXCEPTION(DISCONNECTED, "broken.dropped; Actor freed due to inactivity")); } // Destroy the last strong Worker::Actor reference. actor = kj::none; - // Drop our reference to the ContainerClient - // If setInactivityTimeout() was called, the timer still holds a reference - // so the container stays alive until the timeout expires + if (webSocketMode == IoChannelFactory::EvictWebSocketMode::CLOSE) { + manager = kj::none; + } + + // Drop our reference to the ContainerClient. If setInactivityTimeout() was called, the timer + // still holds a reference so the container stays alive until the timeout expires. containerClient = kj::none; + co_return true; + } + + public: + // Test-only: evict this actor, bypassing the inactivity timer. Throws if the actor isn't + // currently running (never instantiated, or already evicted/hibernated). If the actor has + // in-flight requests, waits for them to drain before evicting. + kj::Promise evictForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + JSG_REQUIRE(ns.isEvictable(), Error, + "Cannot evict Durable Object: its namespace has preventEviction set."); + JSG_REQUIRE( + actor != kj::none, Error, "Cannot evict Durable Object: it is not currently running."); + return evictWhenIdle(webSocketMode); + } + + // Test-only: evict this actor and all of its facets if they are running, otherwise do nothing. + // Used by the bulk evictAllDurableObjects() path, which must not error on actors that aren't + // running. + kj::Promise evictTreeForTestIfRunning( + IoChannelFactory::EvictWebSocketMode webSocketMode) { + kj::Vector> promises(facets.size() + 1); + for (auto& facet: facets) { + // Pin the ActorContainer for the duration of its eviction. The map entry is normally + // retained, but the onBroken path can erase it mid-eviction; the addRef keeps the + // coroutine's `this` valid regardless. + promises.add( + facet.value->evictTreeForTestIfRunning(webSocketMode).attach(facet.value->addRef())); + } + + if (actor != kj::none) { + // Pin ourselves just like facet/root map callers do. The join below can be canceled by its + // caller, but the coroutine may still be suspended on request inactivity. + promises.add(evictWhenIdle(webSocketMode).attach(addRef())); + } + + return kj::joinPromises(promises.releaseAsArray()); + } + + private: + // Waits until the actor is idle, then evicts it. We never abort a live request, so while there + // are in-flight requests -- or a just-completed request still holding a transient strong + // reference during teardown (e.g. RPC/fetch teardown) -- we poll until the actor can be torn + // down. To avoid hanging a test forever (e.g. a request that never completes), we give up + // after a fixed deadline. + kj::Promise evictWhenIdle(IoChannelFactory::EvictWebSocketMode webSocketMode) { + constexpr auto EVICT_TIMEOUT = 30 * kj::SECONDS; + auto deadline = timer.now() + EVICT_TIMEOUT; + for (;;) { + if (co_await tryEvict("broken.dropped; Actor evicted by test"_kj, webSocketMode)) { + shutdownTask = kj::none; + co_return; + } + + auto now = timer.now(); + JSG_REQUIRE(now < deadline, Error, + "Timed out waiting to evict Durable Object: it still has active references."); + + if (tracker->isActive()) { + auto paf = kj::newPromiseAndFulfiller(); + auto promise = kj::mv(paf.promise); + inactiveFulfillers.add(kj::mv(paf.fulfiller)); + + co_await kj::mv(promise).exclusiveJoin(timer.afterDelay(deadline - now).then([]() { + JSG_FAIL_REQUIRE(Error, + "Timed out waiting to evict Durable Object: it still has active references."); + })); + } else { + // The actor can briefly have non-request strong refs during teardown, after the tracker + // has already reported inactivity. Yield before re-checking, but don't poll for the full + // request-drain duration. + co_await timer.afterDelay(1 * kj::MILLISECONDS); + } + } } void start(kj::Own& actorClass, Worker::Actor::Id& id) { @@ -1240,6 +1368,22 @@ class Server::ActorNamespace final { actors.clear(); } + // Test-only: gracefully evict every currently-running actor in this namespace. Depending on + // webSocketMode, hibernatable WebSockets are either hibernated first or closed. + // Idle/non-running actors are skipped (not an error). The actor map entries are retained so the + // DO rebuilds on its next request. See IoChannelFactory::evictAllActorsForTest(). + kj::Promise evictAllForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + kj::Vector> promises(actors.size()); + for (auto& actor: actors) { + // Pin the ActorContainer for the duration of its eviction. The map entry is normally + // retained, but the onBroken path can erase it mid-eviction; the addRef keeps the coroutine's + // `this` valid regardless. + promises.add( + actor.value->evictTreeForTestIfRunning(webSocketMode).attach(actor.value->addRef())); + } + return kj::joinPromises(promises.releaseAsArray()); + } + // Resets all actor databases, aborts all actors, and cancels all alarms so DOs // can be recreated with clean state. void deleteAll(kj::Maybe reason) { @@ -1366,6 +1510,10 @@ class Server::ActorNamespace final { actorContainer->startRequest(kj::mv(metadata)).attach(actorContainer->addRef())); } + kj::Promise evictForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) override { + return actorContainer->evictForTest(webSocketMode).attach(actorContainer->addRef()); + } + void requireAllowsTransfer() override { actorContainer->requireTransferrableStub(); } @@ -3875,6 +4023,22 @@ class Server::WorkerService final: public Service, deleteActorsCallback(reason); } + kj::Promise evictAllActorsForTest( + IoChannelFactory::EvictWebSocketMode webSocketMode) override { + auto& channels = + KJ_REQUIRE_NONNULL(ioChannels.tryGet(), "link() has not been called"); + + kj::Vector> promises(channels.actor.size()); + for (auto& maybeNs: channels.actor) { + KJ_IF_SOME(ns, maybeNs) { + if (ns.isEvictable()) { + promises.add(ns.evictAllForTest(webSocketMode)); + } + } + } + return kj::joinPromises(promises.releaseAsArray()); + } + // For now, in workerd just abort the process for non-dynamic workers. void abortIsolate(kj::StringPtr reason) noexcept override { KJ_IF_SOME(cb, abortIsolateCallback) { @@ -4436,16 +4600,7 @@ void Server::abortAllActors(kj::Maybe reason) { for (auto& service: services) { KJ_IF_SOME(worker, kj::tryDowncast(*service.value)) { for (auto& [className, ns]: worker.getActorNamespaces()) { - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns->getConfig()) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } - } - if (isEvictable) ns->abortAll(reason); + if (ns->isEvictable()) ns->abortAll(reason); } } } @@ -4455,16 +4610,7 @@ void Server::deleteAllActors(kj::Maybe reason) { for (auto& service: services) { KJ_IF_SOME(worker, kj::tryDowncast(*service.value)) { for (auto& [className, ns]: worker.getActorNamespaces()) { - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns->getConfig()) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } - } - if (isEvictable) ns->deleteAll(reason); + if (ns->isEvictable()) ns->deleteAll(reason); } } } diff --git a/src/workerd/server/tests/unsafe-module/unsafe-module-test.js b/src/workerd/server/tests/unsafe-module/unsafe-module-test.js index 251bd449f9a..9888f802857 100644 --- a/src/workerd/server/tests/unsafe-module/unsafe-module-test.js +++ b/src/workerd/server/tests/unsafe-module/unsafe-module-test.js @@ -32,6 +32,100 @@ export class StorageObject extends DurableObject { } } +// DO that tracks both in-memory and persisted state, used to verify that evict() tears down the +// instance (losing in-memory state) while preserving durable storage. +export class CounterObject extends DurableObject { + constructor(ctx, env) { + super(ctx, env); + this.inMemory = 0; + } + async bump() { + this.inMemory++; + const stored = ((await this.ctx.storage.get('count')) ?? 0) + 1; + await this.ctx.storage.put('count', stored); + return { inMemory: this.inMemory, stored }; + } + async getState() { + return { + inMemory: this.inMemory, + stored: (await this.ctx.storage.get('count')) ?? 0, + }; + } + async slowBump() { + await new Promise((resolve) => setTimeout(resolve, 10)); + return await this.bump(); + } +} + +// Same as CounterObject, but its namespace sets preventEviction. +export class CounterObjectPreventEviction extends CounterObject {} + +// Root/facet pair used to verify evictAllDurableObjects() walks the whole running actor tree. +export class FacetCounterObject extends CounterObject {} + +export class FacetRootObject extends CounterObject { + getFacet() { + return this.ctx.facets.get('child', () => ({ + class: this.ctx.exports.FacetCounterObject({}), + })); + } + async bumpBoth() { + return { + root: await this.bump(), + facet: await this.getFacet().bump(), + }; + } + async getBothState() { + return { + root: await this.getState(), + facet: await this.getFacet().getState(), + }; + } +} + +// DO that accepts a hibernatable WebSocket. Used to verify that evict() hibernates the socket +// (keeping the connection alive) and rebuilds the instance on the next event. +export class HibernationObject extends DurableObject { + constructor(ctx, env) { + super(ctx, env); + this.wokeCount = 0; // in-memory; resets when the instance is rebuilt. + } + async fetch(request) { + const [client, server] = Object.values(new WebSocketPair()); + this.ctx.acceptWebSocket(server); + return new Response(null, { status: 101, webSocket: client }); + } + async webSocketMessage(ws, message) { + if (message === 'bye') { + ws.close(1000, 'bye'); + return; + } + this.wokeCount++; + const stored = ((await this.ctx.storage.get('count')) ?? 0) + 1; + await this.ctx.storage.put('count', stored); + ws.send(JSON.stringify({ wokeCount: this.wokeCount, stored })); + } + async getStored() { + return (await this.ctx.storage.get('count')) ?? 0; + } +} + +function nextMessage(ws) { + return new Promise((resolve, reject) => { + ws.addEventListener('message', (event) => resolve(event.data), { + once: true, + }); + ws.addEventListener('error', reject, { once: true }); + }); +} + +function nextClose(ws) { + return new Promise((resolve, reject) => { + ws.addEventListener('close', () => resolve(), { once: true }); + ws.addEventListener('error', reject, { once: true }); + }); +} + // DO with an alarm for verifying deleteAllDurableObjects() cancels alarms. let alarmTriggers = 0; export class AlarmObject extends DurableObject { @@ -166,3 +260,260 @@ export const test_delete_all_durable_objects_respects_prevent_eviction = { assert.strictEqual(res1, res2); }, }; + +export const test_evict = { + async test(ctrl, env, ctx) { + const id = env.COUNTER.idFromName('evict'); + const stub = env.COUNTER.get(id); + + assert.deepStrictEqual(await stub.bump(), { inMemory: 1, stored: 1 }); + assert.deepStrictEqual(await stub.bump(), { inMemory: 2, stored: 2 }); + + // Gracefully evict the instance from the isolate. + await unsafe.evict(stub); + + // The stub keeps working (no abort), but the instance was rebuilt: in-memory state is gone + // while durable storage survived. + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 2 }); + }, +}; + +export const test_evict_waits_for_in_flight_request = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-in-flight')); + + const pending = stub.slowBump(); + await unsafe.evict(stub); + + assert.deepStrictEqual(await pending, { inMemory: 1, stored: 1 }); + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 1 }); + }, +}; + +export const test_evict_concurrent_calls = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-concurrent')); + await stub.bump(); + + await Promise.all([unsafe.evict(stub), unsafe.evict(stub)]); + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 1 }); + }, +}; + +export const test_evict_respects_prevent_eviction = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER_PREVENT_EVICTION.get( + env.COUNTER_PREVENT_EVICTION.idFromName('evict-prevent') + ); + await stub.bump(); + + await assert.rejects(() => unsafe.evict(stub), { + name: 'Error', + message: + 'Cannot evict Durable Object: its namespace has preventEviction set.', + }); + + assert.deepStrictEqual(await stub.getState(), { inMemory: 1, stored: 1 }); + }, +}; + +export const test_evict_not_running = { + async test(ctrl, env, ctx) { + // Never sent a request, so the DO was never instantiated. + const stub = env.COUNTER.get(env.COUNTER.idFromName('never-running')); + await assert.rejects(() => unsafe.evict(stub), { + name: 'Error', + message: 'Cannot evict Durable Object: it is not currently running.', + }); + }, +}; + +export const test_evict_non_durable_object = { + async test(ctrl, env, ctx) { + // SELF_SERVICE is an ordinary service binding, not a Durable Object stub. + await assert.rejects(() => unsafe.evict(env.SELF_SERVICE), { + name: 'Error', + message: 'evict() can only be used on a Durable Object stub.', + }); + }, +}; + +export const test_evict_rejects_invalid_websocket_mode = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-invalid-mode')); + await stub.bump(); + + await assert.rejects(() => unsafe.evict(stub, { webSockets: 'explode' }), { + name: 'TypeError', + message: 'options.webSockets must be "hibernate" or "close".', + }); + await assert.rejects( + () => unsafe.evictAllDurableObjects({ webSockets: 'explode' }), + { + name: 'TypeError', + message: 'options.webSockets must be "hibernate" or "close".', + } + ); + + assert.deepStrictEqual(await stub.getState(), { inMemory: 1, stored: 1 }); + }, +}; + +export const test_evict_all_durable_objects = { + async test(ctrl, env, ctx) { + const a = env.COUNTER.get(env.COUNTER.idFromName('all-a')); + const b = env.COUNTER.get(env.COUNTER.idFromName('all-b')); + const prevent = env.COUNTER_PREVENT_EVICTION.get( + env.COUNTER_PREVENT_EVICTION.idFromName('all-prevent') + ); + + await a.bump(); + await a.bump(); + await b.bump(); + await prevent.bump(); + + await unsafe.evictAllDurableObjects(); + + // Evictable counters were rebuilt (in-memory reset), storage preserved. + assert.deepStrictEqual(await a.getState(), { inMemory: 0, stored: 2 }); + assert.deepStrictEqual(await b.getState(), { inMemory: 0, stored: 1 }); + + // preventEviction namespace was skipped — in-memory state survives. + assert.deepStrictEqual(await prevent.getState(), { + inMemory: 1, + stored: 1, + }); + }, +}; + +export const test_evict_all_durable_objects_evicts_facets = { + async test(ctrl, env, ctx) { + const root = env.FACET_ROOT.get( + env.FACET_ROOT.idFromName('evict-all-facets') + ); + + assert.deepStrictEqual(await root.bumpBoth(), { + root: { inMemory: 1, stored: 1 }, + facet: { inMemory: 1, stored: 1 }, + }); + assert.deepStrictEqual(await root.bumpBoth(), { + root: { inMemory: 2, stored: 2 }, + facet: { inMemory: 2, stored: 2 }, + }); + + await unsafe.evictAllDurableObjects(); + + // Both the root and facet instances were rebuilt, but both durable stores survived. + assert.deepStrictEqual(await root.getBothState(), { + root: { inMemory: 0, stored: 2 }, + facet: { inMemory: 0, stored: 2 }, + }); + }, +}; + +export const test_evict_hibernates_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('room')); + const res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + const ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + // Fresh instance: wokeCount 0 -> 1, stored 0 -> 1. + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + // Evict: hibernates the socket (connection stays open) and tears down the instance. + await unsafe.evict(stub); + + // The same socket still delivers (proving it hibernated rather than closing), and the event + // is handled by a rebuilt instance: wokeCount resets to 1, but storage survived (stored -> 2). + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + // Tell the DO to close the socket so the test tears down cleanly. + const closed = new Promise((resolve) => + ws.addEventListener('close', () => resolve(), { once: true }) + ); + ws.send('bye'); + await closed; + }, +}; + +export const test_evict_closes_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('close-room')); + let res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + let ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + const closed = nextClose(ws); + await unsafe.evict(stub, { webSockets: 'close' }); + await closed; + + // Storage survived, but the closed socket cannot deliver the wakeup event. Open a new socket to + // verify the instance was rebuilt and storage continues from the previous value. + assert.strictEqual(await stub.getStored(), 1); + res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + const cleanupClosed = nextClose(ws); + ws.send('bye'); + await cleanupClosed; + }, +}; + +export const test_evict_all_durable_objects_closes_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('close-all-room')); + let res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + let ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + const closed = nextClose(ws); + await unsafe.evictAllDurableObjects({ webSockets: 'close' }); + await closed; + + assert.strictEqual(await stub.getStored(), 1); + res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + const cleanupClosed = nextClose(ws); + ws.send('bye'); + await cleanupClosed; + }, +}; diff --git a/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test b/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test index daedd2195f5..96f4a7b028f 100644 --- a/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test +++ b/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = ( modules = [ ( name = "worker", esModule = embed "unsafe-module-test.js" ), ], - compatibilityFlags = ["nodejs_compat", "unsafe_module", "rpc"], + compatibilityFlags = ["nodejs_compat", "unsafe_module", "rpc", "enable_ctx_exports"], durableObjectNamespaces = [ ( className = "TestDurableObject", uniqueKey = "durable" ), ( className = "TestDurableObjectPreventEviction", uniqueKey = "durable-prevent-eviction", preventEviction = true ), @@ -16,6 +16,10 @@ const unitTests :Workerd.Config = ( ( className = "TestEphemeralObjectPreventEviction", ephemeralLocal = void, preventEviction = true ), ( className = "AlarmObject", uniqueKey = "alarm" ), ( className = "StorageObject", uniqueKey = "storage" ), + ( className = "CounterObject", uniqueKey = "counter" ), + ( className = "CounterObjectPreventEviction", uniqueKey = "counter-prevent-eviction", preventEviction = true ), + ( className = "FacetRootObject", uniqueKey = "facet-root" ), + ( className = "HibernationObject", uniqueKey = "hibernation" ), ], durableObjectStorage = ( localDisk = "TEST_TMPDIR" ), bindings = [ @@ -25,6 +29,11 @@ const unitTests :Workerd.Config = ( ( name = "EPHEMERAL_PREVENT_EVICTION", durableObjectNamespace = "TestEphemeralObjectPreventEviction" ), ( name = "ALARM", durableObjectNamespace = "AlarmObject" ), ( name = "STORAGE", durableObjectNamespace = "StorageObject" ), + ( name = "COUNTER", durableObjectNamespace = "CounterObject" ), + ( name = "COUNTER_PREVENT_EVICTION", durableObjectNamespace = "CounterObjectPreventEviction" ), + ( name = "FACET_ROOT", durableObjectNamespace = "FacetRootObject" ), + ( name = "HIBERNATE", durableObjectNamespace = "HibernationObject" ), + ( name = "SELF_SERVICE", service = "unsafe-module-test" ), ], ) ),