From ecc2f7574c452fa887737053135e8c9c5ab34a18 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 15 Jan 2020 12:39:06 +0100 Subject: [PATCH 1/2] fully document Observable.create and always return a Subscription Observable.create's one-argument version was not properly documented, and the return type is not described either. But to document that it returns a Subscription we should first make it do so. --- rx.lua | 13 +++++++++---- src/observable.lua | 13 +++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/rx.lua b/rx.lua index bef15ac..5eba28a 100644 --- a/rx.lua +++ b/rx.lua @@ -112,16 +112,21 @@ function Observable.create(subscribe) return setmetatable(self, Observable) end ---- Shorthand for creating an Observer and passing it to this Observable's subscription function. --- @arg {function} onNext - Called when the Observable produces a value. +--- Attaches an Observer to this Observable and returns a corresponding Subscription. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. -- @arg {function} onError - Called when the Observable terminates due to an error. -- @arg {function} onCompleted - Called when the Observable completes normally. +-- @returns {Subscription} function Observable:subscribe(onNext, onError, onCompleted) + local s if type(onNext) == 'table' then - return self._subscribe(onNext) + s = self._subscribe(onNext) else - return self._subscribe(Observer.create(onNext, onError, onCompleted)) + s = self._subscribe(Observer.create(onNext, onError, onCompleted)) end + return s or Subscription.create(util.noop) end --- Returns an Observable that immediately completes without producing a value. diff --git a/src/observable.lua b/src/observable.lua index ec792c5..347ed49 100644 --- a/src/observable.lua +++ b/src/observable.lua @@ -17,16 +17,21 @@ function Observable.create(subscribe) return setmetatable(self, Observable) end ---- Shorthand for creating an Observer and passing it to this Observable's subscription function. --- @arg {function} onNext - Called when the Observable produces a value. +--- Attaches an Observer to this Observable and returns a corresponding Subscription. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. -- @arg {function} onError - Called when the Observable terminates due to an error. -- @arg {function} onCompleted - Called when the Observable completes normally. +-- @returns {Subscription} function Observable:subscribe(onNext, onError, onCompleted) + local s if type(onNext) == 'table' then - return self._subscribe(onNext) + s = self._subscribe(onNext) else - return self._subscribe(Observer.create(onNext, onError, onCompleted)) + s = self._subscribe(Observer.create(onNext, onError, onCompleted)) end + return s or Subscription.create(util.noop) end --- Returns an Observable that immediately completes without producing a value. From 59eedeb9a0ca657d8337671bc2a5158473712c85 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Wed, 15 Jan 2020 12:42:48 +0100 Subject: [PATCH 2/2] add AnonymousSubject RxLua only has a zero-version create() function for Subject, corresponding to the constructor of RxJS for example. However, RxJS has a two-argument create() function, returning a subject that simply juxtaposes an observable and an observer into a single object. Quite confusingly, the object returned by create() is not a Subject, but an instance of a different class AnonymousSubject. Introduce AnonymousSubject as a completely separate implementation to avoid overloading Subject.create. --- rx.lua | 62 ++++++++++++++++++++++++++++++++++++++ tests/anonymoussubject.lua | 53 ++++++++++++++++++++++++++++++++ tests/runner.lua | 1 + tools/build.lua | 2 ++ 4 files changed, 118 insertions(+) create mode 100644 tests/anonymoussubject.lua diff --git a/rx.lua b/rx.lua index 5eba28a..ecaf2db 100644 --- a/rx.lua +++ b/rx.lua @@ -2081,6 +2081,67 @@ end Subject.__call = Subject.onNext +--- @class AnonymousSubject +-- @description Like Subject, AnonymousSubjects function both as an Observer and as an Observable. +-- The functionality of both sides of an AnonymousSubject is provided by an external observer +-- and observable. Usually, the entity that creates the AnonymousSubject will subscribe to +-- the observable so that pushing a value to the AnonymousSubject results in an effect on the +-- observer. + +local AnonymousSubject = setmetatable({}, Observable) +AnonymousSubject.__index = AnonymousSubject +AnonymousSubject.__tostring = util.constant('AnonymousSubject') + +--- Creates a new AnonymousSubject +-- @arg{Observer} destination - the observer (input) side of the AnonymousSubject +-- @arg{Observable} source - the observable (output) side of the AnonymousSubject +-- @returns {AnonymousSubject} +function AnonymousSubject.create(_destination, _source) + local self = { + destination = _destination, + source = _source + } + + return setmetatable(self, AnonymousSubject) +end + +--- Attaches an Observer to this AnonymousSubject's observable. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. +-- @arg {function} onError - Called when the Observable terminates due to an error. +-- @arg {function} onCompleted - Called when the Observable completes normally. +function AnonymousSubject:subscribe(onNext, onError, onCompleted) + if self.source then + return self.source:subscribe(onNext, onError, onCompleted) + else + return Subscription.create(util.noop) + end +end + +--- Pushes zero or more values to the AnonymousSubject. +-- @arg {*...} values +function AnonymousSubject:onNext(...) + if self.destination then + self.destination:onNext(...) + end +end + +--- Signal to the AnonymousSubject that an error has occurred. +-- @arg {string=} message - A string describing what went wrong. +function AnonymousSubject:onError(message) + if self.destination then + self.destination:onError(message) + end +end + +--- Signal to the AnonymousSubject that its observer will not be fed any more values. +function AnonymousSubject:onCompleted() + if self.destination then + self.destination:onCompleted() + end +end + --- @class AsyncSubject -- @description AsyncSubjects are subjects that produce either no values or a single value. If -- multiple values are produced via onNext, only the last one is used. If onError is called, then @@ -2312,6 +2373,7 @@ return { CooperativeScheduler = CooperativeScheduler, TimeoutScheduler = TimeoutScheduler, Subject = Subject, + AnonymousSubject = AnonymousSubject, AsyncSubject = AsyncSubject, BehaviorSubject = BehaviorSubject, ReplaySubject = ReplaySubject diff --git a/tests/anonymoussubject.lua b/tests/anonymoussubject.lua new file mode 100644 index 0000000..9012905 --- /dev/null +++ b/tests/anonymoussubject.lua @@ -0,0 +1,53 @@ +describe('AnonymousSubject', function() + describe('create', function() + it('returns an AnonymousSubject', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + expect(subject).to.be.an(Rx.AnonymousSubject) + end) + it('returns an Observer', function() + local onNext, onCompleted = spy(), spy() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + subject:onNext(1) + subject:onCompleted() + expect(onNext).to.equal({{1}}) + expect(#onCompleted).to.equal(1) + end) + it('returns an Observable', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + local onNext, onError, onCompleted = observableSpy(subject) + expect(onNext).to.equal({{42}}) + end) + it('supports operators', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + local onNext, onError, onCompleted = observableSpy(subject:map(tostring)) + expect(onNext).to.equal({{'42'}}) + end) + end) + + describe('subscribe', function() + it('returns a Subscription', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + expect(subject:subscribe(Rx.Observer.create())).to.be.an(Rx.Subscription) + end) + + it('accepts 3 functions as arguments', function() + local onNext, onCompleted = spy(), spy() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + subject:subscribe(observer) + expect(onNext).to.equal({{42}}) + expect(#onCompleted).to.equal(1) + end) + end) +end) diff --git a/tests/runner.lua b/tests/runner.lua index 7d05749..721c657 100644 --- a/tests/runner.lua +++ b/tests/runner.lua @@ -90,6 +90,7 @@ else 'observable', 'subscription', 'subject', + 'anonymoussubject', 'asyncsubject', 'behaviorsubject', 'replaysubject' diff --git a/tools/build.lua b/tools/build.lua index 6cac333..c32b63d 100644 --- a/tools/build.lua +++ b/tools/build.lua @@ -64,6 +64,7 @@ local files = { 'src/schedulers/cooperativescheduler.lua', 'src/schedulers/timeoutscheduler.lua', 'src/subjects/subject.lua', + 'src/subjects/anonymoussubject.lua', 'src/subjects/asyncsubject.lua', 'src/subjects/behaviorsubject.lua', 'src/subjects/replaysubject.lua', @@ -96,6 +97,7 @@ local footer = [[return { CooperativeScheduler = CooperativeScheduler, TimeoutScheduler = TimeoutScheduler, Subject = Subject, + AnonymousSubject = AnonymousSubject, AsyncSubject = AsyncSubject, BehaviorSubject = BehaviorSubject, ReplaySubject = ReplaySubject