Skip to content

Commit e72bebd

Browse files
committed
fix: Replicas can now be set for streams
1 parent 211e5ab commit e72bebd

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

internal/events/streams.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,11 @@ func (m *Manager) EnsureStream(ctx context.Context, config *StreamConfig) (*Stre
166166
streamConfig.Sources = sources
167167
}
168168

169-
if config.Replicas != nil && *config.Replicas == 0 {
169+
if config.Replicas != nil {
170+
if *config.Replicas <= 0 {
171+
return nil, errors.New("replicas must be greater than 0")
172+
}
173+
170174
streamConfig.Replicas = int(*config.Replicas)
171175
} else {
172176
// TODO: We may want to have a config value in the Windshift server that sets the default value

internal/events/streams_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,5 +1799,47 @@ var _ = Describe("Streams", func() {
17991799
})
18001800
Expect(err).To(HaveOccurred())
18011801
})
1802+
1803+
It("replicas defaults to 1", func(ctx context.Context) {
1804+
_, err := js.Stream(ctx, "test")
1805+
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))
1806+
1807+
_, err = manager.EnsureStream(ctx, &events.StreamConfig{
1808+
Name: "test",
1809+
})
1810+
Expect(err).ToNot(HaveOccurred())
1811+
1812+
stream, err := js.Stream(ctx, "test")
1813+
Expect(err).ToNot(HaveOccurred())
1814+
Expect(stream.CachedInfo().Config.Replicas).To(Equal(1))
1815+
})
1816+
1817+
It("specifying zero replicas errors", func(ctx context.Context) {
1818+
_, err := js.Stream(ctx, "test")
1819+
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))
1820+
1821+
r := uint(0)
1822+
_, err = manager.EnsureStream(ctx, &events.StreamConfig{
1823+
Name: "test",
1824+
Replicas: &r,
1825+
})
1826+
Expect(err).To(HaveOccurred())
1827+
})
1828+
1829+
It("can create stream with 1 replica", func(ctx context.Context) {
1830+
_, err := js.Stream(ctx, "test")
1831+
Expect(err).To(MatchError(jetstream.ErrStreamNotFound))
1832+
1833+
r := uint(1)
1834+
_, err = manager.EnsureStream(ctx, &events.StreamConfig{
1835+
Name: "test",
1836+
Replicas: &r,
1837+
})
1838+
Expect(err).ToNot(HaveOccurred())
1839+
1840+
stream, err := js.Stream(ctx, "test")
1841+
Expect(err).ToNot(HaveOccurred())
1842+
Expect(stream.CachedInfo().Config.Replicas).To(Equal(1))
1843+
})
18021844
})
18031845
})

0 commit comments

Comments
 (0)