Observables continue being evaluated after stream completes #22

@w0utert

When a stream (a chain of observables) completes, its subscribers stop receiving events, as expected. However, the observables higher up the chain are still evaluated for every value pushed afterwards, exactly as they would be if the stream were still active.

See the following Lua snippet:

rx = require 'rx'

subject_1 = rx.Subject.create()

positive_cubes = subject_1:
  filter(function(v)
    print "--> PC 1"
    return v > 0
  end):
  map(function(v)
    print "--> PC 2"
    return v*v*v
  end):
  take(2)

positive_even_cubes_times_two = positive_cubes:
  filter(function(v)
    print "--> PEC 1"
    return (v % 2) == 0
  end):
  map(function(v)
    print "--> PEC 2"
    return v*2
  end):
  first()

negative_cubes = subject_1:
  filter(function(v)
    print "--> NC 1"
    return v < 0
  end):
  map(function(v)
    print "--> NC 2"
    return v*v*v
  end)

pc_sub = positive_cubes:dump("positiveCubes")
pec_sub = positive_even_cubes_times_two:dump("positiveEvenCubesTimesTwo")
nc_sub = negative_cubes:dump("negativeCubes")

subject_1:onNext(-1)
subject_1:onNext(4)
subject_1:onNext(3)
subject_1:onNext(5)
subject_1:onNext(2)
subject_1:onNext(4)
subject_1:onNext(-4)
subject_1:onNext(4)
subject_1:onNext(-4)

This creates three streams that share the same subject. The positiveCubes stream completes after two positive values have been pushed (the cube of a positive value is itself positive). The positiveEvenCubesTimesTwo stream completes after the first value from positiveCubes that is even. The negativeCubes stream never completes.
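As a cross-check of what each stream should emit, here is a minimal sketch that models the three chains with plain JavaScript arrays. It uses no Rx library at all: `slice` stands in for `take(2)` and `first()`, and the `inputs` array simply lists the values pushed into the subject above. It only models the emitted values, not the side effects or the completion behaviour.

```javascript
// Plain-array model of the three streams; no Rx library is involved.
// `inputs` lists the values pushed into subject_1, in order.
const inputs = [-1, 4, 3, 5, 2, 4, -4, 4, -4];

const cube = (v) => v * v * v;

// positiveCubes: filter(v > 0), map(cube), take(2)
const positiveCubes = inputs.filter((v) => v > 0).map(cube).slice(0, 2);

// positiveEvenCubesTimesTwo: built on positiveCubes,
// then filter(even), map(v * 2), first()
const positiveEvenCubesTimesTwo = positiveCubes
  .filter((v) => v % 2 === 0)
  .map((v) => v * 2)
  .slice(0, 1);

// negativeCubes: filter(v < 0), map(cube); no take/first, so it never completes
const negativeCubes = inputs.filter((v) => v < 0).map(cube);

console.log(positiveCubes);             // [ 64, 27 ]
console.log(positiveEvenCubesTimesTwo); // [ 128 ]
console.log(negativeCubes);             // [ -1, -64, -64 ]
```

These values match the onNext lines in both the Lua and the RxJS output; the two libraries differ only in which side effects run after completion.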

Each observable produces a side effect (the print statement). Executing the script produces the following output:

lua rx_test.lua 
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -1
--> PC 1
--> PC 2
positiveCubes onNext: 64
--> PC 1
--> PC 2
--> PEC 1
--> PEC 2
positiveEvenCubesTimesTwo onNext: 128
positiveEvenCubesTimesTwo onCompleted
--> NC 1
--> PC 1
--> PC 2
positiveCubes onNext: 27
positiveCubes onCompleted
--> PC 1
--> PC 2
--> PEC 1
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64
--> PC 1
--> PC 2
--> PC 1
--> PC 2
--> NC 1
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes onNext: -64

As the output shows, the observables in the positiveCubes stream are still called after the stream completes. Interestingly, the observables in the positiveEvenCubesTimesTwo stream are also called after that stream completes, but only once; after that they are no longer evaluated (?).

Recreating the same example using RxJS:

rx = require('rxjs/Rx');

subject_1 = new rx.Subject()

positive_cubes = subject_1.
  filter(function(v) {
    console.log('--> PC 1');
    return (v > 0);
  }).
  map(function(v) {
    console.log('--> PC 2');
    return v*v*v
  }).
  take(2);

positive_even_cubes_times_two = positive_cubes.
  filter(function(v) {
    console.log('--> PEC 1');
    return ((v % 2) == 0);
  }).
  map(function(v) {
    console.log('--> PEC 2');
    return (v*2)
  }).
  first();

negative_cubes = subject_1.
  filter(function(v) {
    console.log('--> NC 1');
    return (v < 0);
  }).
  map(function(v) {
    console.log('--> NC 2');
    return (v*v*v)
  });

positive_cubes.
  subscribe(function(x) {
    console.log('positiveCubes: onNext: ' + x)
  },
  null,
  function() {
    console.log('postitiveCubes: onCompleted');
  });

positive_even_cubes_times_two.
  subscribe(function(x) {
    console.log('positiveEvenCubesTimesTwo: onNext: ' + x)
  },
  null,
  function() {
    console.log('postitiveEvenCubesTimesTwo: onCompleted');
  });

negative_cubes.
  subscribe(function(x) {
    console.log('negativeCubes: onNext: ' + x)
  },
  null,
  function() {
    console.log('negativeCubes: onCompleted');
  });

subject_1.next(-1)
subject_1.next(4)
subject_1.next(3)
subject_1.next(5)
subject_1.next(2)
subject_1.next(4)
subject_1.next(-4)
subject_1.next(4)
subject_1.next(-4)

The output looks as expected:

node rx_test.js 
--> PC 1
--> PC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -1
--> PC 1
--> PC 2
positiveCubes: onNext: 64
--> PC 1
--> PC 2
--> PEC 1
--> PEC 2
positiveEvenCubesTimesTwo: onNext: 128
postitiveEvenCubesTimesTwo: onCompleted
--> NC 1
--> PC 1
--> PC 2
positiveCubes: onNext: 27
postitiveCubes: onCompleted
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -64
--> NC 1
--> NC 1
--> NC 2
negativeCubes: onNext: -64

Observables that are not disposed after their stream completes become a problem when the number of streams is large, or when temporary/transient streams are created. Over time, the number of observables that are evaluated unnecessarily because they belong to a completed stream adds up.
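To illustrate the behaviour I would expect, here is a minimal sketch of an operator that detaches from its upstream once it completes. Everything in it is hypothetical: `createSubject`, `filter` and `take` are tiny stand-ins written for this example, not the RxLua or RxJS implementations, and the sketch assumes that `subscribe` returns an unsubscribe function.

```javascript
// Hypothetical minimal observable shape: subscribe(observer) returns an
// unsubscribe function. This is NOT how RxLua or RxJS are implemented.
function createSubject() {
  const observers = new Set();
  return {
    // Iterate over a copy so observers can unsubscribe during delivery.
    next(v) { for (const o of [...observers]) o.next(v); },
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
  };
}

function filter(source, predicate) {
  return {
    subscribe(observer) {
      return source.subscribe({
        next(v) { if (predicate(v)) observer.next(v); },
      });
    },
  };
}

function take(source, count) {
  return {
    subscribe(observer) {
      let seen = 0;
      let done = false;
      let unsubscribe = null;
      unsubscribe = source.subscribe({
        next(v) {
          if (done) return;
          seen += 1;
          observer.next(v);
          if (seen >= count) {
            done = true;
            observer.complete();
            // Detach from upstream so the predicate is no longer evaluated.
            if (unsubscribe) unsubscribe();
          }
        },
      });
      return unsubscribe;
    },
  };
}

let predicateCalls = 0;
const subject = createSubject();
const firstTwoPositive = take(
  filter(subject, (v) => { predicateCalls += 1; return v > 0; }),
  2
);

const received = [];
let completed = false;
firstTwoPositive.subscribe({
  next: (v) => received.push(v),
  complete: () => { completed = true; },
});

[-1, 4, 3, 5, 2].forEach((v) => subject.next(v));

console.log(received);       // [ 4, 3 ]
console.log(predicateCalls); // 3
```

The predicate runs for -1, 4 and 3 only. Once `take` has completed, the values 5 and 2 never reach the filter, which is the behaviour the RxJS output above shows and the Lua output does not.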
