diff --git a/src/workerd/api/unsafe.c++ b/src/workerd/api/unsafe.c++ index 95be2e36716..10e295ec311 100644 --- a/src/workerd/api/unsafe.c++ +++ b/src/workerd/api/unsafe.c++ @@ -1,5 +1,6 @@ #include "unsafe.h" +#include #include #include #include @@ -18,6 +19,21 @@ static constexpr auto ASYNC_FN_SUFFIX = "}"_kjc; inline kj::StringPtr getName(jsg::Optional& name, kj::StringPtr def) { return name.map([](kj::String& str) { return str.asPtr(); }).orDefault(def); } + +IoChannelFactory::EvictWebSocketMode parseEvictWebSocketMode( + jsg::Optional options) { + auto opts = kj::mv(options).orDefault({}); + KJ_IF_SOME(webSockets, opts.webSockets) { + if (webSockets == "hibernate") { + return IoChannelFactory::EvictWebSocketMode::HIBERNATE; + } + if (webSockets == "close") { + return IoChannelFactory::EvictWebSocketMode::CLOSE; + } + JSG_FAIL_REQUIRE(TypeError, "options.webSockets must be \"hibernate\" or \"close\"."); + } + return IoChannelFactory::EvictWebSocketMode::HIBERNATE; +} } // namespace #ifdef WORKERD_FUZZILLI @@ -213,6 +229,31 @@ jsg::Promise UnsafeModule::deleteAllDurableObjects(jsg::Lock& js) { return js.resolvedPromise(); } +jsg::Promise UnsafeModule::evict( + jsg::Lock& js, jsg::Ref stub, jsg::Optional options) { + auto& context = IoContext::current(); + + // Resolve the stub to its underlying channel. For a Durable Object stub this is an actor + // channel that supports evictForTest(); for any other Fetcher, evictForTest() throws. + // Use kj::evalNow() so that a synchronous throw (e.g. "not a Durable Object stub" or "not + // currently running") becomes a rejected promise rather than a synchronous exception. + auto channel = stub->getSubrequestChannel(context); + auto promise = kj::evalNow([&channel, options = kj::mv(options)]() mutable { + return channel->evictForTest(parseEvictWebSocketMode(kj::mv(options))); + }).attach(kj::mv(channel)); + return context.awaitIo(js, kj::mv(promise)); +} + +jsg::Promise UnsafeModule::evictAllDurableObjects( + jsg::Lock& js, jsg::Optional options) { + auto& context = IoContext::current(); + // Use kj::evalNow() so that a synchronous throw becomes a rejected promise rather than a + // synchronous exception, matching evict()'s behaviour. + return context.awaitIo(js, kj::evalNow([&context, options = kj::mv(options)]() mutable { + return context.evictAllActorsForTest(parseEvictWebSocketMode(kj::mv(options))); + })); +} + bool UnsafeModule::isTestAutogateEnabled() { return util::Autogate::isEnabled(util::AutogateKey::TEST_WORKERD); } diff --git a/src/workerd/api/unsafe.h b/src/workerd/api/unsafe.h index 9c3e0d9a6ed..828c1b38107 100644 --- a/src/workerd/api/unsafe.h +++ b/src/workerd/api/unsafe.h @@ -19,6 +19,8 @@ namespace workerd::api { +class Fetcher; + // A special binding object that allows for dynamic evaluation. class UnsafeEval: public jsg::Object { public: @@ -104,6 +106,29 @@ class UnsafeModule: public jsg::Object { // restart with clean state. Namespaces with preventEviction are not affected. jsg::Promise deleteAllDurableObjects(jsg::Lock& js); + struct EvictOptions { + jsg::Optional webSockets; + + JSG_STRUCT(webSockets); + JSG_STRUCT_TS_OVERRIDE({ + webSockets?: "hibernate" | "close"; + }); + }; + + // Test-only: gracefully evict the Durable Object referred to by `stub` from its isolate, + // simulating the runtime tearing it down when it goes idle. Durable storage is left intact, so + // the DO rebuilds (rerunning its constructor) on its next request. Hibernatable WebSockets are + // hibernated by default, or closed if options.webSockets is "close". Rejects if `stub` is not a + // Durable Object stub, or if the target DO is not currently running. + jsg::Promise evict( + jsg::Lock& js, jsg::Ref stub, jsg::Optional options); + + // Test-only: gracefully evict every currently-running Durable Object that this worker can + // address (in evictable namespaces). Unlike abortAllDurableObjects(), this preserves durable + // storage and hibernates hibernatable WebSockets by default, or closes them if options.webSockets + // is "close". Idle DOs are skipped (not an error). + jsg::Promise evictAllDurableObjects(jsg::Lock& js, jsg::Optional options); + // Returns true if the TEST_WORKERD autogate is enabled. // This is used to verify that the all-autogates test variant is working correctly. bool isTestAutogateEnabled(); @@ -111,6 +136,8 @@ class UnsafeModule: public jsg::Object { JSG_RESOURCE_TYPE(UnsafeModule) { JSG_METHOD(abortAllDurableObjects); JSG_METHOD(deleteAllDurableObjects); + JSG_METHOD(evict); + JSG_METHOD(evictAllDurableObjects); JSG_METHOD(isTestAutogateEnabled); } }; @@ -142,9 +169,11 @@ void registerUnsafeModule(Registry& registry) { } #ifdef WORKERD_FUZZILLI -#define EW_UNSAFE_ISOLATE_TYPES api::UnsafeEval, api::UnsafeModule, api::Stdin, api::Fuzzilli +#define EW_UNSAFE_ISOLATE_TYPES \ + api::UnsafeEval, api::UnsafeModule, api::UnsafeModule::EvictOptions, api::Stdin, api::Fuzzilli #else -#define EW_UNSAFE_ISOLATE_TYPES api::UnsafeEval, api::UnsafeModule, api::Stdin +#define EW_UNSAFE_ISOLATE_TYPES \ + api::UnsafeEval, api::UnsafeModule, api::UnsafeModule::EvictOptions, api::Stdin #endif template diff --git a/src/workerd/io/io-channels.h b/src/workerd/io/io-channels.h index d0b452cd29f..b9d6e99ffd6 100644 --- a/src/workerd/io/io-channels.h +++ b/src/workerd/io/io-channels.h @@ -99,6 +99,11 @@ struct DynamicWorkerSource; // anything in the world except for the client -- this is a useful property for sandboxing! class IoChannelFactory { public: + enum class EvictWebSocketMode { + HIBERNATE, + CLOSE, + }; + // Opaque, IoContext-independent handle that knows how to construct a channel token referring to // the current entrypoint ("self"). Used to implement `ctx.restore()`: the implementation of // `ctx.restore()` passes this back into `makeRestored*()` so that the resulting restored channel @@ -258,6 +263,18 @@ class IoChannelFactory { // Note that the caller is expected to keep the SubrequestChannel alive until it is done with // the returned WorkerInterface. virtual kj::Own startRequest(SubrequestMetadata metadata) = 0; + + // Test-only: forcibly evict the target of this channel from its isolate, simulating the + // runtime tearing it down when it goes idle. For a Durable Object stub this destroys the actor + // instance while durable storage survives, so the next request rebuilds it. Depending on + // webSocketMode, hibernatable WebSockets are either hibernated first or closed. Only channels + // that point at an actor support this; others throw. + // + // Throws if the target Durable Object is not currently running (never instantiated, or + // already evicted/hibernated). + virtual kj::Promise evictForTest(EvictWebSocketMode webSocketMode) { + JSG_FAIL_REQUIRE(Error, "evict() can only be used on a Durable Object stub."); + } }; // Obtain an object representing a particular subrequest channel. @@ -362,6 +379,15 @@ class IoChannelFactory { KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime"); } + // Test-only: gracefully evict every currently-running actor in the evictable namespaces this + // worker can address, simulating idle teardown. Unlike abortAllActors(), this leaves durable + // storage intact, so DOs rebuild on their next request. Depending on webSocketMode, hibernatable + // WebSockets are either hibernated first or closed. Actors that aren't currently running are + // skipped (no error). + virtual kj::Promise evictAllActorsForTest(EvictWebSocketMode webSocketMode) { + KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime"); + } + // In workerd, the handler aborts the process (unless used on a dynamic // worker). In the edge runtime it will condemn and terminate the current // isolate. diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 278c52fac55..338c574ac7a 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -971,6 +971,12 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler getIoChannelFactory().deleteAllActors(reason); } + // Test-only: gracefully evict all currently-running actors. See + // IoChannelFactory::evictAllActorsForTest(). + kj::Promise evictAllActorsForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + return getIoChannelFactory().evictAllActorsForTest(webSocketMode); + } + // Condemn and terminate JS isolate void abortIsolate(kj::StringPtr reason = nullptr); diff --git a/src/workerd/io/request-tracker.h b/src/workerd/io/request-tracker.h index 781c44d88ba..a922c0d3a44 100644 --- a/src/workerd/io/request-tracker.h +++ b/src/workerd/io/request-tracker.h @@ -54,6 +54,10 @@ class RequestTracker final: public kj::Refcounted { return kj::addRef(*this); } + bool isActive() const { + return activeRequests > 0; + } + private: void requestActive(); void requestInactive(); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 90b0ad4988f..871f4ebda87 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -374,6 +374,19 @@ class Server::ActorNamespace final { return config; } + bool isEvictable() const { + bool result = true; + KJ_SWITCH_ONEOF(config) { + KJ_CASE_ONEOF(c, Durable) { + result = c.isEvictable; + } + KJ_CASE_ONEOF(c, Ephemeral) { + result = c.isEvictable; + } + } + return result; + } + kj::Own getActorChannel(Worker::Actor::Id id) { KJ_IF_SOME(doId, id.tryGet>()) { KJ_IF_SOME(name, doId->getName()) { @@ -479,17 +492,12 @@ class Server::ActorNamespace final { } void inactive() override { - // Durable objects are evictable by default. - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns.config) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } + for (auto& fulfiller: inactiveFulfillers) { + fulfiller->fulfill(); } - if (isEvictable) { + inactiveFulfillers.clear(); + + if (ns.isEvictable()) { KJ_IF_SOME(a, actor) { KJ_IF_SOME(m, a->getHibernationManager()) { // The hibernation manager needs to survive actor eviction and be passed to the actor @@ -778,6 +786,7 @@ class Server::ActorNamespace final { kj::Maybe> shutdownTask; kj::Maybe> onBrokenTask; kj::Maybe brokenReason; + kj::Vector>> inactiveFulfillers; // Reference to the ContainerClient (if container is enabled for this actor) kj::Maybe> containerClient; @@ -977,9 +986,6 @@ class Server::ActorNamespace final { // JS WebSockets. // TODO(someday): We could make this timeout configurable to make testing less burdensome. co_await timer.afterDelay(10 * kj::SECONDS); - // Cancel the onBroken promise, since we're about to destroy the actor anyways and don't - // want to trigger it. - onBrokenTask = kj::none; KJ_IF_SOME(a, actor) { if (a->isShared()) { // Our ActiveRequest refcounting has broken somewhere. This is likely because we're @@ -994,28 +1000,150 @@ class Server::ActorNamespace final { co_return; } - KJ_IF_SOME(m, manager) { + } + co_await tryEvict("broken.dropped; Actor freed due to inactivity"_kj, + IoChannelFactory::EvictWebSocketMode::HIBERNATE); + } + + // Shared eviction body used by both the inactivity timer (handleShutdown) and the test-only + // evict hooks. Destroys the Worker::Actor while keeping durable storage alive, so the DO is + // rebuilt on its next request. Depending on webSocketMode, hibernatable WebSockets are either + // hibernated first or closed. `reason` is recorded as the actor's disconnect reason. + // + // Returns false without evicting if the actor has acquired strong references by the time we + // hold the isolate lock (i.e. a new request raced in). The inactivity-timer path + // (handleShutdown) is cancelled by active() when a request arrives, so it can ignore the + // result; the test-only evict path is not cancellable, so it relies on this re-check to avoid + // tearing down a live actor. For the same reason, we only cancel onBrokenTask once we're + // committed to the shutdown -- otherwise an early `false` return would leave the actor running + // with no broken-detection. + kj::Promise tryEvict( + kj::StringPtr reason, IoChannelFactory::EvictWebSocketMode webSocketMode) { + KJ_IF_SOME(a, actor) { + if (a->isShared()) { + co_return false; + } + + if (manager != kj::none && + webSocketMode == IoChannelFactory::EvictWebSocketMode::HIBERNATE) { auto& worker = a->getWorker(); auto workerStrongRef = kj::atomicAddRef(worker); // Take an async lock, we can't use `takeAsyncLock(RequestObserver&)` since we don't // have an `IncomingRequest` at this point. - // - // Note that we do not have a race here because this is part of the `shutdownTask` - // promise. If a new request comes in while we're waiting to get the lock then we will - // cancel this promise. - Worker::AsyncLock asyncLock = co_await worker.takeAsyncLockWithoutRequest(nullptr); - workerStrongRef->runInLockScope( - asyncLock, [&](Worker::Lock& lock) { m->hibernateWebSockets(lock); }); + auto asyncLock = co_await worker.takeAsyncLockWithoutRequest(nullptr); + + // Re-check liveness now that we've awaited the lock: a new request may have started and + // grabbed a strong reference while we waited. + KJ_IF_SOME(current, actor) { + if (current->isShared()) { + co_return false; + } + } else { + co_return true; + } + + KJ_IF_SOME(m, manager) { + workerStrongRef->runInLockScope( + asyncLock, [&](Worker::Lock& lock) { m->hibernateWebSockets(lock); }); + } + } + + // New requests cannot interleave between the applicable isShared() check and this point + // because there are no awaits in between. In hibernate mode, the applicable check is the + // post-lock re-check above; otherwise, it is the initial check before the hibernation block. + // + // Cancel the onBroken promise, since we're about to destroy the actor anyways and don't + // want to trigger it. + onBrokenTask = kj::none; + KJ_IF_SOME(current, actor) { + // Note: wrap `reason` in kj::str() so KJ_EXCEPTION doesn't prefix the description with + // "reason = " (it only omits the label for string literals and kj::str(...) args). + current->shutdown(0, KJ_EXCEPTION(DISCONNECTED, kj::str(reason))); } - a->shutdown(0, KJ_EXCEPTION(DISCONNECTED, "broken.dropped; Actor freed due to inactivity")); } // Destroy the last strong Worker::Actor reference. actor = kj::none; - // Drop our reference to the ContainerClient - // If setInactivityTimeout() was called, the timer still holds a reference - // so the container stays alive until the timeout expires + if (webSocketMode == IoChannelFactory::EvictWebSocketMode::CLOSE) { + manager = kj::none; + } + + // Drop our reference to the ContainerClient. If setInactivityTimeout() was called, the timer + // still holds a reference so the container stays alive until the timeout expires. containerClient = kj::none; + co_return true; + } + + public: + // Test-only: evict this actor, bypassing the inactivity timer. Throws if the actor isn't + // currently running (never instantiated, or already evicted/hibernated). If the actor has + // in-flight requests, waits for them to drain before evicting. + kj::Promise evictForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + JSG_REQUIRE(ns.isEvictable(), Error, + "Cannot evict Durable Object: its namespace has preventEviction set."); + JSG_REQUIRE( + actor != kj::none, Error, "Cannot evict Durable Object: it is not currently running."); + return evictWhenIdle(webSocketMode); + } + + // Test-only: evict this actor and all of its facets if they are running, otherwise do nothing. + // Used by the bulk evictAllDurableObjects() path, which must not error on actors that aren't + // running. + kj::Promise evictTreeForTestIfRunning( + IoChannelFactory::EvictWebSocketMode webSocketMode) { + kj::Vector> promises(facets.size() + 1); + for (auto& facet: facets) { + // Pin the ActorContainer for the duration of its eviction. The map entry is normally + // retained, but the onBroken path can erase it mid-eviction; the addRef keeps the + // coroutine's `this` valid regardless. + promises.add( + facet.value->evictTreeForTestIfRunning(webSocketMode).attach(facet.value->addRef())); + } + + if (actor != kj::none) { + // Pin ourselves just like facet/root map callers do. The join below can be canceled by its + // caller, but the coroutine may still be suspended on request inactivity. + promises.add(evictWhenIdle(webSocketMode).attach(addRef())); + } + + return kj::joinPromises(promises.releaseAsArray()); + } + + private: + // Waits until the actor is idle, then evicts it. We never abort a live request, so while there + // are in-flight requests -- or a just-completed request still holding a transient strong + // reference during teardown (e.g. RPC/fetch teardown) -- we poll until the actor can be torn + // down. To avoid hanging a test forever (e.g. a request that never completes), we give up + // after a fixed deadline. + kj::Promise evictWhenIdle(IoChannelFactory::EvictWebSocketMode webSocketMode) { + constexpr auto EVICT_TIMEOUT = 30 * kj::SECONDS; + auto deadline = timer.now() + EVICT_TIMEOUT; + for (;;) { + if (co_await tryEvict("broken.dropped; Actor evicted by test"_kj, webSocketMode)) { + shutdownTask = kj::none; + co_return; + } + + auto now = timer.now(); + JSG_REQUIRE(now < deadline, Error, + "Timed out waiting to evict Durable Object: it still has active references."); + + if (tracker->isActive()) { + auto paf = kj::newPromiseAndFulfiller(); + auto promise = kj::mv(paf.promise); + inactiveFulfillers.add(kj::mv(paf.fulfiller)); + + co_await kj::mv(promise).exclusiveJoin(timer.afterDelay(deadline - now).then([]() { + JSG_FAIL_REQUIRE(Error, + "Timed out waiting to evict Durable Object: it still has active references."); + })); + } else { + // The actor can briefly have non-request strong refs during teardown, after the tracker + // has already reported inactivity. Yield before re-checking, but don't poll for the full + // request-drain duration. + co_await timer.afterDelay(1 * kj::MILLISECONDS); + } + } } void start(kj::Own& actorClass, Worker::Actor::Id& id) { @@ -1240,6 +1368,22 @@ class Server::ActorNamespace final { actors.clear(); } + // Test-only: gracefully evict every currently-running actor in this namespace. Depending on + // webSocketMode, hibernatable WebSockets are either hibernated first or closed. + // Idle/non-running actors are skipped (not an error). The actor map entries are retained so the + // DO rebuilds on its next request. See IoChannelFactory::evictAllActorsForTest(). + kj::Promise evictAllForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) { + kj::Vector> promises(actors.size()); + for (auto& actor: actors) { + // Pin the ActorContainer for the duration of its eviction. The map entry is normally + // retained, but the onBroken path can erase it mid-eviction; the addRef keeps the coroutine's + // `this` valid regardless. + promises.add( + actor.value->evictTreeForTestIfRunning(webSocketMode).attach(actor.value->addRef())); + } + return kj::joinPromises(promises.releaseAsArray()); + } + // Resets all actor databases, aborts all actors, and cancels all alarms so DOs // can be recreated with clean state. void deleteAll(kj::Maybe reason) { @@ -1366,6 +1510,10 @@ class Server::ActorNamespace final { actorContainer->startRequest(kj::mv(metadata)).attach(actorContainer->addRef())); } + kj::Promise evictForTest(IoChannelFactory::EvictWebSocketMode webSocketMode) override { + return actorContainer->evictForTest(webSocketMode).attach(actorContainer->addRef()); + } + void requireAllowsTransfer() override { actorContainer->requireTransferrableStub(); } @@ -3875,6 +4023,22 @@ class Server::WorkerService final: public Service, deleteActorsCallback(reason); } + kj::Promise evictAllActorsForTest( + IoChannelFactory::EvictWebSocketMode webSocketMode) override { + auto& channels = + KJ_REQUIRE_NONNULL(ioChannels.tryGet(), "link() has not been called"); + + kj::Vector> promises(channels.actor.size()); + for (auto& maybeNs: channels.actor) { + KJ_IF_SOME(ns, maybeNs) { + if (ns.isEvictable()) { + promises.add(ns.evictAllForTest(webSocketMode)); + } + } + } + return kj::joinPromises(promises.releaseAsArray()); + } + // For now, in workerd just abort the process for non-dynamic workers. void abortIsolate(kj::StringPtr reason) noexcept override { KJ_IF_SOME(cb, abortIsolateCallback) { @@ -4436,16 +4600,7 @@ void Server::abortAllActors(kj::Maybe reason) { for (auto& service: services) { KJ_IF_SOME(worker, kj::tryDowncast(*service.value)) { for (auto& [className, ns]: worker.getActorNamespaces()) { - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns->getConfig()) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } - } - if (isEvictable) ns->abortAll(reason); + if (ns->isEvictable()) ns->abortAll(reason); } } } @@ -4455,16 +4610,7 @@ void Server::deleteAllActors(kj::Maybe reason) { for (auto& service: services) { KJ_IF_SOME(worker, kj::tryDowncast(*service.value)) { for (auto& [className, ns]: worker.getActorNamespaces()) { - bool isEvictable = true; - KJ_SWITCH_ONEOF(ns->getConfig()) { - KJ_CASE_ONEOF(c, Durable) { - isEvictable = c.isEvictable; - } - KJ_CASE_ONEOF(c, Ephemeral) { - isEvictable = c.isEvictable; - } - } - if (isEvictable) ns->deleteAll(reason); + if (ns->isEvictable()) ns->deleteAll(reason); } } } diff --git a/src/workerd/server/tests/unsafe-module/unsafe-module-test.js b/src/workerd/server/tests/unsafe-module/unsafe-module-test.js index 251bd449f9a..9888f802857 100644 --- a/src/workerd/server/tests/unsafe-module/unsafe-module-test.js +++ b/src/workerd/server/tests/unsafe-module/unsafe-module-test.js @@ -32,6 +32,100 @@ export class StorageObject extends DurableObject { } } +// DO that tracks both in-memory and persisted state, used to verify that evict() tears down the +// instance (losing in-memory state) while preserving durable storage. +export class CounterObject extends DurableObject { + constructor(ctx, env) { + super(ctx, env); + this.inMemory = 0; + } + async bump() { + this.inMemory++; + const stored = ((await this.ctx.storage.get('count')) ?? 0) + 1; + await this.ctx.storage.put('count', stored); + return { inMemory: this.inMemory, stored }; + } + async getState() { + return { + inMemory: this.inMemory, + stored: (await this.ctx.storage.get('count')) ?? 0, + }; + } + async slowBump() { + await new Promise((resolve) => setTimeout(resolve, 10)); + return await this.bump(); + } +} + +// Same as CounterObject, but its namespace sets preventEviction. +export class CounterObjectPreventEviction extends CounterObject {} + +// Root/facet pair used to verify evictAllDurableObjects() walks the whole running actor tree. +export class FacetCounterObject extends CounterObject {} + +export class FacetRootObject extends CounterObject { + getFacet() { + return this.ctx.facets.get('child', () => ({ + class: this.ctx.exports.FacetCounterObject({}), + })); + } + async bumpBoth() { + return { + root: await this.bump(), + facet: await this.getFacet().bump(), + }; + } + async getBothState() { + return { + root: await this.getState(), + facet: await this.getFacet().getState(), + }; + } +} + +// DO that accepts a hibernatable WebSocket. Used to verify that evict() hibernates the socket +// (keeping the connection alive) and rebuilds the instance on the next event. +export class HibernationObject extends DurableObject { + constructor(ctx, env) { + super(ctx, env); + this.wokeCount = 0; // in-memory; resets when the instance is rebuilt. + } + async fetch(request) { + const [client, server] = Object.values(new WebSocketPair()); + this.ctx.acceptWebSocket(server); + return new Response(null, { status: 101, webSocket: client }); + } + async webSocketMessage(ws, message) { + if (message === 'bye') { + ws.close(1000, 'bye'); + return; + } + this.wokeCount++; + const stored = ((await this.ctx.storage.get('count')) ?? 0) + 1; + await this.ctx.storage.put('count', stored); + ws.send(JSON.stringify({ wokeCount: this.wokeCount, stored })); + } + async getStored() { + return (await this.ctx.storage.get('count')) ?? 0; + } +} + +function nextMessage(ws) { + return new Promise((resolve, reject) => { + ws.addEventListener('message', (event) => resolve(event.data), { + once: true, + }); + ws.addEventListener('error', reject, { once: true }); + }); +} + +function nextClose(ws) { + return new Promise((resolve, reject) => { + ws.addEventListener('close', () => resolve(), { once: true }); + ws.addEventListener('error', reject, { once: true }); + }); +} + // DO with an alarm for verifying deleteAllDurableObjects() cancels alarms. let alarmTriggers = 0; export class AlarmObject extends DurableObject { @@ -166,3 +260,260 @@ export const test_delete_all_durable_objects_respects_prevent_eviction = { assert.strictEqual(res1, res2); }, }; + +export const test_evict = { + async test(ctrl, env, ctx) { + const id = env.COUNTER.idFromName('evict'); + const stub = env.COUNTER.get(id); + + assert.deepStrictEqual(await stub.bump(), { inMemory: 1, stored: 1 }); + assert.deepStrictEqual(await stub.bump(), { inMemory: 2, stored: 2 }); + + // Gracefully evict the instance from the isolate. + await unsafe.evict(stub); + + // The stub keeps working (no abort), but the instance was rebuilt: in-memory state is gone + // while durable storage survived. + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 2 }); + }, +}; + +export const test_evict_waits_for_in_flight_request = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-in-flight')); + + const pending = stub.slowBump(); + await unsafe.evict(stub); + + assert.deepStrictEqual(await pending, { inMemory: 1, stored: 1 }); + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 1 }); + }, +}; + +export const test_evict_concurrent_calls = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-concurrent')); + await stub.bump(); + + await Promise.all([unsafe.evict(stub), unsafe.evict(stub)]); + assert.deepStrictEqual(await stub.getState(), { inMemory: 0, stored: 1 }); + }, +}; + +export const test_evict_respects_prevent_eviction = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER_PREVENT_EVICTION.get( + env.COUNTER_PREVENT_EVICTION.idFromName('evict-prevent') + ); + await stub.bump(); + + await assert.rejects(() => unsafe.evict(stub), { + name: 'Error', + message: + 'Cannot evict Durable Object: its namespace has preventEviction set.', + }); + + assert.deepStrictEqual(await stub.getState(), { inMemory: 1, stored: 1 }); + }, +}; + +export const test_evict_not_running = { + async test(ctrl, env, ctx) { + // Never sent a request, so the DO was never instantiated. + const stub = env.COUNTER.get(env.COUNTER.idFromName('never-running')); + await assert.rejects(() => unsafe.evict(stub), { + name: 'Error', + message: 'Cannot evict Durable Object: it is not currently running.', + }); + }, +}; + +export const test_evict_non_durable_object = { + async test(ctrl, env, ctx) { + // SELF_SERVICE is an ordinary service binding, not a Durable Object stub. + await assert.rejects(() => unsafe.evict(env.SELF_SERVICE), { + name: 'Error', + message: 'evict() can only be used on a Durable Object stub.', + }); + }, +}; + +export const test_evict_rejects_invalid_websocket_mode = { + async test(ctrl, env, ctx) { + const stub = env.COUNTER.get(env.COUNTER.idFromName('evict-invalid-mode')); + await stub.bump(); + + await assert.rejects(() => unsafe.evict(stub, { webSockets: 'explode' }), { + name: 'TypeError', + message: 'options.webSockets must be "hibernate" or "close".', + }); + await assert.rejects( + () => unsafe.evictAllDurableObjects({ webSockets: 'explode' }), + { + name: 'TypeError', + message: 'options.webSockets must be "hibernate" or "close".', + } + ); + + assert.deepStrictEqual(await stub.getState(), { inMemory: 1, stored: 1 }); + }, +}; + +export const test_evict_all_durable_objects = { + async test(ctrl, env, ctx) { + const a = env.COUNTER.get(env.COUNTER.idFromName('all-a')); + const b = env.COUNTER.get(env.COUNTER.idFromName('all-b')); + const prevent = env.COUNTER_PREVENT_EVICTION.get( + env.COUNTER_PREVENT_EVICTION.idFromName('all-prevent') + ); + + await a.bump(); + await a.bump(); + await b.bump(); + await prevent.bump(); + + await unsafe.evictAllDurableObjects(); + + // Evictable counters were rebuilt (in-memory reset), storage preserved. + assert.deepStrictEqual(await a.getState(), { inMemory: 0, stored: 2 }); + assert.deepStrictEqual(await b.getState(), { inMemory: 0, stored: 1 }); + + // preventEviction namespace was skipped — in-memory state survives. + assert.deepStrictEqual(await prevent.getState(), { + inMemory: 1, + stored: 1, + }); + }, +}; + +export const test_evict_all_durable_objects_evicts_facets = { + async test(ctrl, env, ctx) { + const root = env.FACET_ROOT.get( + env.FACET_ROOT.idFromName('evict-all-facets') + ); + + assert.deepStrictEqual(await root.bumpBoth(), { + root: { inMemory: 1, stored: 1 }, + facet: { inMemory: 1, stored: 1 }, + }); + assert.deepStrictEqual(await root.bumpBoth(), { + root: { inMemory: 2, stored: 2 }, + facet: { inMemory: 2, stored: 2 }, + }); + + await unsafe.evictAllDurableObjects(); + + // Both the root and facet instances were rebuilt, but both durable stores survived. + assert.deepStrictEqual(await root.getBothState(), { + root: { inMemory: 0, stored: 2 }, + facet: { inMemory: 0, stored: 2 }, + }); + }, +}; + +export const test_evict_hibernates_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('room')); + const res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + const ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + // Fresh instance: wokeCount 0 -> 1, stored 0 -> 1. + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + // Evict: hibernates the socket (connection stays open) and tears down the instance. + await unsafe.evict(stub); + + // The same socket still delivers (proving it hibernated rather than closing), and the event + // is handled by a rebuilt instance: wokeCount resets to 1, but storage survived (stored -> 2). + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + // Tell the DO to close the socket so the test tears down cleanly. + const closed = new Promise((resolve) => + ws.addEventListener('close', () => resolve(), { once: true }) + ); + ws.send('bye'); + await closed; + }, +}; + +export const test_evict_closes_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('close-room')); + let res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + let ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + const closed = nextClose(ws); + await unsafe.evict(stub, { webSockets: 'close' }); + await closed; + + // Storage survived, but the closed socket cannot deliver the wakeup event. Open a new socket to + // verify the instance was rebuilt and storage continues from the previous value. + assert.strictEqual(await stub.getStored(), 1); + res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + const cleanupClosed = nextClose(ws); + ws.send('bye'); + await cleanupClosed; + }, +}; + +export const test_evict_all_durable_objects_closes_websockets = { + async test(ctrl, env, ctx) { + const stub = env.HIBERNATE.get(env.HIBERNATE.idFromName('close-all-room')); + let res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + let ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + let msg = nextMessage(ws); + ws.send('a'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 1 }); + + const closed = nextClose(ws); + await unsafe.evictAllDurableObjects({ webSockets: 'close' }); + await closed; + + assert.strictEqual(await stub.getStored(), 1); + res = await stub.fetch('http://x', { + headers: { Upgrade: 'websocket' }, + }); + ws = res.webSocket; + assert(ws, 'expected a WebSocket in the response'); + ws.accept(); + + msg = nextMessage(ws); + ws.send('b'); + assert.deepStrictEqual(JSON.parse(await msg), { wokeCount: 1, stored: 2 }); + + const cleanupClosed = nextClose(ws); + ws.send('bye'); + await cleanupClosed; + }, +}; diff --git a/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test b/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test index daedd2195f5..96f4a7b028f 100644 --- a/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test +++ b/src/workerd/server/tests/unsafe-module/unsafe-module-test.wd-test @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = ( modules = [ ( name = "worker", esModule = embed "unsafe-module-test.js" ), ], - compatibilityFlags = ["nodejs_compat", "unsafe_module", "rpc"], + compatibilityFlags = ["nodejs_compat", "unsafe_module", "rpc", "enable_ctx_exports"], durableObjectNamespaces = [ ( className = "TestDurableObject", uniqueKey = "durable" ), ( className = "TestDurableObjectPreventEviction", uniqueKey = "durable-prevent-eviction", preventEviction = true ), @@ -16,6 +16,10 @@ const unitTests :Workerd.Config = ( ( className = "TestEphemeralObjectPreventEviction", ephemeralLocal = void, preventEviction = true ), ( className = "AlarmObject", uniqueKey = "alarm" ), ( className = "StorageObject", uniqueKey = "storage" ), + ( className = "CounterObject", uniqueKey = "counter" ), + ( className = "CounterObjectPreventEviction", uniqueKey = "counter-prevent-eviction", preventEviction = true ), + ( className = "FacetRootObject", uniqueKey = "facet-root" ), + ( className = "HibernationObject", uniqueKey = "hibernation" ), ], durableObjectStorage = ( localDisk = "TEST_TMPDIR" ), bindings = [ @@ -25,6 +29,11 @@ const unitTests :Workerd.Config = ( ( name = "EPHEMERAL_PREVENT_EVICTION", durableObjectNamespace = "TestEphemeralObjectPreventEviction" ), ( name = "ALARM", durableObjectNamespace = "AlarmObject" ), ( name = "STORAGE", durableObjectNamespace = "StorageObject" ), + ( name = "COUNTER", durableObjectNamespace = "CounterObject" ), + ( name = "COUNTER_PREVENT_EVICTION", durableObjectNamespace = "CounterObjectPreventEviction" ), + ( name = "FACET_ROOT", durableObjectNamespace = "FacetRootObject" ), + ( name = "HIBERNATE", durableObjectNamespace = "HibernationObject" ), + ( name = "SELF_SERVICE", service = "unsafe-module-test" ), ], ) ),