Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 7 additions & 35 deletions e2e/cross_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ func TestPublishWithoutSubscriptionOnDifferentNode(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Log(tt.description)

var cancel context.CancelFunc
var subCmd *exec.Cmd

Expand All @@ -65,8 +63,6 @@ func TestPublishWithoutSubscriptionOnDifferentNode(t *testing.T) {
defer cancel()

subProxy := proxies[tt.subscribeOn]
t.Logf("Starting subscriber on proxy %d: %s", tt.subscribeOn, subProxy)

subCmd = exec.CommandContext(ctx, cliBinaryPath,
"subscribe",
"--topic="+testTopic,
Expand All @@ -77,14 +73,10 @@ func TestPublishWithoutSubscriptionOnDifferentNode(t *testing.T) {

// Wait for subscriber to be active
time.Sleep(3 * time.Second)
t.Log("Subscriber active")
} else {
t.Log("No subscriber started (testing publish without subscription)")
}

// Attempt to publish
pubProxy := proxies[tt.publishOn]
t.Logf("Publishing on proxy %d: %s", tt.publishOn, pubProxy)

out, err := RunCommand(cliBinaryPath, "publish",
"--topic="+testTopic,
Expand All @@ -94,17 +86,12 @@ func TestPublishWithoutSubscriptionOnDifferentNode(t *testing.T) {
// Validate expectations
if tt.shouldFail {
// Should fail with "topic not assigned" or similar
if err == nil {
t.Errorf("CRITICAL: Publishing without subscriber should have failed but succeeded! Output: %s", out)
} else {
lowerOut := strings.ToLower(out)
if strings.Contains(lowerOut, "topic not assigned") ||
strings.Contains(lowerOut, "not found") ||
strings.Contains(lowerOut, "failed") {
t.Logf("✅ Correctly rejected: %s", out)
} else {
t.Logf("⚠️ Failed but with unexpected error: %s", out)
}
require.Error(t, err, "Publishing without subscriber should have failed. Output: %s", out)
lowerOut := strings.ToLower(out)
if !strings.Contains(lowerOut, "topic not assigned") &&
!strings.Contains(lowerOut, "not found") &&
!strings.Contains(lowerOut, "failed") {
t.Logf("Unexpected error message: %s", out)
}
} else {
// Should succeed
Expand All @@ -113,8 +100,6 @@ func TestPublishWithoutSubscriptionOnDifferentNode(t *testing.T) {
validator := NewValidator(out)
err := validator.ValidatePublishSuccess()
require.NoError(t, err, "Publish validation failed")

t.Logf("✅ Cross-node publish succeeded")
}

// Cleanup
Expand All @@ -140,8 +125,6 @@ func TestCrossProxyFailover(t *testing.T) {
// Test with a definitely invalid proxy
invalidProxy := "http://192.0.2.1:8080" // TEST-NET-1 (non-routable)

t.Log("Testing publish to unreachable proxy (should fail quickly)")

// Start subscriber on valid proxy first
validProxy := GetDefaultProxy()

Expand Down Expand Up @@ -173,8 +156,6 @@ func TestCrossProxyFailover(t *testing.T) {
require.Less(t, duration.Seconds(), 35.0,
"Publish to unreachable proxy should timeout/fail within 35 seconds, took %v", duration)

t.Logf("✅ Correctly failed to publish to unreachable proxy in %v", duration)

cancel()
subCmd.Wait()
}
Expand All @@ -188,8 +169,6 @@ func TestMultipleSubscribersOnDifferentProxies(t *testing.T) {
testTopic := fmt.Sprintf("multi-sub-%d", time.Now().Unix())
testMessage := fmt.Sprintf("MultiSubTest-%d", time.Now().Unix())

t.Log("Starting subscribers on multiple proxies...")

// Start subscribers on both proxies
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -204,13 +183,10 @@ func TestMultipleSubscribersOnDifferentProxies(t *testing.T) {
err := subCmd.Start()
require.NoError(t, err, "Failed to start subscriber %d on %s", i, proxy)
subCmds = append(subCmds, subCmd)

t.Logf("Subscriber %d started on %s", i, proxy)
}

// Wait for all subscribers to be ready
time.Sleep(4 * time.Second)
t.Log("All subscribers ready")

// Publish message to first proxy
out, err := RunCommand(cliBinaryPath, "publish",
Expand All @@ -224,8 +200,6 @@ func TestMultipleSubscribersOnDifferentProxies(t *testing.T) {
err = validator.ValidatePublishSuccess()
require.NoError(t, err)

t.Log("✅ Successfully published to topic with multiple cross-proxy subscribers")

// Cleanup
cancel()
for _, cmd := range subCmds {
Expand All @@ -252,8 +226,7 @@ func TestProxyHealthBeforePublish(t *testing.T) {
validator := NewValidator(healthOut)
healthInfo, err := validator.ValidateHealthCheck()
require.NoError(t, err, "Health check validation failed")

t.Logf("Proxy %s health: %s", proxy, healthInfo.Status)
require.Equal(t, "ok", healthInfo.Status, "Proxy %s should be healthy", proxy)

// If healthy, test should be able to publish (with subscriber)
testTopic := fmt.Sprintf("health-pub-%d-%d", i, time.Now().Unix())
Expand All @@ -280,7 +253,6 @@ func TestProxyHealthBeforePublish(t *testing.T) {
subCmd.Wait()

require.NoError(t, pubErr, "Proxy reported healthy but publish failed: %s", pubOut)
t.Logf("✅ Healthy proxy successfully processed publish")
})
}
}
6 changes: 0 additions & 6 deletions e2e/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestFullWorkflow(t *testing.T) {
})

// Start subscriber in background before publishing
t.Log("Starting background subscriber for workflow tests...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -51,7 +50,6 @@ func TestFullWorkflow(t *testing.T) {

// Wait for subscription to be active
time.Sleep(2 * time.Second)
t.Log("Subscriber active, proceeding with publish tests...")

t.Run("4_publish_http_message", func(t *testing.T) {
out, err := RunCommand(cliBinaryPath, "publish",
Expand Down Expand Up @@ -99,7 +97,6 @@ func TestFullWorkflow(t *testing.T) {
// Cleanup: stop subscriber
cancel()
subCmd.Wait()
t.Log("Background subscriber stopped")
}

func TestCrossProxyWorkflow(t *testing.T) {
Expand All @@ -111,7 +108,6 @@ func TestCrossProxyWorkflow(t *testing.T) {
testTopic := fmt.Sprintf("cross-proxy-%d", time.Now().Unix())

// Start subscriber on first proxy
t.Log("Starting background subscriber for cross-proxy tests...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -122,7 +118,6 @@ func TestCrossProxyWorkflow(t *testing.T) {

// Wait for subscription to be active
time.Sleep(2 * time.Second)
t.Log("Subscriber active on first proxy...")

for i, proxy := range proxies {
proxyName := fmt.Sprintf("proxy_%d", i+1)
Expand All @@ -145,5 +140,4 @@ func TestCrossProxyWorkflow(t *testing.T) {
// Cleanup: stop subscriber
cancel()
subCmd.Wait()
t.Log("Background subscriber stopped")
}
70 changes: 67 additions & 3 deletions e2e/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -19,7 +21,6 @@ func TestPublishCommand(t *testing.T) {
testTopic := fmt.Sprintf("test-publish-%d", time.Now().Unix())

// Start a subscriber in the background to enable publishing
t.Log("Starting background subscriber to enable publishing...")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -29,7 +30,6 @@ func TestPublishCommand(t *testing.T) {

// Wait for subscription to be active
time.Sleep(2 * time.Second)
t.Log("Subscriber active, proceeding with publish tests...")

tests := []struct {
name string
Expand Down Expand Up @@ -81,6 +81,7 @@ func TestPublishCommand(t *testing.T) {
},
}

// Run the basic tests first
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, err := RunCommand(cliBinaryPath, tt.args...)
Expand All @@ -98,8 +99,71 @@ func TestPublishCommand(t *testing.T) {
})
}

// Test --file flag scenarios
t.Run("publish from file HTTP", func(t *testing.T) {
dir := t.TempDir()
testFile := filepath.Join(dir, "test-publish.txt")
testContent := "Test file content for HTTP publish"
err := os.WriteFile(testFile, []byte(testContent), 0644)
require.NoError(t, err, "Failed to create test file")

out, err := RunCommand(cliBinaryPath, "publish",
"--topic="+testTopic,
"--file="+testFile,
"--service-url="+serviceURL)
require.NoError(t, err, "File publish failed: %v\nOutput: %s", err, out)

validator := NewValidator(out)
err = validator.ValidatePublishSuccess()
require.NoError(t, err, "File publish validation failed")
})

t.Run("publish from file gRPC", func(t *testing.T) {
dir := t.TempDir()
testFile := filepath.Join(dir, "test-publish-grpc.txt")
testContent := "Test file content for gRPC publish"
err := os.WriteFile(testFile, []byte(testContent), 0644)
require.NoError(t, err, "Failed to create test file")

out, err := RunCommand(cliBinaryPath, "publish",
"--topic="+testTopic,
"--file="+testFile,
"--grpc",
"--service-url="+serviceURL)
require.NoError(t, err, "File gRPC publish failed: %v\nOutput: %s", err, out)

validator := NewValidator(out)
err = validator.ValidatePublishSuccess()
require.NoError(t, err, "File gRPC publish validation failed")
})

t.Run("publish file not found", func(t *testing.T) {
dir := t.TempDir()
nonExistentFile := filepath.Join(dir, "nonexistent-file.txt")
out, err := RunCommand(cliBinaryPath, "publish",
"--topic="+testTopic,
"--file="+nonExistentFile,
"--service-url="+serviceURL)
require.Error(t, err, "Expected file not found error. Output: %s", out)
require.Contains(t, strings.ToLower(out), "failed to read file", "Expected file read error")
})

t.Run("publish file and message both (should fail)", func(t *testing.T) {
dir := t.TempDir()
testFile := filepath.Join(dir, "test-publish-both.txt")
err := os.WriteFile(testFile, []byte("test"), 0644)
require.NoError(t, err, "Failed to create test file")

out, err := RunCommand(cliBinaryPath, "publish",
"--topic="+testTopic,
"--file="+testFile,
"--message=test",
"--service-url="+serviceURL)
require.Error(t, err, "Expected error when both --file and --message are provided. Output: %s", out)
require.Contains(t, strings.ToLower(out), "only one", "Expected error about using only one option")
})

// Cleanup: stop subscriber
cancel()
subCmd.Wait()
t.Log("Background subscriber stopped")
}
Loading
Loading