Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion reactivex/operators/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ require("reactivex.operators.distinctUntilChanged")
require("reactivex.operators.all")
require("reactivex.operators.scan")
require("reactivex.operators.elementAt")
require("reactivex.operators.pluck")
require("reactivex.operators.pluck")
require("reactivex.operators.publish")
require("reactivex.operators.refCount")
require("reactivex.operators.share")
42 changes: 42 additions & 0 deletions reactivex/operators/publish.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
local Observable = require 'reactivex.observable'
local Subject = require 'reactivex.subjects.subject'

-- Turns a cold Observable into a ConnectableObservable that shares a single
-- subscription through an internal Subject. Call :connect() to subscribe the
-- source; downstream subscribers attach to the subject.
function Observable:publish()
local source = self
local subject = Subject.create()
local connection

local connectable = Observable.create(function(observer)
return subject:subscribe(observer)
end)

function connectable:connect()
if not connection then
connection = source:subscribe(
function(...) subject:onNext(...) end,
function(err)
subject:onError(err)
connection = nil
end,
function()
subject:onCompleted()
connection = nil
end
)
end

return connection
end

function connectable:unsubscribe()
if connection then
connection:unsubscribe()
connection = nil
end
end

return connectable
end
35 changes: 35 additions & 0 deletions reactivex/operators/refCount.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
local Observable = require 'reactivex.observable'
local util = require 'reactivex.util'

-- Auto-connects a connectable observable on first subscriber and disconnects
-- when the last subscriber unsubscribes.
function Observable:refCount()
local connectable = self
local refCounter = 0
local connection

local function tryConnect()
if not connection then
connection = connectable:connect()
end
end

local function tryDisconnect()
if connection and refCounter == 0 then
connection:unsubscribe()
connection = nil
end
end

return Observable.create(function(observer)
refCounter = refCounter + 1
local subscription = connectable:subscribe(observer)
tryConnect()

return function()
subscription:unsubscribe()
refCounter = refCounter - 1
tryDisconnect()
end
end)
end
9 changes: 9 additions & 0 deletions reactivex/operators/share.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local Observable = require 'reactivex.observable'
require 'reactivex.operators.publish'
require 'reactivex.operators.refCount'

-- Convenience alias for publish():refCount(); makes a cold observable hot and
-- shared among subscribers without replaying past values.
function Observable:share()
return self:publish():refCount()
end
31 changes: 31 additions & 0 deletions tests/publish.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package.path = "./?.lua;./?/init.lua;" .. package.path

local Observable = require("reactivex.observable")
local Subject = require("reactivex.subjects.subject")

require('reactivex.operators.publish')

describe('publish', function()
it('multicasts a single source subscription to many observers', function()
local subscribeCount = 0
local source = Observable.create(function(observer)
subscribeCount = subscribeCount + 1
observer:onNext('a')
observer:onCompleted()
end)

local connectable = source:publish()

local first, second = {}, {}
connectable:subscribe(function(value) table.insert(first, value) end)
connectable:subscribe(function(value) table.insert(second, value) end)

local connection = connectable:connect()

expect(subscribeCount).to.equal(1)
expect(first).to.equal({ 'a' })
expect(second).to.equal({ 'a' })

connection:unsubscribe()
end)
end)
43 changes: 43 additions & 0 deletions tests/refCount.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package.path = "./?.lua;./?/init.lua;" .. package.path

local Observable = require("reactivex.observable")
local Subject = require("reactivex.subjects.subject")
local Subscription = require("reactivex.subscription")

require('reactivex.operators.publish')
require('reactivex.operators.refCount')

describe('refCount', function()
it('auto-connects on first subscriber and disconnects on last', function()
local source = Subject.create()
local unsubscribes = 0

local observable = Observable.create(function(observer)
local innerSub = source:subscribe(observer)
return function()
unsubscribes = unsubscribes + 1
innerSub:unsubscribe()
end
end)

local shared = observable:publish():refCount()

local first, second = {}, {}
local subA = shared:subscribe(function(value) table.insert(first, value) end)
expect(unsubscribes).to.equal(0)

local subB = shared:subscribe(function(value) table.insert(second, value) end)

source:onNext(1)
source:onNext(2)
expect(first).to.equal({1, 2})
expect(second).to.equal({1, 2})
expect(unsubscribes).to.equal(0)

subA:unsubscribe()
expect(unsubscribes).to.equal(0)

subB:unsubscribe()
expect(unsubscribes).to.equal(1)
end)
end)
22 changes: 22 additions & 0 deletions tests/share.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package.path = "./?.lua;./?/init.lua;" .. package.path

local Observable = require("reactivex.observable")

require('reactivex.operators.share')

describe('share', function()
it('is equivalent to publish():refCount()', function()
local source = require("reactivex.subjects.subject").create()
local shared = source:share()
local a, b = {}, {}

shared:subscribe(function(v) table.insert(a, v) end)
shared:subscribe(function(v) table.insert(b, v) end)

source:onNext(1)
source:onCompleted()

expect(a).to.equal({1})
expect(b).to.equal({1})
end)
end)
5 changes: 4 additions & 1 deletion tests/testOperators.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ dofile('tests/unpack.lua')
dofile('tests/unwrap.lua')
dofile('tests/window.lua')
dofile('tests/with.lua')
dofile('tests/zip.lua')
dofile('tests/zip.lua')
dofile('tests/publish.lua')
dofile('tests/refCount.lua')
dofile('tests/share.lua')