From 600719fd4794fae392640eff450e6a6f0c2195f1 Mon Sep 17 00:00:00 2001 From: Qichao Chu Date: Mon, 30 Mar 2026 21:54:25 -0700 Subject: [PATCH 1/5] feat(bdd): add shared stream operations scenario for Go SDK (#1986) Add a shared Gherkin feature file for stream CRUD operations and corresponding godog step definitions, beginning the migration of Go BDD tests from embedded Ginkgo scenarios to shared .feature files. Co-Authored-By: Claude Opus 4.6 --- bdd/docker-compose.yml | 1 + bdd/go/tests/stream_operations_test.go | 172 ++++++++++++++++++++++++ bdd/scenarios/stream_operations.feature | 31 +++++ 3 files changed, 204 insertions(+) create mode 100644 bdd/go/tests/stream_operations_test.go create mode 100644 bdd/scenarios/stream_operations.feature diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml index ddb28b9d94..a19dc6dba4 100644 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@ -239,6 +239,7 @@ services: - GO_TEST_EXTRA_FLAGS=${GO_TEST_EXTRA_FLAGS:-} volumes: - ./scenarios/basic_messaging.feature:/app/features/basic_messaging.feature + - ./scenarios/stream_operations.feature:/app/features/stream_operations.feature command: [ "sh", "-c", "go test -v ${GO_TEST_EXTRA_FLAGS} ./..." ] networks: - iggy-bdd-network diff --git a/bdd/go/tests/stream_operations_test.go b/bdd/go/tests/stream_operations_test.go new file mode 100644 index 0000000000..ce4ab8e4c4 --- /dev/null +++ b/bdd/go/tests/stream_operations_test.go @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tests + +import ( + "context" + "fmt" + "math/rand" + "os" + "testing" + + iggcon "github.com/apache/iggy/foreign/go/contracts" + "github.com/apache/iggy/foreign/go/iggycli" + "github.com/apache/iggy/foreign/go/tcp" + "github.com/cucumber/godog" +) + +type streamOpsCtxKey struct{} + +type streamOpsCtx struct { + serverAddr *string + client iggycli.Client + streamID *uint32 + streamName *string +} + +func getStreamOpsCtx(ctx context.Context) *streamOpsCtx { + return ctx.Value(streamOpsCtxKey{}).(*streamOpsCtx) +} + +func streamGivenRunningServer(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + addr := os.Getenv("IGGY_TCP_ADDRESS") + if addr == "" { + addr = "127.0.0.1:8090" + } + c.serverAddr = &addr + return nil +} + +func streamGivenAuthenticationAsRoot(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + serverAddr := *c.serverAddr + + client, err := iggycli.NewIggyClient( + iggycli.WithTcp( + tcp.WithServerAddress(serverAddr), + ), + ) + if err != nil { + return fmt.Errorf("error creating client: %w", err) + } + + if err = client.Ping(); err != nil { + return fmt.Errorf("error pinging client: %w", err) + } + + if _, err = client.LoginUser("iggy", "iggy"); err != nil { + return fmt.Errorf("error logging in: %v", err) + } + + c.client = client + return nil +} + +func whenCreateStreamWithUniqueName(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + + const charset = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, 32) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + name := string(b) + + stream, err := c.client.CreateStream(name) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) + } + + c.streamID = &stream.Id + c.streamName = &stream.Name + return nil +} + +func thenStreamCreatedSuccessfullyOps(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + if c.streamID == nil { + return fmt.Errorf("stream should have been created") + } + return nil +} + +func thenStreamRetrievableByID(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) + stream, err := c.client.GetStream(streamIdentifier) + if err != nil { + return fmt.Errorf("failed to get stream by ID: %w", err) + } + if stream == nil { + return fmt.Errorf("stream should not be nil") + } + return nil +} + +func thenStreamNameMatchesCreated(ctx context.Context) error { + c := getStreamOpsCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) + stream, err := c.client.GetStream(streamIdentifier) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + if stream.Name != *c.streamName { + return fmt.Errorf("expected stream name %s, got %s", *c.streamName, stream.Name) + } + return nil +} + +func initStreamScenarios(sc *godog.ScenarioContext) { + sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { + return context.WithValue(context.Background(), streamOpsCtxKey{}, &streamOpsCtx{}), nil + }) + + sc.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { + c := getStreamOpsCtx(ctx) + if c.client != nil && c.streamID != nil { + streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) + _ = c.client.DeleteStream(streamIdentifier) + } + return ctx, nil + }) + + // Background steps + sc.Step(`I have a running Iggy server`, streamGivenRunningServer) + sc.Step(`I am authenticated as the root user`, streamGivenAuthenticationAsRoot) + + // Stream operation steps + sc.Step(`I create a stream with a unique name`, whenCreateStreamWithUniqueName) + sc.Step(`the stream should be created successfully`, thenStreamCreatedSuccessfullyOps) + sc.Step(`the stream should be retrievable by its ID`, thenStreamRetrievableByID) + sc.Step(`the stream name should match the created name`, thenStreamNameMatchesCreated) +} + +func TestStreamFeatures(t *testing.T) { + suite := godog.TestSuite{ + ScenarioInitializer: initStreamScenarios, + Options: &godog.Options{ + Format: "pretty", + Paths: []string{"../../scenarios/stream_operations.feature"}, + TestingT: t, + }, + } + if suite.Run() != 0 { + t.Fatal("failing stream feature tests") + } +} diff --git a/bdd/scenarios/stream_operations.feature b/bdd/scenarios/stream_operations.feature new file mode 100644 index 0000000000..3b62c79133 --- /dev/null +++ b/bdd/scenarios/stream_operations.feature @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +Feature: Stream Operations + As a developer using Apache Iggy + I want to manage streams + So that I can organize my messaging data + + Background: + Given I have a running Iggy server + And I am authenticated as the root user + + Scenario: Create stream with valid name + When I create a stream with a unique name + Then the stream should be created successfully + And the stream should be retrievable by its ID + And the stream name should match the created name From a1065dc1c6a05aa3bb185544e3a0495571bab164 Mon Sep 17 00:00:00 2001 From: Qichao Chu Date: Thu, 2 Apr 2026 13:10:00 -0700 Subject: [PATCH 2/5] fix(bdd): address PR review feedback for stream operations tests Move step definitions from stream_operations_test.go to stream_operations.go (non-test file) and register the suite in suite_test.go, matching the pattern used by basic_messaging and leader_redirection. Update imports to use refactored Go SDK paths (client, client/tcp, contracts). Co-Authored-By: Claude Opus 4.6 --- ...perations_test.go => stream_operations.go} | 75 ++++++++----------- bdd/go/tests/suite_test.go | 7 ++ 2 files changed, 40 insertions(+), 42 deletions(-) rename bdd/go/tests/{stream_operations_test.go => stream_operations.go} (64%) diff --git a/bdd/go/tests/stream_operations_test.go b/bdd/go/tests/stream_operations.go similarity index 64% rename from bdd/go/tests/stream_operations_test.go rename to bdd/go/tests/stream_operations.go index ce4ab8e4c4..3c39c56dda 100644 --- a/bdd/go/tests/stream_operations_test.go +++ b/bdd/go/tests/stream_operations.go @@ -19,14 +19,14 @@ package tests import ( "context" + "errors" "fmt" "math/rand" "os" - "testing" + "github.com/apache/iggy/foreign/go/client" + "github.com/apache/iggy/foreign/go/client/tcp" iggcon "github.com/apache/iggy/foreign/go/contracts" - "github.com/apache/iggy/foreign/go/iggycli" - "github.com/apache/iggy/foreign/go/tcp" "github.com/cucumber/godog" ) @@ -34,7 +34,7 @@ type streamOpsCtxKey struct{} type streamOpsCtx struct { serverAddr *string - client iggycli.Client + client iggcon.Client streamID *uint32 streamName *string } @@ -43,7 +43,9 @@ func getStreamOpsCtx(ctx context.Context) *streamOpsCtx { return ctx.Value(streamOpsCtxKey{}).(*streamOpsCtx) } -func streamGivenRunningServer(ctx context.Context) error { +type streamOpsSteps struct{} + +func (s streamOpsSteps) givenRunningServer(ctx context.Context) error { c := getStreamOpsCtx(ctx) addr := os.Getenv("IGGY_TCP_ADDRESS") if addr == "" { @@ -53,12 +55,12 @@ func streamGivenRunningServer(ctx context.Context) error { return nil } -func streamGivenAuthenticationAsRoot(ctx context.Context) error { +func (s streamOpsSteps) givenAuthenticationAsRoot(ctx context.Context) error { c := getStreamOpsCtx(ctx) serverAddr := *c.serverAddr - client, err := iggycli.NewIggyClient( - iggycli.WithTcp( + cli, err := client.NewIggyClient( + client.WithTcp( tcp.WithServerAddress(serverAddr), ), ) @@ -66,19 +68,19 @@ func streamGivenAuthenticationAsRoot(ctx context.Context) error { return fmt.Errorf("error creating client: %w", err) } - if err = client.Ping(); err != nil { + if err = cli.Ping(); err != nil { return fmt.Errorf("error pinging client: %w", err) } - if _, err = client.LoginUser("iggy", "iggy"); err != nil { + if _, err = cli.LoginUser("iggy", "iggy"); err != nil { return fmt.Errorf("error logging in: %v", err) } - c.client = client + c.client = cli return nil } -func whenCreateStreamWithUniqueName(ctx context.Context) error { +func (s streamOpsSteps) whenCreateStreamWithUniqueName(ctx context.Context) error { c := getStreamOpsCtx(ctx) const charset = "abcdefghijklmnopqrstuvwxyz0123456789" @@ -98,7 +100,7 @@ func whenCreateStreamWithUniqueName(ctx context.Context) error { return nil } -func thenStreamCreatedSuccessfullyOps(ctx context.Context) error { +func (s streamOpsSteps) thenStreamCreatedSuccessfully(ctx context.Context) error { c := getStreamOpsCtx(ctx) if c.streamID == nil { return fmt.Errorf("stream should have been created") @@ -106,7 +108,7 @@ func thenStreamCreatedSuccessfullyOps(ctx context.Context) error { return nil } -func thenStreamRetrievableByID(ctx context.Context) error { +func (s streamOpsSteps) thenStreamRetrievableByID(ctx context.Context) error { c := getStreamOpsCtx(ctx) streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) stream, err := c.client.GetStream(streamIdentifier) @@ -119,7 +121,7 @@ func thenStreamRetrievableByID(ctx context.Context) error { return nil } -func thenStreamNameMatchesCreated(ctx context.Context) error { +func (s streamOpsSteps) thenStreamNameMatchesCreated(ctx context.Context) error { c := getStreamOpsCtx(ctx) streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) stream, err := c.client.GetStream(streamIdentifier) @@ -132,41 +134,30 @@ func thenStreamNameMatchesCreated(ctx context.Context) error { return nil } -func initStreamScenarios(sc *godog.ScenarioContext) { +func initStreamOpsScenario(sc *godog.ScenarioContext) { sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { return context.WithValue(context.Background(), streamOpsCtxKey{}, &streamOpsCtx{}), nil }) - sc.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { + s := &streamOpsSteps{} + sc.Step(`I have a running Iggy server`, s.givenRunningServer) + sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot) + sc.Step(`I create a stream with a unique name`, s.whenCreateStreamWithUniqueName) + sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully) + sc.Step(`the stream should be retrievable by its ID`, s.thenStreamRetrievableByID) + sc.Step(`the stream name should match the created name`, s.thenStreamNameMatchesCreated) + + sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) { c := getStreamOpsCtx(ctx) if c.client != nil && c.streamID != nil { streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) _ = c.client.DeleteStream(streamIdentifier) } - return ctx, nil + if c.client != nil { + if err := c.client.Close(); err != nil { + scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) + } + } + return ctx, scErr }) - - // Background steps - sc.Step(`I have a running Iggy server`, streamGivenRunningServer) - sc.Step(`I am authenticated as the root user`, streamGivenAuthenticationAsRoot) - - // Stream operation steps - sc.Step(`I create a stream with a unique name`, whenCreateStreamWithUniqueName) - sc.Step(`the stream should be created successfully`, thenStreamCreatedSuccessfullyOps) - sc.Step(`the stream should be retrievable by its ID`, thenStreamRetrievableByID) - sc.Step(`the stream name should match the created name`, thenStreamNameMatchesCreated) -} - -func TestStreamFeatures(t *testing.T) { - suite := godog.TestSuite{ - ScenarioInitializer: initStreamScenarios, - Options: &godog.Options{ - Format: "pretty", - Paths: []string{"../../scenarios/stream_operations.feature"}, - TestingT: t, - }, - } - if suite.Run() != 0 { - t.Fatal("failing stream feature tests") - } } diff --git a/bdd/go/tests/suite_test.go b/bdd/go/tests/suite_test.go index cf1f478bbd..a228ddd664 100644 --- a/bdd/go/tests/suite_test.go +++ b/bdd/go/tests/suite_test.go @@ -38,6 +38,13 @@ func TestFeatures(t *testing.T) { Paths: []string{"../../scenarios/leader_redirection.feature"}, TestingT: t, }, + }, { + ScenarioInitializer: initStreamOpsScenario, + Options: &godog.Options{ + Format: "pretty", + Paths: []string{"../../scenarios/stream_operations.feature"}, + TestingT: t, + }, }} for _, s := range suites { if s.Run() != 0 { From f19b4360c6a43104baa18916416c19631ce3f429 Mon Sep 17 00:00:00 2001 From: Qichao Chu Date: Thu, 2 Apr 2026 15:35:43 -0700 Subject: [PATCH 3/5] feat(bdd): add stream update/delete to basic_messaging across all SDKs (#1986) Extend the shared basic_messaging.feature with stream update and delete steps, covering full CRUD lifecycle. Add matching step definitions in all 6 SDKs (Go, Python, Rust, Java, Node, C#). Add update_stream and delete_stream methods to the Python SDK (PyO3). Remove the separate stream_operations.feature and its Go-only implementation to avoid duplication. Co-Authored-By: Claude Opus 4.6 --- bdd/docker-compose.yml | 1 - bdd/go/tests/basic_messaging.go | 55 +++++- bdd/go/tests/stream_operations.go | 163 ------------------ bdd/go/tests/suite_test.go | 7 - .../apache/iggy/bdd/BasicMessagingSteps.java | 25 +++ bdd/python/tests/test_basic_messaging.py | 40 +++++ bdd/rust/tests/steps/streams.rs | 44 +++++ bdd/scenarios/basic_messaging.feature | 6 + bdd/scenarios/stream_operations.feature | 31 ---- .../BasicMessagingOperationsSteps.cs | 32 ++++ foreign/node/src/bdd/stream.ts | 34 ++++ foreign/python/apache_iggy.pyi | 16 ++ foreign/python/src/client.rs | 42 +++++ 13 files changed, 292 insertions(+), 204 deletions(-) delete mode 100644 bdd/go/tests/stream_operations.go delete mode 100644 bdd/scenarios/stream_operations.feature diff --git a/bdd/docker-compose.yml b/bdd/docker-compose.yml index a19dc6dba4..ddb28b9d94 100644 --- a/bdd/docker-compose.yml +++ b/bdd/docker-compose.yml @@ -239,7 +239,6 @@ services: - GO_TEST_EXTRA_FLAGS=${GO_TEST_EXTRA_FLAGS:-} volumes: - ./scenarios/basic_messaging.feature:/app/features/basic_messaging.feature - - ./scenarios/stream_operations.feature:/app/features/stream_operations.feature command: [ "sh", "-c", "go test -v ${GO_TEST_EXTRA_FLAGS} ./..." ] networks: - iggy-bdd-network diff --git a/bdd/go/tests/basic_messaging.go b/bdd/go/tests/basic_messaging.go index 971f5abf62..f1fec9fc51 100644 --- a/bdd/go/tests/basic_messaging.go +++ b/bdd/go/tests/basic_messaging.go @@ -209,6 +209,47 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex return nil } +func (s basicMessagingSteps) whenUpdateStreamName(ctx context.Context, newName string) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + if err := c.client.UpdateStream(streamIdentifier, newName); err != nil { + return fmt.Errorf("failed to update stream: %w", err) + } + c.lastStreamName = &newName + return nil +} + +func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expectedName string) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + stream, err := c.client.GetStream(streamIdentifier) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + if stream.Name != expectedName { + return fmt.Errorf("expected stream name %s, got %s", expectedName, stream.Name) + } + return nil +} + +func (s basicMessagingSteps) whenDeleteStream(ctx context.Context) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + if err := c.client.DeleteStream(streamIdentifier); err != nil { + return fmt.Errorf("failed to delete stream: %w", err) + } + c.lastStreamID = nil + return nil +} + +func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error { + c := getBasicMessagingCtx(ctx) + if c.lastStreamID != nil { + return errors.New("stream ID should be nil after deletion") + } + return nil +} + func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error { client := getBasicMessagingCtx(ctx).client streams, err := client.GetStreams() @@ -322,10 +363,20 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) { sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully) sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName) sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions) + sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName) + sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated) + sc.Step(`I delete the stream`, s.whenDeleteStream) + sc.Step(`the stream should be deleted successfully`, s.thenStreamDeletedSuccessfully) sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) { c := getBasicMessagingCtx(ctx) - if err := c.client.Close(); err != nil { - scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) + if c.client != nil && c.lastStreamID != nil { + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + _ = c.client.DeleteStream(streamIdentifier) + } + if c.client != nil { + if err := c.client.Close(); err != nil { + scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) + } } return ctx, scErr }) diff --git a/bdd/go/tests/stream_operations.go b/bdd/go/tests/stream_operations.go deleted file mode 100644 index 3c39c56dda..0000000000 --- a/bdd/go/tests/stream_operations.go +++ /dev/null @@ -1,163 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package tests - -import ( - "context" - "errors" - "fmt" - "math/rand" - "os" - - "github.com/apache/iggy/foreign/go/client" - "github.com/apache/iggy/foreign/go/client/tcp" - iggcon "github.com/apache/iggy/foreign/go/contracts" - "github.com/cucumber/godog" -) - -type streamOpsCtxKey struct{} - -type streamOpsCtx struct { - serverAddr *string - client iggcon.Client - streamID *uint32 - streamName *string -} - -func getStreamOpsCtx(ctx context.Context) *streamOpsCtx { - return ctx.Value(streamOpsCtxKey{}).(*streamOpsCtx) -} - -type streamOpsSteps struct{} - -func (s streamOpsSteps) givenRunningServer(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - addr := os.Getenv("IGGY_TCP_ADDRESS") - if addr == "" { - addr = "127.0.0.1:8090" - } - c.serverAddr = &addr - return nil -} - -func (s streamOpsSteps) givenAuthenticationAsRoot(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - serverAddr := *c.serverAddr - - cli, err := client.NewIggyClient( - client.WithTcp( - tcp.WithServerAddress(serverAddr), - ), - ) - if err != nil { - return fmt.Errorf("error creating client: %w", err) - } - - if err = cli.Ping(); err != nil { - return fmt.Errorf("error pinging client: %w", err) - } - - if _, err = cli.LoginUser("iggy", "iggy"); err != nil { - return fmt.Errorf("error logging in: %v", err) - } - - c.client = cli - return nil -} - -func (s streamOpsSteps) whenCreateStreamWithUniqueName(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - - const charset = "abcdefghijklmnopqrstuvwxyz0123456789" - b := make([]byte, 32) - for i := range b { - b[i] = charset[rand.Intn(len(charset))] - } - name := string(b) - - stream, err := c.client.CreateStream(name) - if err != nil { - return fmt.Errorf("failed to create stream: %w", err) - } - - c.streamID = &stream.Id - c.streamName = &stream.Name - return nil -} - -func (s streamOpsSteps) thenStreamCreatedSuccessfully(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - if c.streamID == nil { - return fmt.Errorf("stream should have been created") - } - return nil -} - -func (s streamOpsSteps) thenStreamRetrievableByID(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) - stream, err := c.client.GetStream(streamIdentifier) - if err != nil { - return fmt.Errorf("failed to get stream by ID: %w", err) - } - if stream == nil { - return fmt.Errorf("stream should not be nil") - } - return nil -} - -func (s streamOpsSteps) thenStreamNameMatchesCreated(ctx context.Context) error { - c := getStreamOpsCtx(ctx) - streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) - stream, err := c.client.GetStream(streamIdentifier) - if err != nil { - return fmt.Errorf("failed to get stream: %w", err) - } - if stream.Name != *c.streamName { - return fmt.Errorf("expected stream name %s, got %s", *c.streamName, stream.Name) - } - return nil -} - -func initStreamOpsScenario(sc *godog.ScenarioContext) { - sc.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { - return context.WithValue(context.Background(), streamOpsCtxKey{}, &streamOpsCtx{}), nil - }) - - s := &streamOpsSteps{} - sc.Step(`I have a running Iggy server`, s.givenRunningServer) - sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot) - sc.Step(`I create a stream with a unique name`, s.whenCreateStreamWithUniqueName) - sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully) - sc.Step(`the stream should be retrievable by its ID`, s.thenStreamRetrievableByID) - sc.Step(`the stream name should match the created name`, s.thenStreamNameMatchesCreated) - - sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) { - c := getStreamOpsCtx(ctx) - if c.client != nil && c.streamID != nil { - streamIdentifier, _ := iggcon.NewIdentifier(*c.streamID) - _ = c.client.DeleteStream(streamIdentifier) - } - if c.client != nil { - if err := c.client.Close(); err != nil { - scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) - } - } - return ctx, scErr - }) -} diff --git a/bdd/go/tests/suite_test.go b/bdd/go/tests/suite_test.go index a228ddd664..cf1f478bbd 100644 --- a/bdd/go/tests/suite_test.go +++ b/bdd/go/tests/suite_test.go @@ -38,13 +38,6 @@ func TestFeatures(t *testing.T) { Paths: []string{"../../scenarios/leader_redirection.feature"}, TestingT: t, }, - }, { - ScenarioInitializer: initStreamOpsScenario, - Options: &godog.Options{ - Format: "pretty", - Paths: []string{"../../scenarios/stream_operations.feature"}, - TestingT: t, - }, }} for _, s := range suites { if s.Run() != 0 { diff --git a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java index 0ad53dad0f..26df74df84 100644 --- a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java +++ b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java @@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class BasicMessagingSteps { @@ -209,6 +210,30 @@ public void lastPolledMessageMatchesSent() { assertEquals(context.lastSentMessage, lastPayload, "Last message should match sent message"); } + @When("I update the stream name to {string}") + public void updateStreamName(String newName) { + getClient().streams().updateStream(context.lastStreamId, newName); + context.lastStreamName = newName; + } + + @Then("the stream name should be updated to {string}") + public void streamNameUpdated(String expectedName) { + Optional stream = getClient().streams().getStream(context.lastStreamId); + assertTrue(stream.isPresent(), "Stream should exist"); + assertEquals(expectedName, stream.get().name(), "Stream name should be updated"); + } + + @When("I delete the stream") + public void deleteStream() { + getClient().streams().deleteStream(context.lastStreamId); + context.lastStreamId = null; + } + + @Then("the stream should be deleted successfully") + public void streamDeletedSuccessfully() { + assertNull(context.lastStreamId, "Stream should have been deleted"); + } + private IggyBaseClient getClient() { if (context.client == null) { throw new IllegalStateException("Iggy client not initialized"); diff --git a/bdd/python/tests/test_basic_messaging.py b/bdd/python/tests/test_basic_messaging.py index 445c02a97b..d5d4ae002e 100644 --- a/bdd/python/tests/test_basic_messaging.py +++ b/bdd/python/tests/test_basic_messaging.py @@ -272,3 +272,43 @@ def verify_last_message_match(context): last_polled_payload = last_polled.payload().decode("utf-8") assert last_polled_payload == context.last_sent_message + + +@when(parsers.parse('I update the stream name to "{new_name}"')) +def update_stream_name(context, new_name): + """Update the stream name""" + + async def _update(): + await context.client.update_stream(context.last_stream_id, new_name) + context.last_stream_name = new_name + + asyncio.run(_update()) + + +@then(parsers.parse('the stream name should be updated to "{expected_name}"')) +def verify_stream_name_updated(context, expected_name): + """Verify stream name was updated""" + + async def _verify(): + stream = await context.client.get_stream(context.last_stream_id) + assert stream is not None + assert stream.name == expected_name + + asyncio.run(_verify()) + + +@when("I delete the stream") +def delete_stream(context): + """Delete the stream""" + + async def _delete(): + await context.client.delete_stream(context.last_stream_id) + context.last_stream_id = None + + asyncio.run(_delete()) + + +@then("the stream should be deleted successfully") +def verify_stream_deleted(context): + """Verify stream was deleted""" + assert context.last_stream_id is None diff --git a/bdd/rust/tests/steps/streams.rs b/bdd/rust/tests/steps/streams.rs index f24a679199..8a4a3635de 100644 --- a/bdd/rust/tests/steps/streams.rs +++ b/bdd/rust/tests/steps/streams.rs @@ -64,3 +64,47 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri "Stream should have expected name" ); } + +#[when(regex = r#"^I update the stream name to "([^"]*)"$"#)] +pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + client + .update_stream(&identifier, &new_name) + .await + .expect("Should be able to update stream"); + world.last_stream_name = Some(new_name); +} + +#[then(regex = r#"^the stream name should be updated to "([^"]*)"$"#)] +pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + let stream = client + .get_stream(&identifier) + .await + .expect("Should be able to get stream"); + assert_eq!(stream.name, expected_name, "Stream name should be updated"); +} + +#[when("I delete the stream")] +pub async fn when_delete_stream(world: &mut GlobalContext) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + client + .delete_stream(&identifier) + .await + .expect("Should be able to delete stream"); + world.last_stream_id = None; +} + +#[then("the stream should be deleted successfully")] +pub async fn then_stream_deleted_successfully(world: &mut GlobalContext) { + assert!( + world.last_stream_id.is_none(), + "Stream should have been deleted" + ); +} diff --git a/bdd/scenarios/basic_messaging.feature b/bdd/scenarios/basic_messaging.feature index e31eea5995..2a86ef0b94 100644 --- a/bdd/scenarios/basic_messaging.feature +++ b/bdd/scenarios/basic_messaging.feature @@ -43,3 +43,9 @@ Feature: Basic Messaging Operations And the messages should have sequential offsets from 0 to 9 And each message should have the expected payload content And the last polled message should match the last sent message + + When I update the stream name to "test-stream-updated" + Then the stream name should be updated to "test-stream-updated" + + When I delete the stream + Then the stream should be deleted successfully diff --git a/bdd/scenarios/stream_operations.feature b/bdd/scenarios/stream_operations.feature deleted file mode 100644 index 3b62c79133..0000000000 --- a/bdd/scenarios/stream_operations.feature +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -Feature: Stream Operations - As a developer using Apache Iggy - I want to manage streams - So that I can organize my messaging data - - Background: - Given I have a running Iggy server - And I am authenticated as the root user - - Scenario: Create stream with valid name - When I create a stream with a unique name - Then the stream should be created successfully - And the stream should be retrievable by its ID - And the stream name should match the created name diff --git a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs index 17742645a0..106e9270e2 100644 --- a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs +++ b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs @@ -196,6 +196,38 @@ public void ThenTheLastPolledMessageShouldMatchTheLastSentMessage() lastPolled.Header.Id.ShouldBe(_context.LastSendMessage.Header.Id); lastPolled.Payload.ShouldBe(_context.LastSendMessage.Payload); } + + [When("I update the stream name to {string}")] + public async Task WhenIUpdateTheStreamNameTo(string newName) + { + _context.CreatedStream.ShouldNotBeNull(); + await _context.IggyClient.UpdateStreamAsync( + Identifier.Numeric(_context.CreatedStream!.Id), newName); + _context.CreatedStream = await _context.IggyClient.GetStreamByIdAsync( + Identifier.Numeric(_context.CreatedStream.Id)); + } + + [Then("the stream name should be updated to {string}")] + public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName) + { + _context.CreatedStream.ShouldNotBeNull(); + _context.CreatedStream!.Name.ShouldBe(expectedName); + } + + [When(@"I delete the stream")] + public async Task WhenIDeleteTheStream() + { + _context.CreatedStream.ShouldNotBeNull(); + await _context.IggyClient.DeleteStreamAsync( + Identifier.Numeric(_context.CreatedStream!.Id)); + _context.CreatedStream = null; + } + + [Then(@"the stream should be deleted successfully")] + public void ThenTheStreamShouldBeDeletedSuccessfully() + { + _context.CreatedStream.ShouldBeNull(); + } } // Test context for sharing data between steps diff --git a/foreign/node/src/bdd/stream.ts b/foreign/node/src/bdd/stream.ts index cdfeddf9c9..7497e1a7b0 100644 --- a/foreign/node/src/bdd/stream.ts +++ b/foreign/node/src/bdd/stream.ts @@ -45,6 +45,40 @@ Then( } ); +When( + 'I update the stream name to {string}', + async function (this: TestWorld, newName: string) { + assert.ok(await this.client.stream.update({ + streamId: this.stream.id, + name: newName + })); + this.stream = { ...this.stream, name: newName }; + } +); + +Then( + 'the stream name should be updated to {string}', + async function (this: TestWorld, expectedName: string) { + const stream = await this.client.stream.get({ streamId: this.stream.id }); + assert.equal(stream.name, expectedName); + } +); + +When( + 'I delete the stream', + async function (this: TestWorld) { + assert.ok(await this.client.stream.delete({ streamId: this.stream.id })); + } +); + +Then( + 'the stream should be deleted successfully', + async function (this: TestWorld) { + // If we reached here without error, the stream was deleted successfully + assert.ok(true); + } +); + // Cleanup: delete stream after test Then( 'I can delete stream with ID {int}', diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 0c0e9a1b3d..9e8b8da37b 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -259,6 +259,22 @@ class IggyClient: Returns Option of stream details or a PyRuntimeError on failure. """ + def update_stream( + self, stream_id: builtins.str | builtins.int, name: builtins.str + ) -> collections.abc.Awaitable[None]: + r""" + Updates a stream's name. + + Returns Ok(()) on successful stream update or a PyRuntimeError on failure. + """ + def delete_stream( + self, stream_id: builtins.str | builtins.int + ) -> collections.abc.Awaitable[None]: + r""" + Deletes a stream by id. + + Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure. + """ def create_topic( self, stream: builtins.str | builtins.int, diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index c849b89109..5b9a872edc 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -171,6 +171,48 @@ impl IggyClient { }) } + /// Updates a stream's name. + /// + /// Returns Ok(()) on successful stream update or a PyRuntimeError on failure. + #[pyo3(signature = (stream_id, name))] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn update_stream<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + name: String, + ) -> PyResult> { + let stream_id = Identifier::from(stream_id); + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .update_stream(&stream_id, &name) + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(()) + }) + } + + /// Deletes a stream by id. + /// + /// Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn delete_stream<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::from(stream_id); + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .delete_stream(&stream_id) + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(()) + }) + } + /// Creates a new topic with the given parameters. /// /// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure. From c6636ebf03440e67f47c7e3999d3cf6272ec09d5 Mon Sep 17 00:00:00 2001 From: Qichao Chu Date: Thu, 2 Apr 2026 18:56:57 -0700 Subject: [PATCH 4/5] fix(bdd): fix Rust identifier import and Node null check in stream steps Use iggy::prelude::Identifier instead of iggy::identifier::Identifier and unwrap Option from get_stream. Add null assertion for stream.get() return value in Node step definition. Co-Authored-By: Claude Opus 4.6 --- bdd/rust/tests/steps/streams.rs | 11 ++++++----- foreign/node/src/bdd/stream.ts | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/bdd/rust/tests/steps/streams.rs b/bdd/rust/tests/steps/streams.rs index 8a4a3635de..5603344306 100644 --- a/bdd/rust/tests/steps/streams.rs +++ b/bdd/rust/tests/steps/streams.rs @@ -18,7 +18,7 @@ use crate::common::global_context::GlobalContext; use cucumber::{given, then, when}; -use iggy::prelude::StreamClient; +use iggy::prelude::{Identifier, StreamClient}; #[given("I have no streams in the system")] pub async fn given_no_streams(world: &mut GlobalContext) { @@ -69,7 +69,7 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) { let client = world.client.as_ref().expect("Client should be available"); let stream_id = world.last_stream_id.expect("Stream should exist"); - let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + let identifier = Identifier::numeric(stream_id).unwrap(); client .update_stream(&identifier, &new_name) .await @@ -81,11 +81,12 @@ pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) { let client = world.client.as_ref().expect("Client should be available"); let stream_id = world.last_stream_id.expect("Stream should exist"); - let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + let identifier = Identifier::numeric(stream_id).unwrap(); let stream = client .get_stream(&identifier) .await - .expect("Should be able to get stream"); + .expect("Should be able to get stream") + .expect("Stream should exist"); assert_eq!(stream.name, expected_name, "Stream name should be updated"); } @@ -93,7 +94,7 @@ pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: pub async fn when_delete_stream(world: &mut GlobalContext) { let client = world.client.as_ref().expect("Client should be available"); let stream_id = world.last_stream_id.expect("Stream should exist"); - let identifier = iggy::identifier::Identifier::numeric(stream_id).unwrap(); + let identifier = Identifier::numeric(stream_id).unwrap(); client .delete_stream(&identifier) .await diff --git a/foreign/node/src/bdd/stream.ts b/foreign/node/src/bdd/stream.ts index 7497e1a7b0..47c96bc576 100644 --- a/foreign/node/src/bdd/stream.ts +++ b/foreign/node/src/bdd/stream.ts @@ -60,7 +60,8 @@ Then( 'the stream name should be updated to {string}', async function (this: TestWorld, expectedName: string) { const stream = await this.client.stream.get({ streamId: this.stream.id }); - assert.equal(stream.name, expectedName); + assert.ok(stream, 'Stream should exist after update'); + assert.equal(stream!.name, expectedName); } ); From 2fe260f838c3b694a4648c4fcc8ae1a08a21975a Mon Sep 17 00:00:00 2001 From: Qichao Chu Date: Tue, 12 May 2026 16:38:31 -0700 Subject: [PATCH 5/5] fix(bdd): address remaining PR #3063 review feedback - Verify stream deletion via GetStream rather than checking the local pointer, which whenDeleteStream sets to nil itself (chengxilo). - Anchor all Go sc.Step regexes with ^...$ for consistency (chengxilo). - Document the per-scenario After cleanup as best-effort and note the longer-term direction of a global cleanup script (chengxilo). - Make the stream-delete step explicit by passing the stream name in the feature file instead of relying on implicit prior-step state (hubcio). Update Go, Rust, Python, Java, C#, and Node step definitions to match the new "I delete the stream with name ..." step. --- bdd/go/tests/basic_messaging.go | 43 +++++++++++++------ .../apache/iggy/bdd/BasicMessagingSteps.java | 7 +-- bdd/python/tests/test_basic_messaging.py | 8 ++-- bdd/rust/tests/steps/streams.rs | 7 ++- bdd/scenarios/basic_messaging.feature | 2 +- .../BasicMessagingOperationsSteps.cs | 8 ++-- foreign/node/src/bdd/stream.ts | 6 +-- 7 files changed, 47 insertions(+), 34 deletions(-) diff --git a/bdd/go/tests/basic_messaging.go b/bdd/go/tests/basic_messaging.go index f1fec9fc51..b873d0c4b0 100644 --- a/bdd/go/tests/basic_messaging.go +++ b/bdd/go/tests/basic_messaging.go @@ -39,6 +39,7 @@ type basicMessagingCtx struct { lastPollMessages *iggcon.PolledMessage lastStreamID *uint32 lastStreamName *string + lastDeletedStreamID *uint32 lastTopicID *uint32 lastTopicName *string lastTopicPartitions *uint32 @@ -232,20 +233,29 @@ func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expected return nil } -func (s basicMessagingSteps) whenDeleteStream(ctx context.Context) error { +func (s basicMessagingSteps) whenDeleteStreamByName(ctx context.Context, name string) error { c := getBasicMessagingCtx(ctx) - streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + streamIdentifier, err := iggcon.NewIdentifier(name) + if err != nil { + return fmt.Errorf("invalid stream name %q: %w", name, err) + } if err := c.client.DeleteStream(streamIdentifier); err != nil { return fmt.Errorf("failed to delete stream: %w", err) } + c.lastDeletedStreamID = c.lastStreamID c.lastStreamID = nil return nil } func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error { c := getBasicMessagingCtx(ctx) - if c.lastStreamID != nil { - return errors.New("stream ID should be nil after deletion") + if c.lastDeletedStreamID == nil { + return errors.New("no stream was deleted in this scenario") + } + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastDeletedStreamID) + stream, err := c.client.GetStream(streamIdentifier) + if err == nil && stream != nil { + return fmt.Errorf("stream %d still exists after deletion", *c.lastDeletedStreamID) } return nil } @@ -346,29 +356,34 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) { return context.WithValue(context.Background(), basicMessagingCtxKey{}, &basicMessagingCtx{}), nil }) s := &basicMessagingSteps{} - sc.Step(`I have a running Iggy server`, s.givenRunningServer) - sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot) + sc.Step(`^I have a running Iggy server$`, s.givenRunningServer) + sc.Step(`^I am authenticated as the root user$`, s.givenAuthenticationAsRoot) sc.Step(`^I send (\d+) messages to stream (\d+), topic (\d+), partition (\d+)$`, s.whenSendMessages) sc.Step(`^I poll messages from stream (\d+), topic (\d+), partition (\d+) starting from offset (\d+)$`, s.whenPollMessages) - sc.Step(`all messages should be sent successfully`, s.thenMessageSentSuccessfully) + sc.Step(`^all messages should be sent successfully$`, s.thenMessageSentSuccessfully) sc.Step(`^I should receive (\d+) messages$`, s.thenShouldReceiveMessages) sc.Step(`^the messages should have sequential offsets from (\d+) to (\d+)$`, s.thenMessagesHaveSequentialOffsets) - sc.Step(`each message should have the expected payload content`, s.thenMessagesHaveExpectedPayload) - sc.Step(`the last polled message should match the last sent message`, s.thenLastPolledMessageMatchesSent) + sc.Step(`^each message should have the expected payload content$`, s.thenMessagesHaveExpectedPayload) + sc.Step(`^the last polled message should match the last sent message$`, s.thenLastPolledMessageMatchesSent) sc.Step(`^the stream should have name "([^"]*)"$`, s.thenStreamHasName) - sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully) + sc.Step(`^the stream should be created successfully$`, s.thenStreamCreatedSuccessfully) sc.Step(`^I create a stream with name "([^"]*)"$`, s.whenCreateStream) - sc.Step(`I have no streams in the system`, s.givenNoStreams) + sc.Step(`^I have no streams in the system$`, s.givenNoStreams) sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+) partitions$`, s.whenCreateTopic) - sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully) + sc.Step(`^the topic should be created successfully$`, s.thenTopicCreatedSuccessfully) sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName) sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions) sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName) sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated) - sc.Step(`I delete the stream`, s.whenDeleteStream) - sc.Step(`the stream should be deleted successfully`, s.thenStreamDeletedSuccessfully) + sc.Step(`^I delete the stream with name "([^"]*)"$`, s.whenDeleteStreamByName) + sc.Step(`^the stream should be deleted successfully$`, s.thenStreamDeletedSuccessfully) sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) { c := getBasicMessagingCtx(ctx) + // Best-effort cleanup: if the scenario left a stream behind (e.g. + // failed before the explicit delete step), try to remove it so the + // next scenario starts clean. A failure here is intentionally + // ignored; for guaranteed teardown across all resource kinds, a + // global cleanup script remains the better long-term solution. if c.client != nil && c.lastStreamID != nil { streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) _ = c.client.DeleteStream(streamIdentifier) diff --git a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java index 26df74df84..c68a9fd3b6 100644 --- a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java +++ b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java @@ -24,6 +24,7 @@ import io.cucumber.java.en.When; import org.apache.iggy.client.blocking.IggyBaseClient; import org.apache.iggy.client.blocking.tcp.IggyTcpClient; +import org.apache.iggy.identifier.StreamId; import org.apache.iggy.message.Message; import org.apache.iggy.message.Partitioning; import org.apache.iggy.message.PollingStrategy; @@ -223,9 +224,9 @@ public void streamNameUpdated(String expectedName) { assertEquals(expectedName, stream.get().name(), "Stream name should be updated"); } - @When("I delete the stream") - public void deleteStream() { - getClient().streams().deleteStream(context.lastStreamId); + @When("I delete the stream with name {string}") + public void deleteStream(String name) { + getClient().streams().deleteStream(StreamId.of(name)); context.lastStreamId = null; } diff --git a/bdd/python/tests/test_basic_messaging.py b/bdd/python/tests/test_basic_messaging.py index d5d4ae002e..5b2105402d 100644 --- a/bdd/python/tests/test_basic_messaging.py +++ b/bdd/python/tests/test_basic_messaging.py @@ -297,12 +297,12 @@ async def _verify(): asyncio.run(_verify()) -@when("I delete the stream") -def delete_stream(context): - """Delete the stream""" +@when(parsers.parse('I delete the stream with name "{name}"')) +def delete_stream(context, name): + """Delete the stream by name""" async def _delete(): - await context.client.delete_stream(context.last_stream_id) + await context.client.delete_stream(name) context.last_stream_id = None asyncio.run(_delete()) diff --git a/bdd/rust/tests/steps/streams.rs b/bdd/rust/tests/steps/streams.rs index 5603344306..b115d23819 100644 --- a/bdd/rust/tests/steps/streams.rs +++ b/bdd/rust/tests/steps/streams.rs @@ -90,11 +90,10 @@ pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: assert_eq!(stream.name, expected_name, "Stream name should be updated"); } -#[when("I delete the stream")] -pub async fn when_delete_stream(world: &mut GlobalContext) { +#[when(regex = r#"^I delete the stream with name "([^"]*)"$"#)] +pub async fn when_delete_stream(world: &mut GlobalContext, name: String) { let client = world.client.as_ref().expect("Client should be available"); - let stream_id = world.last_stream_id.expect("Stream should exist"); - let identifier = Identifier::numeric(stream_id).unwrap(); + let identifier = Identifier::named(&name).expect("Stream name should be valid"); client .delete_stream(&identifier) .await diff --git a/bdd/scenarios/basic_messaging.feature b/bdd/scenarios/basic_messaging.feature index 2a86ef0b94..69078848ee 100644 --- a/bdd/scenarios/basic_messaging.feature +++ b/bdd/scenarios/basic_messaging.feature @@ -47,5 +47,5 @@ Feature: Basic Messaging Operations When I update the stream name to "test-stream-updated" Then the stream name should be updated to "test-stream-updated" - When I delete the stream + When I delete the stream with name "test-stream-updated" Then the stream should be deleted successfully diff --git a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs index 106e9270e2..76341cb4ea 100644 --- a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs +++ b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs @@ -214,12 +214,10 @@ public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName) _context.CreatedStream!.Name.ShouldBe(expectedName); } - [When(@"I delete the stream")] - public async Task WhenIDeleteTheStream() + [When(@"I delete the stream with name ""(.*)""")] + public async Task WhenIDeleteTheStream(string name) { - _context.CreatedStream.ShouldNotBeNull(); - await _context.IggyClient.DeleteStreamAsync( - Identifier.Numeric(_context.CreatedStream!.Id)); + await _context.IggyClient.DeleteStreamAsync(Identifier.String(name)); _context.CreatedStream = null; } diff --git a/foreign/node/src/bdd/stream.ts b/foreign/node/src/bdd/stream.ts index 47c96bc576..1d48279d0b 100644 --- a/foreign/node/src/bdd/stream.ts +++ b/foreign/node/src/bdd/stream.ts @@ -66,9 +66,9 @@ Then( ); When( - 'I delete the stream', - async function (this: TestWorld) { - assert.ok(await this.client.stream.delete({ streamId: this.stream.id })); + 'I delete the stream with name {string}', + async function (this: TestWorld, name: string) { + assert.ok(await this.client.stream.delete({ streamId: name })); } );