From ac2836e86186a2dd828af45970db445399f07000 Mon Sep 17 00:00:00 2001 From: Matthew Miller Date: Fri, 13 Feb 2026 08:18:39 -0500 Subject: [PATCH 1/5] Observable grouping functions -- Rx.buffer, Rx.window, Rx.pairwise --- src/rx/src/Shared/Rx.lua | 89 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index 80236f04af7..1ca13bf7327 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2183,4 +2183,93 @@ function Rx.mergeScan(accumulator, seed) }) end +--[=[ + Collects values as an array, emitting the result when another Observable emits. + + https://rxjs.dev/api/index/function/buffer + + @param closingNotifier Observable + @return (source: Observable) -> Observable +]=] + +function Rx.buffer(closingNotifier) + assert(Observable.isObservable(closingNotifier), "Bad observable") + + return function(source) + assert(Observable.isObservable(source), "Bad observable") + + return Observable.new(function(sub) + local maid = Maid.new() + local buffer = {} + + maid:GiveTask(closingNotifier:Subscribe(function() + local latest = table.clone(buffer) + + table.clear(buffer) + + sub:Fire(latest) + end) + + maid:GiveTask(source:Subscribe(function(value) + table.insert(buffer, value) + end, nil, function(value) + sub:Fire(buffer) + sub:Complete() + end)) + + maid:GiveTask(function() + table.clear(buffer) + end) + + return maid + end) + end +end + +--[=[ + Collects values, then, when another Observable fires, emits the collected values as an Observable. + + https://rxjs.dev/api/index/function/window + + @param windowBoundaries Observable + @return (source: Observable) -> Observable +]=] + +function Rx.window(windowBoundaries) + assert(Observable.isObservable(windowBoundaries), "Bad observable") + + return Rx.pipe({ + Rx.buffer(windowBoundaries), + Rx.map(function(data) + return Rx.of(unpack(data)) + end) + }) +end + +--[=[ + Groups pairs of emissions together, emitting them as an array. + + https://rxjs.dev/api/index/function/pairwise + + @return (source: Observable) -> Observable +]=] + +function Rx.pairwise() + return function(source) + assert(Observable.isObservable(source), "Bad observable") + + return Observable.new(function(sub) + local previous = UNSET_VALUE + + return source:Subscribe(function(value) + if previous ~= UNSET_VALUE then + sub:Fire({previous, value}) + end + + previous = value + end, sub:GetFailComplete()) + end) + end +end + return Rx From 357e80fb8add4ec46725584ac3ce545b9c20f6c0 Mon Sep 17 00:00:00 2001 From: Matthew Miller Date: Fri, 13 Feb 2026 10:59:46 -0500 Subject: [PATCH 2/5] Fixed typo --- src/rx/src/Shared/Rx.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index 1ca13bf7327..d4bfb306dec 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2208,7 +2208,7 @@ function Rx.buffer(closingNotifier) table.clear(buffer) sub:Fire(latest) - end) + end)) maid:GiveTask(source:Subscribe(function(value) table.insert(buffer, value) @@ -2219,7 +2219,7 @@ function Rx.buffer(closingNotifier) maid:GiveTask(function() table.clear(buffer) - end) + end)) return maid end) From a1fc46becfcf6474a67eaa1c740408fed0d4e374 Mon Sep 17 00:00:00 2001 From: Matthew Miller Date: Fri, 13 Feb 2026 11:48:10 -0500 Subject: [PATCH 3/5] Typo fix --- src/rx/src/Shared/Rx.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index d4bfb306dec..150b332e843 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2219,7 +2219,7 @@ function Rx.buffer(closingNotifier) maid:GiveTask(function() table.clear(buffer) - end)) + end) return maid end) From c995f4abe6d11fbb7e9dc2206c926bcfa2f0203b Mon Sep 17 00:00:00 2001 From: InvisibleWater Date: Fri, 13 Feb 2026 12:02:45 -0500 Subject: [PATCH 4/5] In hindsight, naming the variable that probably wasn't the best idea --- src/rx/src/Shared/Rx.lua | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index 150b332e843..03e44599685 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2197,28 +2197,32 @@ function Rx.buffer(closingNotifier) return function(source) assert(Observable.isObservable(source), "Bad observable") - + return Observable.new(function(sub) local maid = Maid.new() - local buffer = {} + local latestBuffer = {} maid:GiveTask(closingNotifier:Subscribe(function() - local latest = table.clone(buffer) + local latest = table.clone(latestBuffer) - table.clear(buffer) + table.clear(latestBuffer) sub:Fire(latest) end)) - maid:GiveTask(source:Subscribe(function(value) - table.insert(buffer, value) - end, nil, function(value) - sub:Fire(buffer) - sub:Complete() - end)) + maid:GiveTask(source:Subscribe( + function(value) + table.insert(latestBuffer, value) + end, + nil, + function() + sub:Fire(latestBuffer) + sub:Complete() + end + )) maid:GiveTask(function() - table.clear(buffer) + table.clear(latestBuffer) end) return maid @@ -2242,7 +2246,7 @@ function Rx.window(windowBoundaries) Rx.buffer(windowBoundaries), Rx.map(function(data) return Rx.of(unpack(data)) - end) + end), }) end @@ -2263,7 +2267,7 @@ function Rx.pairwise() return source:Subscribe(function(value) if previous ~= UNSET_VALUE then - sub:Fire({previous, value}) + sub:Fire({ previous, value }) end previous = value From 6697afe13e9ba64a25a976e567ac0bcf5160f1a3 Mon Sep 17 00:00:00 2001 From: InvisibleWater Date: Fri, 13 Feb 2026 12:07:05 -0500 Subject: [PATCH 5/5] Here we go again --- src/rx/src/Shared/Rx.lua | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rx/src/Shared/Rx.lua b/src/rx/src/Shared/Rx.lua index 03e44599685..e98b3deab77 100644 --- a/src/rx/src/Shared/Rx.lua +++ b/src/rx/src/Shared/Rx.lua @@ -2243,10 +2243,10 @@ function Rx.window(windowBoundaries) assert(Observable.isObservable(windowBoundaries), "Bad observable") return Rx.pipe({ - Rx.buffer(windowBoundaries), + Rx.buffer(windowBoundaries) :: any, Rx.map(function(data) - return Rx.of(unpack(data)) - end), + return Rx.of(unpack(data)) :: any + end) :: any, }) end