Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -621,22 +621,23 @@ class WritableStreamController {
public:
// When a Writer is locked to a controller, the controller will attach itself to the writer,
// passing along the closed and ready promises that will be used to communicate state to the
// user code.
// user code. LazyPromise is used to defer the creation of the V8 Promise until the promise
// is actually accessed by JavaScript.
//
// The controller is guaranteed to either outlive the Writer or will detach the Writer so the
// WritableStreamController& reference should always remain valid.
virtual void attach(jsg::Lock& js,
WritableStreamController& controller,
jsg::Promise<void> closedPromise,
jsg::Promise<void> readyPromise) = 0;
jsg::LazyPromise<void> closedPromise,
jsg::LazyPromise<void> readyPromise) = 0;

// When a Writer lock is released, the controller will signal to the writer that is has been
// detached.
virtual void detach() = 0;

// The ready promise can be replaced whenever backpressure is signaled by the underlying
// controller.
virtual void replaceReadyPromise(jsg::Lock& js, jsg::Promise<void> readyPromise) = 0;
virtual void replaceReadyPromise(jsg::Lock& js, jsg::LazyPromise<void> readyPromise) = 0;
};

struct PendingAbort {
Expand Down Expand Up @@ -831,8 +832,8 @@ class WriterLocked {
public:
static constexpr kj::StringPtr NAME KJ_UNUSED = "writer-locked"_kj;
WriterLocked(WritableStreamController::Writer& writer,
jsg::Promise<void>::Resolver closedFulfiller,
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller = kj::none)
jsg::LazyPromise<void>::Resolver closedFulfiller,
kj::Maybe<jsg::LazyPromise<void>::Resolver> readyFulfiller = kj::none)
: writer(writer),
closedFulfiller(kj::mv(closedFulfiller)),
readyFulfiller(kj::mv(readyFulfiller)) {}
Expand All @@ -852,15 +853,15 @@ class WriterLocked {
return KJ_ASSERT_NONNULL(writer);
}

kj::Maybe<jsg::Promise<void>::Resolver>& getClosedFulfiller() {
kj::Maybe<jsg::LazyPromise<void>::Resolver>& getClosedFulfiller() {
return closedFulfiller;
}

kj::Maybe<jsg::Promise<void>::Resolver>& getReadyFulfiller() {
kj::Maybe<jsg::LazyPromise<void>::Resolver>& getReadyFulfiller() {
return readyFulfiller;
}

void setReadyFulfiller(jsg::Lock& js, jsg::PromiseResolverPair<void>& pair) {
void setReadyFulfiller(jsg::Lock& js, jsg::LazyPromiseResolverPair<void>& pair) {
KJ_IF_SOME(w, writer) {
readyFulfiller = kj::mv(pair.resolver);
w.replaceReadyPromise(js, kj::mv(pair.promise));
Expand All @@ -880,8 +881,8 @@ class WriterLocked {

private:
kj::Maybe<WritableStreamController::Writer&> writer;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller;
kj::Maybe<jsg::LazyPromise<void>::Resolver> closedFulfiller;
kj::Maybe<jsg::LazyPromise<void>::Resolver> readyFulfiller;
};

template <typename T>
Expand Down Expand Up @@ -911,6 +912,34 @@ void maybeRejectPromise(jsg::Lock& js,
}
}

// LazyPromise resolver overloads
template <typename T>
void maybeResolvePromise(
jsg::Lock& js, kj::Maybe<typename jsg::LazyPromise<T>::Resolver>& maybeResolver, T&& t) {
KJ_IF_SOME(resolver, maybeResolver) {
resolver.resolve(js, kj::fwd<T>(t));
maybeResolver = kj::none;
}
}

inline void maybeResolvePromise(
jsg::Lock& js, kj::Maybe<typename jsg::LazyPromise<void>::Resolver>& maybeResolver) {
KJ_IF_SOME(resolver, maybeResolver) {
resolver.resolve(js);
maybeResolver = kj::none;
}
}

template <typename T>
void maybeRejectPromise(jsg::Lock& js,
kj::Maybe<typename jsg::LazyPromise<T>::Resolver>& maybeResolver,
v8::Local<v8::Value> reason) {
KJ_IF_SOME(resolver, maybeResolver) {
resolver.reject(js, jsg::Value(js.v8Isolate, reason));
maybeResolver = kj::none;
}
}

template <typename T>
jsg::Promise<T> rejectedMaybeHandledPromise(
jsg::Lock& js, v8::Local<v8::Value> reason, bool handled) {
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool ba
// Per the spec, when backpressure is updated and is true, we replace the existing
// ready promise on the writer with a new pending promise, regardless of whether
// the existing one is resolved or not.
auto prp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> prp;
prp.promise.markAsHandled(js);
writerLock.setReadyFulfiller(js, prp);
return;
Expand Down Expand Up @@ -1362,10 +1362,10 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
return false;
}

auto closedPrp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> closedPrp;
closedPrp.promise.markAsHandled(js);

auto readyPrp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> readyPrp;
readyPrp.promise.markAsHandled(js);

auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,9 @@ bool WritableLockImpl<Controller>::lockWriter(jsg::Lock& js, Controller& self, W
return false;
}

auto closedPrp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> closedPrp;
closedPrp.promise.markAsHandled(js);
auto readyPrp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> readyPrp;
readyPrp.promise.markAsHandled(js);

auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
Expand Down Expand Up @@ -3532,9 +3532,9 @@ void WritableStreamJsController::maybeRejectReadyPromise(
if (writerLock.getReadyFulfiller() != kj::none) {
maybeRejectPromise<void>(js, writerLock.getReadyFulfiller(), reason);
} else {
auto prp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> prp;
prp.promise.markAsHandled(js);
prp.resolver.reject(js, reason);
prp.resolver.reject(js, jsg::Value(js.v8Isolate, reason));
writerLock.setReadyFulfiller(js, prp);
}
}
Expand Down Expand Up @@ -3748,7 +3748,7 @@ void WritableStreamJsController::updateBackpressure(jsg::Lock& js, bool backpres
// Per the spec, when backpressure is updated and is true, we replace the existing
// ready promise on the writer with a new pending promise, regardless of whether
// the existing one is resolved or not.
auto prp = js.newPromiseAndResolver<void>();
jsg::LazyPromiseResolverPair<void> prp;
prp.promise.markAsHandled(js);
return writerLock.setReadyFulfiller(js, prp);
}
Expand Down
24 changes: 13 additions & 11 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ jsg::Promise<void> WritableStreamDefaultWriter::abort(

void WritableStreamDefaultWriter::attach(jsg::Lock& js,
WritableStreamController& controller,
jsg::Promise<void> closedPromise,
jsg::Promise<void> readyPromise) {
jsg::LazyPromise<void> closedPromise,
jsg::LazyPromise<void> readyPromise) {
KJ_ASSERT(state.is<Initial>());
state = controller.addRef();
this->closedPromise = kj::mv(closedPromise);
replaceReadyPromise(js, kj::mv(readyPromise));
this->readyPromise = kj::mv(readyPromise);
}

jsg::Promise<void> WritableStreamDefaultWriter::close(jsg::Lock& js) {
Expand Down Expand Up @@ -108,8 +108,9 @@ void WritableStreamDefaultWriter::detach() {
KJ_UNREACHABLE;
}

jsg::MemoizedIdentity<jsg::Promise<void>>& WritableStreamDefaultWriter::getClosed() {
return KJ_ASSERT_NONNULL(closedPromise, "the writer was never attached to a stream");
jsg::MemoizedIdentity<jsg::Promise<void>>& WritableStreamDefaultWriter::getClosed(jsg::Lock& js) {
return KJ_ASSERT_NONNULL(closedPromise, "the writer was never attached to a stream")
.getPromise(js);
}

kj::Maybe<int> WritableStreamDefaultWriter::getDesiredSize() {
Expand All @@ -130,12 +131,14 @@ kj::Maybe<int> WritableStreamDefaultWriter::getDesiredSize() {
KJ_UNREACHABLE;
}

jsg::MemoizedIdentity<jsg::Promise<void>>& WritableStreamDefaultWriter::getReady() {
return KJ_ASSERT_NONNULL(readyPromise, "the writer was never attached to a stream");
jsg::MemoizedIdentity<jsg::Promise<void>>& WritableStreamDefaultWriter::getReady(jsg::Lock& js) {
return KJ_ASSERT_NONNULL(readyPromise, "the writer was never attached to a stream")
.getPromise(js);
}

kj::Maybe<jsg::Promise<void>> WritableStreamDefaultWriter::isReady(jsg::Lock& js) {
return readyPromisePending.map([&](jsg::Promise<void>& p) { return p.whenResolved(js); });
return readyPromise.map(
[&](jsg::LazyPromise<void>& p) { return p.getPromise(js).inner().whenResolved(js); });
}

void WritableStreamDefaultWriter::lockToStream(jsg::Lock& js, WritableStream& stream) {
Expand Down Expand Up @@ -172,9 +175,8 @@ void WritableStreamDefaultWriter::releaseLock(jsg::Lock& js) {
}

void WritableStreamDefaultWriter::replaceReadyPromise(
jsg::Lock& js, jsg::Promise<void> readyPromise) {
this->readyPromisePending = kj::mv(readyPromise);
this->readyPromise = KJ_ASSERT_NONNULL(this->readyPromisePending).whenResolved(js);
jsg::Lock& js, jsg::LazyPromise<void> readyPromise) {
this->readyPromise = kj::mv(readyPromise);
}

jsg::Promise<void> WritableStreamDefaultWriter::write(
Expand Down
15 changes: 7 additions & 8 deletions src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamCont
static jsg::Ref<WritableStreamDefaultWriter> constructor(
jsg::Lock& js, jsg::Ref<WritableStream> stream);

jsg::MemoizedIdentity<jsg::Promise<void>>& getClosed();
jsg::MemoizedIdentity<jsg::Promise<void>>& getReady();
jsg::MemoizedIdentity<jsg::Promise<void>>& getClosed(jsg::Lock& js);
jsg::MemoizedIdentity<jsg::Promise<void>>& getReady(jsg::Lock& js);
kj::Maybe<int> getDesiredSize();

jsg::Promise<void> abort(jsg::Lock& js, jsg::Optional<v8::Local<v8::Value>> reason);
Expand Down Expand Up @@ -66,14 +66,14 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamCont

void attach(jsg::Lock& js,
WritableStreamController& controller,
jsg::Promise<void> closedPromise,
jsg::Promise<void> readyPromise) override;
jsg::LazyPromise<void> closedPromise,
jsg::LazyPromise<void> readyPromise) override;

void detach() override;

void lockToStream(jsg::Lock& js, WritableStream& stream);

void replaceReadyPromise(jsg::Lock& js, jsg::Promise<void> readyPromise) override;
void replaceReadyPromise(jsg::Lock& js, jsg::LazyPromise<void> readyPromise) override;

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const;

Expand All @@ -94,9 +94,8 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamCont
kj::Maybe<IoContext&> ioContext;
kj::OneOf<Initial, Attached, Released, StreamStates::Closed> state = Initial();

kj::Maybe<jsg::MemoizedIdentity<jsg::Promise<void>>> closedPromise;
kj::Maybe<jsg::MemoizedIdentity<jsg::Promise<void>>> readyPromise;
kj::Maybe<jsg::Promise<void>> readyPromisePending;
kj::Maybe<jsg::LazyPromise<void>> closedPromise;
kj::Maybe<jsg::LazyPromise<void>> readyPromise;

void visitForGc(jsg::GcVisitor& visitor);
};
Expand Down
1 change: 1 addition & 0 deletions src/workerd/jsg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ wd_cc_library(
"//src/workerd/util", # Required for util/batch-queue.h
"//src/workerd/util:autogate",
"//src/workerd/util:sentry",
"//src/workerd/util:state-machine", # Required for LazyPromise
"//src/workerd/util:thread-scopes",
"@capnp-cpp//src/kj",
"@workerd-v8//:v8",
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,15 @@ class MemoizedIdentity {
return *this;
}

// Returns a reference to the inner value. This only works when the value
// hasn't been converted to JavaScript yet (i.e., while still in C++ code).
T& inner() {
return value.template get<T>();
}
const T& inner() const {
return value.template get<T>();
}

void visitForGc(GcVisitor& visitor);

JSG_MEMORY_INFO(MemoizedIdentity) {
Expand Down
Loading
Loading