diff --git a/reactivex/operators/init.lua b/reactivex/operators/init.lua index ae7cbd3..5ab03f2 100644 --- a/reactivex/operators/init.lua +++ b/reactivex/operators/init.lua @@ -50,4 +50,7 @@ require("reactivex.operators.distinctUntilChanged") require("reactivex.operators.all") require("reactivex.operators.scan") require("reactivex.operators.elementAt") -require("reactivex.operators.pluck") \ No newline at end of file +require("reactivex.operators.pluck") +require("reactivex.operators.publish") +require("reactivex.operators.refCount") +require("reactivex.operators.share") diff --git a/reactivex/operators/publish.lua b/reactivex/operators/publish.lua new file mode 100644 index 0000000..c13fa62 --- /dev/null +++ b/reactivex/operators/publish.lua @@ -0,0 +1,42 @@ +local Observable = require 'reactivex.observable' +local Subject = require 'reactivex.subjects.subject' + +-- Turns a cold Observable into a ConnectableObservable that shares a single +-- subscription through an internal Subject. Call :connect() to subscribe the +-- source; downstream subscribers attach to the subject. +function Observable:publish() + local source = self + local subject = Subject.create() + local connection + + local connectable = Observable.create(function(observer) + return subject:subscribe(observer) + end) + + function connectable:connect() + if not connection then + connection = source:subscribe( + function(...) subject:onNext(...) end, + function(err) + subject:onError(err) + connection = nil + end, + function() + subject:onCompleted() + connection = nil + end + ) + end + + return connection + end + + function connectable:unsubscribe() + if connection then + connection:unsubscribe() + connection = nil + end + end + + return connectable +end diff --git a/reactivex/operators/refCount.lua b/reactivex/operators/refCount.lua new file mode 100644 index 0000000..e33522d --- /dev/null +++ b/reactivex/operators/refCount.lua @@ -0,0 +1,35 @@ +local Observable = require 'reactivex.observable' +local util = require 'reactivex.util' + +-- Auto-connects a connectable observable on first subscriber and disconnects +-- when the last subscriber unsubscribes. +function Observable:refCount() + local connectable = self + local refCounter = 0 + local connection + + local function tryConnect() + if not connection then + connection = connectable:connect() + end + end + + local function tryDisconnect() + if connection and refCounter == 0 then + connection:unsubscribe() + connection = nil + end + end + + return Observable.create(function(observer) + refCounter = refCounter + 1 + local subscription = connectable:subscribe(observer) + tryConnect() + + return function() + subscription:unsubscribe() + refCounter = refCounter - 1 + tryDisconnect() + end + end) +end diff --git a/reactivex/operators/share.lua b/reactivex/operators/share.lua new file mode 100644 index 0000000..56c769b --- /dev/null +++ b/reactivex/operators/share.lua @@ -0,0 +1,9 @@ +local Observable = require 'reactivex.observable' +require 'reactivex.operators.publish' +require 'reactivex.operators.refCount' + +-- Convenience alias for publish():refCount(); makes a cold observable hot and +-- shared among subscribers without replaying past values. +function Observable:share() + return self:publish():refCount() +end diff --git a/tests/publish.lua b/tests/publish.lua new file mode 100644 index 0000000..4c803cf --- /dev/null +++ b/tests/publish.lua @@ -0,0 +1,31 @@ +package.path = "./?.lua;./?/init.lua;" .. package.path + +local Observable = require("reactivex.observable") +local Subject = require("reactivex.subjects.subject") + +require('reactivex.operators.publish') + +describe('publish', function() + it('multicasts a single source subscription to many observers', function() + local subscribeCount = 0 + local source = Observable.create(function(observer) + subscribeCount = subscribeCount + 1 + observer:onNext('a') + observer:onCompleted() + end) + + local connectable = source:publish() + + local first, second = {}, {} + connectable:subscribe(function(value) table.insert(first, value) end) + connectable:subscribe(function(value) table.insert(second, value) end) + + local connection = connectable:connect() + + expect(subscribeCount).to.equal(1) + expect(first).to.equal({ 'a' }) + expect(second).to.equal({ 'a' }) + + connection:unsubscribe() + end) +end) diff --git a/tests/refCount.lua b/tests/refCount.lua new file mode 100644 index 0000000..3496356 --- /dev/null +++ b/tests/refCount.lua @@ -0,0 +1,43 @@ +package.path = "./?.lua;./?/init.lua;" .. package.path + +local Observable = require("reactivex.observable") +local Subject = require("reactivex.subjects.subject") +local Subscription = require("reactivex.subscription") + +require('reactivex.operators.publish') +require('reactivex.operators.refCount') + +describe('refCount', function() + it('auto-connects on first subscriber and disconnects on last', function() + local source = Subject.create() + local unsubscribes = 0 + + local observable = Observable.create(function(observer) + local innerSub = source:subscribe(observer) + return function() + unsubscribes = unsubscribes + 1 + innerSub:unsubscribe() + end + end) + + local shared = observable:publish():refCount() + + local first, second = {}, {} + local subA = shared:subscribe(function(value) table.insert(first, value) end) + expect(unsubscribes).to.equal(0) + + local subB = shared:subscribe(function(value) table.insert(second, value) end) + + source:onNext(1) + source:onNext(2) + expect(first).to.equal({1, 2}) + expect(second).to.equal({1, 2}) + expect(unsubscribes).to.equal(0) + + subA:unsubscribe() + expect(unsubscribes).to.equal(0) + + subB:unsubscribe() + expect(unsubscribes).to.equal(1) + end) +end) diff --git a/tests/share.lua b/tests/share.lua new file mode 100644 index 0000000..0bf355f --- /dev/null +++ b/tests/share.lua @@ -0,0 +1,22 @@ +package.path = "./?.lua;./?/init.lua;" .. package.path + +local Observable = require("reactivex.observable") + +require('reactivex.operators.share') + +describe('share', function() + it('is equivalent to publish():refCount()', function() + local source = require("reactivex.subjects.subject").create() + local shared = source:share() + local a, b = {}, {} + + shared:subscribe(function(v) table.insert(a, v) end) + shared:subscribe(function(v) table.insert(b, v) end) + + source:onNext(1) + source:onCompleted() + + expect(a).to.equal({1}) + expect(b).to.equal({1}) + end) +end) diff --git a/tests/testOperators.lua b/tests/testOperators.lua index aa64bf9..c01de05 100644 --- a/tests/testOperators.lua +++ b/tests/testOperators.lua @@ -48,4 +48,7 @@ dofile('tests/unpack.lua') dofile('tests/unwrap.lua') dofile('tests/window.lua') dofile('tests/with.lua') -dofile('tests/zip.lua') \ No newline at end of file +dofile('tests/zip.lua') +dofile('tests/publish.lua') +dofile('tests/refCount.lua') +dofile('tests/share.lua')