diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index 5d2e4cc8965..10f50ff6f4e 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -278,6 +278,9 @@ jsg::Ref connectImpl(jsg::Lock& js, } jsg::Promise Socket::close(jsg::Lock& js) { + IoContext::requireCurrentOrThrowJs(*ioContext); + auto& ctx = KJ_ASSERT_NONNULL(ioContext->tryGet()); + if (isClosing) { return closedPromiseCopy.whenResolved(js); } @@ -288,16 +291,14 @@ jsg::Promise Socket::close(jsg::Lock& js) { // Wait until the socket connects (successfully or otherwise) return openedPromiseCopy.whenResolved(js) - .then(js, - [this](jsg::Lock& js) { + .then(js, ctx.addFunctor([this](jsg::Lock& js) { if (!writable->getController().isClosedOrClosing()) { return writable->getController().flush(js); } else { return js.resolvedPromise(); } - }) - .then(js, - [this](jsg::Lock& js) { + })) + .then(js, ctx.addFunctor([this](jsg::Lock& js) { // Forcibly abort the readable/writable streams. auto cancelPromise = readable->getController().cancel(js, kj::none); auto abortPromise = writable->getController().abort(js, kj::none); @@ -306,8 +307,8 @@ jsg::Promise Socket::close(jsg::Lock& js) { return cancelPromise.then(js, [abortPromise = kj::mv(abortPromise)](jsg::Lock& js) mutable { return kj::mv(abortPromise); }); - }) - .then(js, [this](jsg::Lock& js) { + })) + .then(js, ctx.addFunctor([this](jsg::Lock& js) { // This task needs to destroyed prior to destroying the AsyncIoStream as it is awaiting // that stream's `whenWriteDisconnected` promise. watchForDisconnectTask = nullptr; @@ -320,7 +321,9 @@ jsg::Promise Socket::close(jsg::Lock& js) { resolveFulfiller(js, kj::none); return js.resolvedPromise(); - }).catch_(js, [this](jsg::Lock& js, jsg::Value err) { errorHandler(js, kj::mv(err)); }); + })).catch_(js, ctx.addFunctor([this](jsg::Lock& js, jsg::Value err) { + errorHandler(js, kj::mv(err)); + })); } jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOptions) { diff --git a/src/workerd/api/sockets.h b/src/workerd/api/sockets.h index 32de414ffa1..8f7a79a7a1f 100644 --- a/src/workerd/api/sockets.h +++ b/src/workerd/api/sockets.h @@ -71,7 +71,8 @@ class Socket: public jsg::Object { kj::String domain, bool isDefaultFetchPort, jsg::PromiseResolverPair openedPrPair) - : connectionStream(context.addObject(kj::mv(connectionStream))), + : ioContext(context.getWeakRef()), + connectionStream(context.addObject(kj::mv(connectionStream))), readable(kj::mv(readableParam)), writable(kj::mv(writable)), closedResolver(kj::mv(closedPrPair.resolver)), @@ -183,6 +184,7 @@ class Socket: public jsg::Object { // TODO(cleanup): Combine all the IoOwns here into one, to improve efficiency and make // shutdown order clearer. + kj::Own ioContext; kj::Maybe>>> connectionStream; jsg::Ref readable; jsg::Ref writable;