Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bdd/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,12 @@ services:
context: ..
dockerfile: bdd/go/Dockerfile
depends_on:
- iggy-server
iggy-server:
condition: service_healthy
iggy-leader:
condition: service_healthy
iggy-follower:
condition: service_healthy
environment:
- IGGY_ROOT_USERNAME=iggy
- IGGY_ROOT_PASSWORD=iggy
Expand Down
1 change: 1 addition & 0 deletions bdd/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
)

require (
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/cucumber/gherkin/go/v26 v26.2.0 // indirect
github.com/cucumber/messages/go/v21 v21.0.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
6 changes: 4 additions & 2 deletions bdd/go/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cucumber/gherkin/go/v26 v26.2.0 h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI=
github.com/cucumber/gherkin/go/v26 v26.2.0/go.mod h1:t2GAPnB8maCT4lkHL99BDCVNzCh1d7dBhCLt150Nr/0=
Expand Down Expand Up @@ -65,8 +67,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import (
"context"
"errors"
"fmt"
"os"

"github.com/apache/iggy/foreign/go/client"
"github.com/apache/iggy/foreign/go/client/tcp"
iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/apache/iggy/foreign/go/iggycli"
"github.com/apache/iggy/foreign/go/tcp"
"github.com/cucumber/godog"
"github.com/google/uuid"
"os"
"testing"
)

type basicMessagingCtxKey struct{}

type basicMessagingCtx struct {
serverAddr *string
client iggycli.Client
client iggcon.Client
lastSentMessage *iggcon.IggyMessage
lastPollMessages *iggcon.PolledMessage
lastStreamID *uint32
Expand All @@ -48,7 +48,9 @@ func getBasicMessagingCtx(ctx context.Context) *basicMessagingCtx {
return ctx.Value(basicMessagingCtxKey{}).(*basicMessagingCtx)
}

func givenRunningServer(ctx context.Context) error {
type basicMessagingSteps struct{}

func (s basicMessagingSteps) givenRunningServer(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
addr := os.Getenv("IGGY_TCP_ADDRESS")
if addr == "" {
Expand All @@ -57,39 +59,39 @@ func givenRunningServer(ctx context.Context) error {
c.serverAddr = &addr
return nil
}
func givenAuthenticationAsRoot(ctx context.Context) error {

func (s basicMessagingSteps) givenAuthenticationAsRoot(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
serverAddr := *c.serverAddr

client, err := iggycli.NewIggyClient(
iggycli.WithTcp(
cli, err := client.NewIggyClient(
client.WithTcp(
tcp.WithServerAddress(serverAddr),
),
)
if err != nil {
return fmt.Errorf("error creating client: %w", err)
}

if err = client.Ping(); err != nil {
if err = cli.Ping(); err != nil {
return fmt.Errorf("error pinging client: %w", err)
}

if _, err = client.LoginUser("iggy", "iggy"); err != nil {
if _, err = cli.LoginUser("iggy", "iggy"); err != nil {
return fmt.Errorf("error logging in: %v", err)
}

c.client = client
c.client = cli
return nil
}

func whenSendMessages(
func (s basicMessagingSteps) whenSendMessages(
ctx context.Context,
messagesCount uint32,
streamID uint32,
topicID uint32,
partitionID uint32) error {
c := getBasicMessagingCtx(ctx)
messages, err := createTestMessages(messagesCount)
messages, err := s.createTestMessages(messagesCount)
if err != nil {
return fmt.Errorf("error creating test messages: %w", err)
}
Expand All @@ -105,7 +107,7 @@ func whenSendMessages(
return nil
}

func createTestMessages(count uint32) ([]iggcon.IggyMessage, error) {
func (s basicMessagingSteps) createTestMessages(count uint32) ([]iggcon.IggyMessage, error) {
messages := make([]iggcon.IggyMessage, 0, count)
for i := 0; uint32(i) < count; i++ {
id := uuid.New()
Expand All @@ -119,7 +121,7 @@ func createTestMessages(count uint32) ([]iggcon.IggyMessage, error) {
return messages, nil
}

func whenPollMessages(
func (s basicMessagingSteps) whenPollMessages(
ctx context.Context,
streamID uint32,
topicID uint32,
Expand All @@ -146,19 +148,19 @@ func whenPollMessages(
return nil
}

func thenMessageSentSuccessfully(_ context.Context) error {
func (s basicMessagingSteps) thenMessageSentSuccessfully(_ context.Context) error {
return nil
}

func thenShouldReceiveMessages(ctx context.Context, expectedCount uint32) error {
func (s basicMessagingSteps) thenShouldReceiveMessages(ctx context.Context, expectedCount uint32) error {
polledMessages := getBasicMessagingCtx(ctx).lastPollMessages
if uint32(len(polledMessages.Messages)) != expectedCount {
return fmt.Errorf("expected %d messages, but there is %d", expectedCount, len(polledMessages.Messages))
}
return nil
}

func thenMessagesHaveSequentialOffsets(
func (s basicMessagingSteps) thenMessagesHaveSequentialOffsets(
ctx context.Context,
startOffset uint64,
endOffset uint64) error {
Expand All @@ -176,7 +178,7 @@ func thenMessagesHaveSequentialOffsets(
return nil
}

func thenMessagesHaveExpectedPayload(ctx context.Context) error {
func (s basicMessagingSteps) thenMessagesHaveExpectedPayload(ctx context.Context) error {
polledMessages := getBasicMessagingCtx(ctx).lastPollMessages
for i, m := range polledMessages.Messages {
expectedPayload := fmt.Sprintf("test message %d", i)
Expand All @@ -187,7 +189,7 @@ func thenMessagesHaveExpectedPayload(ctx context.Context) error {
return nil
}

func thenLastPolledMessageMatchesSent(ctx context.Context) error {
func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
polledMessages := c.lastPollMessages
sentMessage := c.lastSentMessage
Expand All @@ -207,7 +209,7 @@ func thenLastPolledMessageMatchesSent(ctx context.Context) error {
return nil
}

func givenNoStreams(ctx context.Context) error {
func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {
client := getBasicMessagingCtx(ctx).client
streams, err := client.GetStreams()
if err != nil {
Expand All @@ -221,7 +223,7 @@ func givenNoStreams(ctx context.Context) error {
return err
}

func whenCreateStream(ctx context.Context, streamName string) error {
func (s basicMessagingSteps) whenCreateStream(ctx context.Context, streamName string) error {
c := getBasicMessagingCtx(ctx)
stream, err := c.client.CreateStream(streamName)
if err != nil {
Expand All @@ -232,16 +234,14 @@ func whenCreateStream(ctx context.Context, streamName string) error {
return nil
}

func thenStreamCreatedSuccessfully(ctx context.Context) error {
func (s basicMessagingSteps) thenStreamCreatedSuccessfully(ctx context.Context) error {
if getBasicMessagingCtx(ctx).lastStreamID == nil {
return errors.New("stream should have been created")
}
return nil
}

func thenStreamHasName(
ctx context.Context,
expectedName string) error {
func (s basicMessagingSteps) thenStreamHasName(ctx context.Context, expectedName string) error {
c := getBasicMessagingCtx(ctx)
streamName := *c.lastStreamName
if streamName != expectedName {
Expand All @@ -250,8 +250,7 @@ func thenStreamHasName(
return nil
}

func whenCreateTopic(
ctx context.Context,
func (s basicMessagingSteps) whenCreateTopic(ctx context.Context,
topicName string,
streamID uint32,
partitionsCount uint32) error {
Expand All @@ -277,15 +276,14 @@ func whenCreateTopic(
return nil
}

func thenTopicCreatedSuccessfully(ctx context.Context) error {
func (s basicMessagingSteps) thenTopicCreatedSuccessfully(ctx context.Context) error {
if getBasicMessagingCtx(ctx).lastTopicID == nil {
return errors.New("topic should have been created")
}
return nil
}
func thenTopicHasName(
ctx context.Context,
expectedName string) error {

func (s basicMessagingSteps) thenTopicHasName(ctx context.Context, expectedName string) error {
c := getBasicMessagingCtx(ctx)
topicName := *c.lastTopicName
if topicName != expectedName {
Expand All @@ -294,47 +292,41 @@ func thenTopicHasName(
return nil
}

func thenTopicsHasPartitions(ctx context.Context, expectedTopicPartitions uint32) error {
func (s basicMessagingSteps) thenTopicsHasPartitions(ctx context.Context, expectedTopicPartitions uint32) error {
topicPartitions := *getBasicMessagingCtx(ctx).lastTopicPartitions
if topicPartitions != expectedTopicPartitions {
return errors.New("topic should have expected number of partitions")
}
return nil
}

func initScenarios(sc *godog.ScenarioContext) {
func initBasicMessagingScenario(sc *godog.ScenarioContext) {
sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) {
return context.WithValue(context.Background(), basicMessagingCtxKey{}, &basicMessagingCtx{}), nil
})
sc.Step(`I have a running Iggy server`, givenRunningServer)
sc.Step(`I am authenticated as the root user`, givenAuthenticationAsRoot)
sc.Step(`^I send (\d+) messages to stream (\d+), topic (\d+), partition (\d+)$`, whenSendMessages)
sc.Step(`^I poll messages from stream (\d+), topic (\d+), partition (\d+) starting from offset (\d+)$`, whenPollMessages)
sc.Step(`all messages should be sent successfully`, thenMessageSentSuccessfully)
sc.Step(`^I should receive (\d+) messages$`, thenShouldReceiveMessages)
sc.Step(`^the messages should have sequential offsets from (\d+) to (\d+)$`, thenMessagesHaveSequentialOffsets)
sc.Step(`each message should have the expected payload content`, thenMessagesHaveExpectedPayload)
sc.Step(`the last polled message should match the last sent message`, thenLastPolledMessageMatchesSent)
sc.Step(`^the stream should have name "([^"]*)"$`, thenStreamHasName)
sc.Step(`the stream should be created successfully`, thenStreamCreatedSuccessfully)
sc.Step(`^I create a stream with name "([^"]*)"$`, whenCreateStream)
sc.Step(`I have no streams in the system`, givenNoStreams)
sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+) partitions$`, whenCreateTopic)
sc.Step(`the topic should be created successfully`, thenTopicCreatedSuccessfully)
sc.Step(`^the topic should have name "([^"]*)"$`, thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`, thenTopicsHasPartitions)
}

func TestFeatures(t *testing.T) {
suite := godog.TestSuite{
ScenarioInitializer: initScenarios,
Options: &godog.Options{
Format: "pretty",
Paths: []string{"../../scenarios/basic_messaging.feature"},
TestingT: t,
},
}
if suite.Run() != 0 {
t.Fatal("failing feature tests")
}
s := &basicMessagingSteps{}
sc.Step(`I have a running Iggy server`, s.givenRunningServer)
sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot)
sc.Step(`^I send (\d+) messages to stream (\d+), topic (\d+), partition (\d+)$`, s.whenSendMessages)
sc.Step(`^I poll messages from stream (\d+), topic (\d+), partition (\d+) starting from offset (\d+)$`, s.whenPollMessages)
sc.Step(`all messages should be sent successfully`, s.thenMessageSentSuccessfully)
sc.Step(`^I should receive (\d+) messages$`, s.thenShouldReceiveMessages)
sc.Step(`^the messages should have sequential offsets from (\d+) to (\d+)$`, s.thenMessagesHaveSequentialOffsets)
sc.Step(`each message should have the expected payload content`, s.thenMessagesHaveExpectedPayload)
sc.Step(`the last polled message should match the last sent message`, s.thenLastPolledMessageMatchesSent)
sc.Step(`^the stream should have name "([^"]*)"$`, s.thenStreamHasName)
sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully)
sc.Step(`^I create a stream with name "([^"]*)"$`, s.whenCreateStream)
sc.Step(`I have no streams in the system`, s.givenNoStreams)
sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+) partitions$`, s.whenCreateTopic)
sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions)
sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) {
c := getBasicMessagingCtx(ctx)
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
}
return ctx, scErr
})
}
Loading