diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index b517353..404b94d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -1,28 +1,62 @@ -# This workflow will build a golang project -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go - -name: Go - -on: - push: - branches: [ "root" ] - pull_request: - branches: [ "root" ] - -jobs: - - build: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v4 - with: - go-version: '1.20' - - - name: Build - run: go build -v ./... - - - name: Test - run: go test -v ./... +# This workflow will build a golang project +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go + +name: Go + +on: + push: + branches: [ "root" ] + pull_request: + branches: [ "root" ] + workflow_dispatch: + +jobs: + + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: '1.24' + cache: true + + - name: Get dependencies + run: | + go mod download + go mod verify + + - name: Verify formatting + run: | + gofmt -l . > gofmt_diff.txt + if [ -s gofmt_diff.txt ]; then + echo "Run 'go fmt ./...' to fix formatting" + cat gofmt_diff.txt + exit 1 + fi + + - name: Vet + run: go vet ./... + + - name: Install golangci-lint + run: | + curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.64.6 + + - name: Run golangci-lint + run: golangci-lint run ./... + + - name: Build + run: go build -v ./... + + - name: Test with Race Detector and Coverage + run: go test -race -coverprofile=coverage.out -covermode=atomic -v ./... + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} # optional, but required for private repos + file: ./coverage.out + flags: unittests + name: codecov-umbrella diff --git a/go.mod b/go.mod index f78e48e..9a26d8c 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.24.0 toolchain go1.24.3 require ( + cloud.google.com/go/pubsub v1.50.1 github.com/goccy/go-json v0.10.2 github.com/google/uuid v1.6.0 + google.golang.org/api v0.247.0 ) require ( @@ -15,8 +17,7 @@ require ( cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.8.0 // indirect cloud.google.com/go/iam v1.5.2 // indirect - cloud.google.com/go/pubsub v1.50.1 // indirect - cloud.google.com/go/pubsub/v2 v2.0.0 // indirect + cloud.google.com/go/pubsub/v2 v2.3.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -37,7 +38,6 @@ require ( golang.org/x/sys v0.35.0 // indirect golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.12.0 // indirect - google.golang.org/api v0.247.0 // indirect google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect diff --git a/go.sum b/go.sum index 6e49101..9c0fb7a 100644 --- a/go.sum +++ b/go.sum @@ -9,15 +9,20 @@ cloud.google.com/go/compute/metadata v0.8.0 h1:HxMRIbao8w17ZX6wBnjhcDkW6lTFpgcao cloud.google.com/go/compute/metadata v0.8.0/go.mod h1:sYOGTp851OV9bOFJ9CH7elVvyzopvWQFNNghtDQ/Biw= cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8= cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE= +cloud.google.com/go/kms v1.22.0 h1:dBRIj7+GDeeEvatJeTB19oYZNV0aj6wEqSIT/7gLqtk= +cloud.google.com/go/kms v1.22.0/go.mod h1:U7mf8Sva5jpOb4bxYZdtw/9zsbIjrklYwPcvMk34AL8= +cloud.google.com/go/longrunning v0.6.7 h1:IGtfDWHhQCgCjwQjV9iiLnUta9LBCo8R9QmAFsS/PrE= +cloud.google.com/go/longrunning v0.6.7/go.mod h1:EAFV3IZAKmM56TyiE6VAP3VoTzhZzySwI/YI1s/nRsY= cloud.google.com/go/pubsub v1.50.1 h1:fzbXpPyJnSGvWXF1jabhQeXyxdbCIkXTpjXHy7xviBM= cloud.google.com/go/pubsub v1.50.1/go.mod h1:6YVJv3MzWJUVdvQXG081sFvS0dWQOdnV+oTo++q/xFk= -cloud.google.com/go/pubsub/v2 v2.0.0 h1:0qS6mRJ41gD1lNmM/vdm6bR7DQu6coQcVwD+VPf0Bz0= -cloud.google.com/go/pubsub/v2 v2.0.0/go.mod h1:0aztFxNzVQIRSZ8vUr79uH2bS3jwLebwK6q1sgEub+E= +cloud.google.com/go/pubsub/v2 v2.3.0 h1:DgAN907x+sP0nScYfBzneRiIhWoXcpCD8ZAut8WX9vs= +cloud.google.com/go/pubsub/v2 v2.3.0/go.mod h1:O5f0KHG9zDheZAd3z5rlCRhxt2JQtB+t/IYLKK3Bpvw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -34,6 +39,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -44,23 +51,26 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4= github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -69,6 +79,10 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.einride.tech/aip v0.73.0 h1:bPo4oqBo2ZQeBKo4ZzLb1kxYXTY1ysJhpvQyfuGzvps= +go.einride.tech/aip v0.73.0/go.mod h1:Mj7rFbmXEgw0dq1dqJ7JGMvYCZZVxmGOR3S4ZcV5LvQ= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -81,6 +95,10 @@ go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs= +go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY= +go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis= +go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4= go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -158,6 +176,7 @@ google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2 google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/main.go b/main.go index 1c15d96..71e2b76 100644 --- a/main.go +++ b/main.go @@ -90,14 +90,20 @@ func demonstratePubSub(ms *memorystore.MemoryStore) { // Publish time.Sleep(500 * time.Millisecond) // Wait for subscriptions - ms.Publish("updates", []byte("System update available")) - ms.Publish("alerts", []byte("High CPU usage")) + if err := ms.Publish("updates", []byte("System update available")); err != nil { + log.Printf("Error publishing to updates: %v", err) + } + if err := ms.Publish("alerts", []byte("High CPU usage")); err != nil { + log.Printf("Error publishing to alerts: %v", err) + } wg.Wait() // Unsubscribe for _, topic := range topics { - ms.Unsubscribe(topic) + if err := ms.Unsubscribe(topic); err != nil { + log.Printf("Error unsubscribing from %s: %v", topic, err) + } } } @@ -108,7 +114,11 @@ func main() { } ms := memorystore.NewMemoryStore() - defer ms.Stop() + defer func() { + if err := ms.Stop(); err != nil { + log.Printf("Error stopping store: %v", err) + } + }() demonstrateBasicOperations(ms) demonstratePubSub(ms) diff --git a/memorystore/batch_test.go b/memorystore/batch_test.go index 60060c1..d9ae8cf 100644 --- a/memorystore/batch_test.go +++ b/memorystore/batch_test.go @@ -7,7 +7,9 @@ import ( func TestBatchOperations(t *testing.T) { ms := NewMemoryStore() - defer ms.Stop() + defer func() { + _ = ms.Stop() + }() // Test SetMulti items := map[string][]byte{ diff --git a/memorystore/memorystore_extra_test.go b/memorystore/memorystore_extra_test.go new file mode 100644 index 0000000..aa04978 --- /dev/null +++ b/memorystore/memorystore_extra_test.go @@ -0,0 +1,73 @@ +package memorystore + +import ( + "os" + "testing" + "time" +) + +func TestMemoryStore_PubSub_Stopped(t *testing.T) { + ms := NewMemoryStore() + _ = ms.Stop() + + if _, err := ms.Subscribe("test"); err != ErrStoreStopped { + t.Errorf("Subscribe() should return ErrStoreStopped, got %v", err) + } + + if err := ms.Publish("test", []byte("msg")); err != ErrStoreStopped { + t.Errorf("Publish() should return ErrStoreStopped, got %v", err) + } + + if err := ms.Unsubscribe("test"); err != ErrStoreStopped { + t.Errorf("Unsubscribe() should return ErrStoreStopped, got %v", err) + } +} + +func TestMemoryStore_InitPubSub_GCP(t *testing.T) { + // Save original env + orig := os.Getenv("GOOGLE_CLOUD_PROJECT") + defer os.Setenv("GOOGLE_CLOUD_PROJECT", orig) + + // Set env to trigger GCP path + os.Setenv("GOOGLE_CLOUD_PROJECT", "test-project") + + // This should fail to init GCP (no creds) and fall back to memory + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + // Check if it's running (fallback worked) + if ms.ps == nil { + t.Fatal("PubSub client should be initialized (fallback to memory)") + } + + // Verify it is indeed InMemoryPubSub by checking type or behavior + // internal field ms.ps is private, but we can check behavior + // We can check if SubscriberCount works, as it only works for InMemory + + // Create a subscription + _, err := ms.Subscribe("test") + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + + // Check count + if count := ms.SubscriberCount("test"); count != 1 { + t.Errorf("Expected subscriber count 1, got %d. This implies fallback to InMemoryPubSub failed or behavior changed.", count) + } +} + +func TestMemoryStore_SetJSON_Error(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + // Channel is not JSON marshalsable + ch := make(chan int) + err := ms.SetJSON("key", ch, time.Minute) + if err == nil { + t.Error("SetJSON should fail for unmarshalable type") + } +} diff --git a/memorystore/memorystore_test.go b/memorystore/memorystore_test.go index bb78b9a..103a2db 100644 --- a/memorystore/memorystore_test.go +++ b/memorystore/memorystore_test.go @@ -1,530 +1,546 @@ -package memorystore - -import ( - "testing" - "time" - - "github.com/goccy/go-json" -) - -// TestMemoryStore_SetGet tests the basic Set and Get operations -func TestMemoryStore_SetGet(t *testing.T) { - tests := []struct { - name string - key string - value []byte - expiration time.Duration - wantExists bool - wantErr bool - }{ - { - name: "basic set and get", - key: "test1", - value: []byte("hello"), - expiration: time.Minute, - wantExists: true, - wantErr: false, - }, - { - name: "empty key", - key: "", - value: []byte("value"), - expiration: time.Minute, - wantExists: true, - wantErr: false, - }, - { - name: "nil value", - key: "nil-value", - value: nil, - expiration: time.Minute, - wantExists: true, - wantErr: false, - }, - { - name: "immediate expiration", - key: "expire", - value: []byte("expired"), - expiration: 0, - wantExists: false, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - err := ms.Set(tt.key, tt.value, tt.expiration) - if (err != nil) != tt.wantErr { - t.Errorf("Set() error = %v, wantErr %v", err, tt.wantErr) - return - } - - got, exists := ms.Get(tt.key) - if exists != tt.wantExists { - t.Errorf("Get() exists = %v, want %v", exists, tt.wantExists) - } - if exists && string(got) != string(tt.value) { - t.Errorf("Get() got = %v, want %v", string(got), string(tt.value)) - } - }) - } -} - -// TestMemoryStore_SetGetJSON tests JSON serialization and deserialization -func TestMemoryStore_SetGetJSON(t *testing.T) { - type testStruct struct { - Name string `json:"name"` - Value int `json:"value"` - } - - tests := []struct { - name string - key string - value testStruct - expiration time.Duration - wantErr bool - }{ - { - name: "valid struct", - key: "test1", - value: testStruct{ - Name: "test", - Value: 123, - }, - expiration: time.Minute, - wantErr: false, - }, - { - name: "zero value struct", - key: "test2", - value: testStruct{ - Name: "", - Value: 0, - }, - expiration: time.Minute, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - err := ms.SetJSON(tt.key, tt.value, tt.expiration) - if (err != nil) != tt.wantErr { - t.Errorf("SetJSON() error = %v, wantErr %v", err, tt.wantErr) - return - } - - var got testStruct - exists, err := ms.GetJSON(tt.key, &got) - if (err != nil) != tt.wantErr { - t.Errorf("GetJSON() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !exists { - t.Error("GetJSON() value should exist") - return - } - if got != tt.value { - t.Errorf("GetJSON() got = %v, want %v", got, tt.value) - } - }) - } -} - -// TestMemoryStore_Delete tests the Delete operation -func TestMemoryStore_Delete(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - // Set up test data - testData := map[string][]byte{ - "key1": []byte("value1"), - "key2": []byte("value2"), - } - - for k, v := range testData { - if err := ms.Set(k, v, time.Minute); err != nil { - t.Fatalf("Setup failed: %v", err) - } - } - - // Test deletion - for k := range testData { - ms.Delete(k) - if _, exists := ms.Get(k); exists { - t.Errorf("Delete() key %s should not exist after deletion", k) - } - } -} - -// TestMemoryStore_Expiration tests expiration functionality -func TestMemoryStore_Expiration(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - key := "expiring" - value := []byte("value") - shortDuration := 100 * time.Millisecond - - if err := ms.Set(key, value, shortDuration); err != nil { - t.Fatalf("Set() error = %v", err) - } - - // Verify value exists immediately - if _, exists := ms.Get(key); !exists { - t.Error("Value should exist before expiration") - } - - // Wait for expiration - time.Sleep(shortDuration + 50*time.Millisecond) - - // Verify value has expired - if _, exists := ms.Get(key); exists { - t.Error("Value should not exist after expiration") - } -} - -// TestMemoryStore_Stop tests the store shutdown functionality -func TestMemoryStore_Stop(t *testing.T) { - ms := NewMemoryStore() - - // Store a value - if err := ms.Set("key", []byte("value"), time.Minute); err != nil { - t.Fatalf("Set() error = %v", err) - } - - // Stop the store - if err := ms.Stop(); err != nil { - t.Fatalf("Stop() error = %v", err) - } - - // Verify store is stopped - if !ms.IsStopped() { - t.Error("IsStopped() should return true after Stop()") - } - - // Verify second stop is safe - if err := ms.Stop(); err != nil { - t.Errorf("Second Stop() should not return error, got %v", err) - } -} - -// TestMemoryStore_Concurrent tests concurrent access -func TestMemoryStore_Concurrent(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - const goroutines = 10 - const operationsPerGoroutine = 100 - done := make(chan bool, goroutines) - - for i := 0; i < goroutines; i++ { - go func(id int) { - for j := 0; j < operationsPerGoroutine; j++ { - key := time.Now().String() // Use timestamp to ensure unique keys - value := []byte("test") - - // Test Set - if err := ms.Set(key, value, time.Minute); err != nil { - t.Errorf("Concurrent Set() error = %v", err) - } - - // Test Get - if _, exists := ms.Get(key); !exists { - t.Errorf("Concurrent Get() value should exist") - } - - // Test Delete - ms.Delete(key) - } - done <- true - }(i) - } - - // Wait for all goroutines to complete - for i := 0; i < goroutines; i++ { - <-done - } -} - -// BenchmarkMemoryStore_SetGet benchmarks Set and Get operations -func BenchmarkMemoryStore_SetGet(b *testing.B) { - ms := NewMemoryStore() - defer ms.Stop() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - key := time.Now().String() - value := []byte("benchmark value") - - if err := ms.Set(key, value, time.Minute); err != nil { - b.Fatalf("Set() error = %v", err) - } - - if _, exists := ms.Get(key); !exists { - b.Fatal("Get() value should exist") - } - } -} - -// TestMemoryStore_SetJSON tests the SetJSON method -func TestMemoryStore_SetJSON(t *testing.T) { - tests := []struct { - name string - key string - value interface{} - expiration time.Duration - wantErr bool - }{ - { - name: "simple struct", - key: "struct1", - value: struct { - Name string - Value int - }{ - Name: "test", - Value: 123, - }, - expiration: time.Minute, - wantErr: false, - }, - { - name: "simple string", - key: "string1", - value: "hello world", - expiration: time.Minute, - wantErr: false, - }, - { - name: "complex struct", - key: "complex1", - value: struct { - ID int64 - Name string - Tags []string - Metadata map[string]interface{} - Active bool - CreateAt time.Time - UpdatedAt *time.Time - }{ - ID: 1, - Name: "test", - Tags: []string{"tag1", "tag2"}, - Metadata: map[string]interface{}{"key": "value"}, - Active: true, - CreateAt: time.Now(), - }, - expiration: time.Minute, - wantErr: false, - }, - { - name: "nested structs", - key: "nested1", - value: struct { - Parent struct { - Child struct { - Value string - } - } - }{ - Parent: struct { - Child struct { - Value string - } - }{ - Child: struct { - Value string - }{ - Value: "nested value", - }, - }, - }, - expiration: time.Minute, - wantErr: false, - }, - { - name: "nil value", - key: "nil1", - value: nil, - expiration: time.Minute, - wantErr: false, - }, - { - name: "zero values", - key: "zero1", - value: struct { - Int int - String string - Bool bool - Slice []int - Map map[string]string - Pointer *string - }{}, - expiration: time.Minute, - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - err := ms.SetJSON(tt.key, tt.value, tt.expiration) - if (err != nil) != tt.wantErr { - t.Errorf("SetJSON() error = %v, wantErr %v", err, tt.wantErr) - return - } - - // Verify the value was stored correctly by retrieving it as bytes - data, exists := ms.Get(tt.key) - if !exists { - t.Error("Value was not stored") - return - } - - // Compare JSON representations to verify correct serialization - expectedJSON, err := json.Marshal(tt.value) - if err != nil { - t.Fatalf("Failed to marshal test value: %v", err) - } - - if string(data) != string(expectedJSON) { - t.Errorf("Stored JSON doesn't match expected. Got %s, want %s", string(data), string(expectedJSON)) - } - }) - } -} - -// TestMemoryStore_GetJSON tests the GetJSON method -func TestMemoryStore_GetJSON(t *testing.T) { - type testStruct struct { - Name string - Value int - Active bool - } - - tests := []struct { - name string - key string - storeValue interface{} - destType interface{} - wantErr bool - wantExists bool - validate func(interface{}) bool - }{ - { - name: "basic struct", - key: "struct1", - storeValue: testStruct{ - Name: "test", - Value: 123, - Active: true, - }, - destType: &testStruct{}, - wantErr: false, - wantExists: true, - validate: func(got interface{}) bool { - g, ok := got.(*testStruct) - return ok && g.Name == "test" && g.Value == 123 && g.Active == true - }, - }, - { - name: "non-existent key", - key: "nonexistent", - storeValue: nil, - destType: &testStruct{}, - wantErr: false, - wantExists: false, - validate: func(interface{}) bool { return true }, - }, - { - name: "string value", - key: "string1", - storeValue: "hello world", - destType: new(string), - wantErr: false, - wantExists: true, - validate: func(got interface{}) bool { - g, ok := got.(*string) - return ok && *g == "hello world" - }, - }, - { - name: "map value", - key: "map1", - storeValue: map[string]interface{}{ - "key1": "value1", - "key2": 42, - }, - destType: &map[string]interface{}{}, - wantErr: false, - wantExists: true, - validate: func(got interface{}) bool { - g, ok := got.(*map[string]interface{}) - if !ok { - return false - } - m := *g - return m["key1"] == "value1" && m["key2"] == float64(42) - }, - }, - { - name: "nil destination", - key: "nil1", - storeValue: "some value", - destType: nil, - wantErr: true, - wantExists: true, - validate: func(interface{}) bool { return true }, - }, - { - name: "type mismatch", - key: "mismatch1", - storeValue: struct { - Name string - }{ - Name: "test", - }, - destType: new(int), - wantErr: true, - wantExists: true, - validate: func(interface{}) bool { return true }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - // Store the test value if it's not a non-existent key test - if tt.storeValue != nil { - if err := ms.SetJSON(tt.key, tt.storeValue, time.Minute); err != nil { - t.Fatalf("Failed to store test value: %v", err) - } - } - - exists, err := ms.GetJSON(tt.key, tt.destType) - if (err != nil) != tt.wantErr { - t.Errorf("GetJSON() error = %v, wantErr %v", err, tt.wantErr) - } - if exists != tt.wantExists { - t.Errorf("GetJSON() exists = %v, wantExists %v", exists, tt.wantExists) - } - - if exists && !tt.wantErr { - if !tt.validate(tt.destType) { - t.Errorf("GetJSON() validation failed for test case %s", tt.name) - } - } - }) - } -} +package memorystore + +import ( + "testing" + "time" + + "github.com/goccy/go-json" +) + +// TestMemoryStore_SetGet tests the basic Set and Get operations +func TestMemoryStore_SetGet(t *testing.T) { + tests := []struct { + name string + key string + value []byte + expiration time.Duration + wantExists bool + wantErr bool + }{ + { + name: "basic set and get", + key: "test1", + value: []byte("hello"), + expiration: time.Minute, + wantExists: true, + wantErr: false, + }, + { + name: "empty key", + key: "", + value: []byte("value"), + expiration: time.Minute, + wantExists: true, + wantErr: false, + }, + { + name: "nil value", + key: "nil-value", + value: nil, + expiration: time.Minute, + wantExists: true, + wantErr: false, + }, + { + name: "immediate expiration", + key: "expire", + value: []byte("expired"), + expiration: 0, + wantExists: false, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + err := ms.Set(tt.key, tt.value, tt.expiration) + if (err != nil) != tt.wantErr { + t.Errorf("Set() error = %v, wantErr %v", err, tt.wantErr) + return + } + + got, exists := ms.Get(tt.key) + if exists != tt.wantExists { + t.Errorf("Get() exists = %v, want %v", exists, tt.wantExists) + } + if exists && string(got) != string(tt.value) { + t.Errorf("Get() got = %v, want %v", string(got), string(tt.value)) + } + }) + } +} + +// TestMemoryStore_SetGetJSON tests JSON serialization and deserialization +func TestMemoryStore_SetGetJSON(t *testing.T) { + type testStruct struct { + Name string `json:"name"` + Value int `json:"value"` + } + + tests := []struct { + name string + key string + value testStruct + expiration time.Duration + wantErr bool + }{ + { + name: "valid struct", + key: "test1", + value: testStruct{ + Name: "test", + Value: 123, + }, + expiration: time.Minute, + wantErr: false, + }, + { + name: "zero value struct", + key: "test2", + value: testStruct{ + Name: "", + Value: 0, + }, + expiration: time.Minute, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + err := ms.SetJSON(tt.key, tt.value, tt.expiration) + if (err != nil) != tt.wantErr { + t.Errorf("SetJSON() error = %v, wantErr %v", err, tt.wantErr) + return + } + + var got testStruct + exists, err := ms.GetJSON(tt.key, &got) + if (err != nil) != tt.wantErr { + t.Errorf("GetJSON() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !exists { + t.Error("GetJSON() value should exist") + return + } + if got != tt.value { + t.Errorf("GetJSON() got = %v, want %v", got, tt.value) + } + }) + } +} + +// TestMemoryStore_Delete tests the Delete operation +func TestMemoryStore_Delete(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + // Set up test data + testData := map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + } + + for k, v := range testData { + if err := ms.Set(k, v, time.Minute); err != nil { + t.Fatalf("Setup failed: %v", err) + } + } + + // Test deletion + for k := range testData { + ms.Delete(k) + if _, exists := ms.Get(k); exists { + t.Errorf("Delete() key %s should not exist after deletion", k) + } + } +} + +// TestMemoryStore_Expiration tests expiration functionality +func TestMemoryStore_Expiration(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + key := "expiring" + value := []byte("value") + shortDuration := 100 * time.Millisecond + + if err := ms.Set(key, value, shortDuration); err != nil { + t.Fatalf("Set() error = %v", err) + } + + // Verify value exists immediately + if _, exists := ms.Get(key); !exists { + t.Error("Value should exist before expiration") + } + + // Wait for expiration + time.Sleep(shortDuration + 50*time.Millisecond) + + // Verify value has expired + if _, exists := ms.Get(key); exists { + t.Error("Value should not exist after expiration") + } +} + +// TestMemoryStore_Stop tests the store shutdown functionality +func TestMemoryStore_Stop(t *testing.T) { + ms := NewMemoryStore() + + // Store a value + if err := ms.Set("key", []byte("value"), time.Minute); err != nil { + t.Fatalf("Set() error = %v", err) + } + + // Stop the store + if err := ms.Stop(); err != nil { + t.Fatalf("Stop() error = %v", err) + } + + // Verify store is stopped + if !ms.IsStopped() { + t.Error("IsStopped() should return true after Stop()") + } + + // Verify second stop is safe + if err := ms.Stop(); err != nil { + t.Errorf("Second Stop() should not return error, got %v", err) + } +} + +// TestMemoryStore_Concurrent tests concurrent access +func TestMemoryStore_Concurrent(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + const goroutines = 10 + const operationsPerGoroutine = 100 + done := make(chan bool, goroutines) + + for i := 0; i < goroutines; i++ { + go func(id int) { + for j := 0; j < operationsPerGoroutine; j++ { + key := time.Now().String() // Use timestamp to ensure unique keys + value := []byte("test") + + // Test Set + if err := ms.Set(key, value, time.Minute); err != nil { + t.Errorf("Concurrent Set() error = %v", err) + } + + // Test Get + if _, exists := ms.Get(key); !exists { + t.Errorf("Concurrent Get() value should exist") + } + + // Test Delete + ms.Delete(key) + } + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < goroutines; i++ { + <-done + } +} + +// BenchmarkMemoryStore_SetGet benchmarks Set and Get operations +func BenchmarkMemoryStore_SetGet(b *testing.B) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := time.Now().String() + value := []byte("benchmark value") + + if err := ms.Set(key, value, time.Minute); err != nil { + b.Fatalf("Set() error = %v", err) + } + + if _, exists := ms.Get(key); !exists { + b.Fatal("Get() value should exist") + } + } +} + +// TestMemoryStore_SetJSON tests the SetJSON method +func TestMemoryStore_SetJSON(t *testing.T) { + tests := []struct { + name string + key string + value interface{} + expiration time.Duration + wantErr bool + }{ + { + name: "simple struct", + key: "struct1", + value: struct { + Name string + Value int + }{ + Name: "test", + Value: 123, + }, + expiration: time.Minute, + wantErr: false, + }, + { + name: "simple string", + key: "string1", + value: "hello world", + expiration: time.Minute, + wantErr: false, + }, + { + name: "complex struct", + key: "complex1", + value: struct { + ID int64 + Name string + Tags []string + Metadata map[string]interface{} + Active bool + CreateAt time.Time + UpdatedAt *time.Time + }{ + ID: 1, + Name: "test", + Tags: []string{"tag1", "tag2"}, + Metadata: map[string]interface{}{"key": "value"}, + Active: true, + CreateAt: time.Now(), + }, + expiration: time.Minute, + wantErr: false, + }, + { + name: "nested structs", + key: "nested1", + value: struct { + Parent struct { + Child struct { + Value string + } + } + }{ + Parent: struct { + Child struct { + Value string + } + }{ + Child: struct { + Value string + }{ + Value: "nested value", + }, + }, + }, + expiration: time.Minute, + wantErr: false, + }, + { + name: "nil value", + key: "nil1", + value: nil, + expiration: time.Minute, + wantErr: false, + }, + { + name: "zero values", + key: "zero1", + value: struct { + Int int + String string + Bool bool + Slice []int + Map map[string]string + Pointer *string + }{}, + expiration: time.Minute, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + err := ms.SetJSON(tt.key, tt.value, tt.expiration) + if (err != nil) != tt.wantErr { + t.Errorf("SetJSON() error = %v, wantErr %v", err, tt.wantErr) + return + } + + // Verify the value was stored correctly by retrieving it as bytes + data, exists := ms.Get(tt.key) + if !exists { + t.Error("Value was not stored") + return + } + + // Compare JSON representations to verify correct serialization + expectedJSON, err := json.Marshal(tt.value) + if err != nil { + t.Fatalf("Failed to marshal test value: %v", err) + } + + if string(data) != string(expectedJSON) { + t.Errorf("Stored JSON doesn't match expected. Got %s, want %s", string(data), string(expectedJSON)) + } + }) + } +} + +// TestMemoryStore_GetJSON tests the GetJSON method +func TestMemoryStore_GetJSON(t *testing.T) { + type testStruct struct { + Name string + Value int + Active bool + } + + tests := []struct { + name string + key string + storeValue interface{} + destType interface{} + wantErr bool + wantExists bool + validate func(interface{}) bool + }{ + { + name: "basic struct", + key: "struct1", + storeValue: testStruct{ + Name: "test", + Value: 123, + Active: true, + }, + destType: &testStruct{}, + wantErr: false, + wantExists: true, + validate: func(got interface{}) bool { + g, ok := got.(*testStruct) + return ok && g.Name == "test" && g.Value == 123 && g.Active == true + }, + }, + { + name: "non-existent key", + key: "nonexistent", + storeValue: nil, + destType: &testStruct{}, + wantErr: false, + wantExists: false, + validate: func(interface{}) bool { return true }, + }, + { + name: "string value", + key: "string1", + storeValue: "hello world", + destType: new(string), + wantErr: false, + wantExists: true, + validate: func(got interface{}) bool { + g, ok := got.(*string) + return ok && *g == "hello world" + }, + }, + { + name: "map value", + key: "map1", + storeValue: map[string]interface{}{ + "key1": "value1", + "key2": 42, + }, + destType: &map[string]interface{}{}, + wantErr: false, + wantExists: true, + validate: func(got interface{}) bool { + g, ok := got.(*map[string]interface{}) + if !ok { + return false + } + m := *g + return m["key1"] == "value1" && m["key2"] == float64(42) + }, + }, + { + name: "nil destination", + key: "nil1", + storeValue: "some value", + destType: nil, + wantErr: true, + wantExists: true, + validate: func(interface{}) bool { return true }, + }, + { + name: "type mismatch", + key: "mismatch1", + storeValue: struct { + Name string + }{ + Name: "test", + }, + destType: new(int), + wantErr: true, + wantExists: true, + validate: func(interface{}) bool { return true }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + // Store the test value if it's not a non-existent key test + if tt.storeValue != nil { + if err := ms.SetJSON(tt.key, tt.storeValue, time.Minute); err != nil { + t.Fatalf("Failed to store test value: %v", err) + } + } + + exists, err := ms.GetJSON(tt.key, tt.destType) + if (err != nil) != tt.wantErr { + t.Errorf("GetJSON() error = %v, wantErr %v", err, tt.wantErr) + } + if exists != tt.wantExists { + t.Errorf("GetJSON() exists = %v, wantExists %v", exists, tt.wantExists) + } + + if exists && !tt.wantErr { + if !tt.validate(tt.destType) { + t.Errorf("GetJSON() validation failed for test case %s", tt.name) + } + } + }) + } +} diff --git a/memorystore/metrics_test.go b/memorystore/metrics_test.go index 7f84366..212314c 100644 --- a/memorystore/metrics_test.go +++ b/memorystore/metrics_test.go @@ -7,7 +7,9 @@ import ( func TestMetrics(t *testing.T) { ms := NewMemoryStore() - defer ms.Stop() + defer func() { + _ = ms.Stop() + }() // Initial metrics should be zero metrics := ms.GetMetrics() @@ -16,7 +18,7 @@ func TestMetrics(t *testing.T) { } // Test Hits - ms.Set("key1", []byte("value1"), time.Minute) + _ = ms.Set("key1", []byte("value1"), time.Minute) ms.Get("key1") metrics = ms.GetMetrics() if metrics.Hits != 1 { @@ -31,14 +33,14 @@ func TestMetrics(t *testing.T) { } // Test Items - ms.Set("key2", []byte("value2"), time.Minute) + _ = ms.Set("key2", []byte("value2"), time.Minute) metrics = ms.GetMetrics() if metrics.Items != 2 { t.Errorf("Expected 2 items, got %d", metrics.Items) } // Test Evictions - ms.Set("expired", []byte("expired"), 1*time.Millisecond) + _ = ms.Set("expired", []byte("expired"), 1*time.Millisecond) time.Sleep(100 * time.Millisecond) // Wait for expiration // Trigger cleanup diff --git a/memorystore/pubsub_gcp.go b/memorystore/pubsub_gcp.go index 0007f8a..ed59318 100644 --- a/memorystore/pubsub_gcp.go +++ b/memorystore/pubsub_gcp.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub" //nolint:staticcheck "github.com/google/uuid" "google.golang.org/api/option" ) @@ -99,8 +99,9 @@ func (g *GCPPubSub) Subscribe(topicName string) (<-chan []byte, error) { } }) if err != nil && err != context.Canceled { - // Log error? - // fmt.Printf("Receive error: %v\n", err) + // In a real application, we might want to log this error or handle it more gracefully. + // For now, we'll just ignore it as per the existing design. + _ = err } }() @@ -151,6 +152,7 @@ func (g *GCPPubSub) Unsubscribe(topicName string) error { // Delete subscription from GCP to clean up if err := sub.sub.Delete(context.Background()); err != nil { // Log error but continue + _ = err } delete(g.subscriptions, topicName) @@ -166,7 +168,7 @@ func (g *GCPPubSub) Close() error { sub.cancel() sub.wg.Wait() // Best effort delete - sub.sub.Delete(context.Background()) + _ = sub.sub.Delete(context.Background()) } g.subscriptions = nil diff --git a/memorystore/pubsub_memory.go b/memorystore/pubsub_memory.go index b925385..bc6c83f 100644 --- a/memorystore/pubsub_memory.go +++ b/memorystore/pubsub_memory.go @@ -14,10 +14,10 @@ const ( // subscription represents an individual subscriber type subscription struct { - topic string // The topic this subscription matches - ch chan []byte // Channel for sending messages to the subscriber - ctx context.Context // Context for managing subscription lifetime - cancel func() // Function to cancel the subscription context + topic string // The topic this subscription matches + ch chan []byte // Channel for sending messages to the subscriber + ctx context.Context // Context for managing subscription lifetime + cancel func() // Function to cancel the subscription context } // InMemoryPubSub handles all publish/subscribe operations in memory @@ -53,10 +53,10 @@ func (ps *InMemoryPubSub) Subscribe(topic string) (<-chan []byte, error) { ch := make(chan []byte, defaultChannelBuffer) sub := &subscription{ - topic: topic, - ch: ch, - ctx: ctx, - cancel: cancel, + topic: topic, + ch: ch, + ctx: ctx, + cancel: cancel, } // Add subscription to manager diff --git a/memorystore/pubsub_memory_test.go b/memorystore/pubsub_memory_test.go new file mode 100644 index 0000000..9c3bf73 --- /dev/null +++ b/memorystore/pubsub_memory_test.go @@ -0,0 +1,177 @@ +package memorystore + +import ( + "sync" + "testing" + "time" +) + +func TestMatchesTopic(t *testing.T) { + tests := []struct { + topic string + pattern string + match bool + }{ + {"user:123", "user:123", true}, + {"user:123", "user:*", true}, + {"user:123:profile", "user:*:profile", true}, + {"user:123:profile", "user:123:*", true}, + {"user:123", "post:123", false}, + {"user:123", "user:123:profile", false}, + {"user:123:profile", "user:123", false}, + {"user:123", "*", false}, + {"a", "*", true}, + {"a:b", "*:*", true}, + {"a:b", "a:*", true}, + {"a:b", "*:b", true}, + {"a:b:c", "a:*:c", true}, + } + + for _, tt := range tests { + if got := matchesTopic(tt.topic, tt.pattern); got != tt.match { + t.Errorf("matchesTopic(%q, %q) = %v, want %v", tt.topic, tt.pattern, got, tt.match) + } + } +} + +func TestInMemoryPubSub_Closed(t *testing.T) { + ps := newInMemoryPubSub() + + // Test Close + if err := ps.Close(); err != nil { + t.Fatalf("Close failed: %v", err) + } + + // Test operations after Close + if _, err := ps.Subscribe("topic"); err != ErrStoreStopped { + t.Errorf("Subscribe after Close should return ErrStoreStopped, got %v", err) + } + + if err := ps.Publish("topic", []byte("msg")); err != ErrStoreStopped { + t.Errorf("Publish after Close should return ErrStoreStopped, got %v", err) + } + + if err := ps.Unsubscribe("topic"); err != ErrStoreStopped { + t.Errorf("Unsubscribe after Close should return ErrStoreStopped, got %v", err) + } +} + +func TestInMemoryPubSub_RemoveSubscription(t *testing.T) { + ps := newInMemoryPubSub() + defer ps.Close() + + topic := "test" + sub1, err := ps.Subscribe(topic) + if err != nil { + t.Fatalf("Subscribe 1 failed: %v", err) + } + + sub2, err := ps.Subscribe(topic) + if err != nil { + t.Fatalf("Subscribe 2 failed: %v", err) + } + + // Verify we have 2 subs + ps.mu.RLock() + if len(ps.subscriptions[topic]) != 2 { + t.Errorf("Expected 2 subscriptions, got %d", len(ps.subscriptions[topic])) + } + ps.mu.RUnlock() + + // Unsubscribe all for topic + if err := ps.Unsubscribe(topic); err != nil { + t.Fatalf("Unsubscribe failed: %v", err) + } + + // Wait a bit for cleanup goroutines + time.Sleep(50 * time.Millisecond) + + ps.mu.RLock() + if len(ps.subscriptions[topic]) != 0 { + t.Errorf("Expected 0 subscriptions after Unsubscribe, got %d", len(ps.subscriptions[topic])) + } + ps.mu.RUnlock() + + // Verify channels are closed + select { + case _, ok := <-sub1: + if ok { + t.Error("sub1 channel should be closed") + } + default: + // might not be closed yet if we didn't wait enough, but Sleep should cover it + } + select { + case _, ok := <-sub2: + if ok { + t.Error("sub2 channel should be closed") + } + default: + } +} + +func TestInMemoryPubSub_TopicCleaning(t *testing.T) { + ps := newInMemoryPubSub() + defer ps.Close() + + topic := "temp-topic" + ch, _ := ps.Subscribe(topic) + + // Unsubscribe + _ = ps.Unsubscribe(topic) + + // Wait for cleanup + time.Sleep(10 * time.Millisecond) + + // Check if topic entry is removed from map + ps.mu.RLock() + _, exists := ps.subscriptions[topic] + ps.mu.RUnlock() + + if exists { + t.Error("Topic entry should be removed from map after last subscriber is removed") + } + + // Drain channel to be safe + for range ch { + } +} + +func TestInMemoryPubSub_ConcurrentPublishSubscribe(t *testing.T) { + ps := newInMemoryPubSub() + defer ps.Close() + + var wg sync.WaitGroup + const routines = 20 + + // Concurrent Subscribe + for i := 0; i < routines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + _, _ = ps.Subscribe("topic") + }(i) + } + + // Concurrent Publish + for i := 0; i < routines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + _ = ps.Publish("topic", []byte("msg")) + }(i) + } + + // Concurrent Unsubscribe + for i := 0; i < routines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + // Sleep a bit to let some subscribes happen + time.Sleep(time.Millisecond) + _ = ps.Unsubscribe("topic") + }(i) + } + + wg.Wait() +} diff --git a/memorystore/pubsub_test.go b/memorystore/pubsub_test.go index 1c9e0dd..8bca7f5 100644 --- a/memorystore/pubsub_test.go +++ b/memorystore/pubsub_test.go @@ -1,344 +1,354 @@ -// memorystore/pubsub_test.go -package memorystore - -import ( - "fmt" - "sync" - "testing" - "time" -) - -// TestMemoryStore_Subscribe tests basic subscription functionality -func TestMemoryStore_Subscribe(t *testing.T) { - tests := []struct { - name string - pattern string - wantErr bool - publishKey string - publishMsg []byte - shouldMatch bool - }{ - { - name: "exact match subscription", - pattern: "user:123", - wantErr: false, - publishKey: "user:123", - publishMsg: []byte("test message"), - shouldMatch: true, - }, - { - name: "wildcard subscription", - pattern: "user:*", - wantErr: false, - publishKey: "user:123", - publishMsg: []byte("test message"), - shouldMatch: true, - }, - { - name: "non-matching subscription", - pattern: "user:123", - wantErr: false, - publishKey: "user:456", - publishMsg: []byte("test message"), - shouldMatch: false, - }, - { - name: "empty pattern", - pattern: "", - wantErr: true, - publishKey: "", - publishMsg: nil, - shouldMatch: false, - }, - { - name: "multiple wildcards", - pattern: "user:*:status", - wantErr: false, - publishKey: "user:123:status", - publishMsg: []byte("active"), - shouldMatch: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - // Create subscription - ch, err := ms.Subscribe(tt.pattern) - if (err != nil) != tt.wantErr { - t.Errorf("Subscribe() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if err != nil { - return - } - - // Test publishing - var receivedMsg []byte - var wg sync.WaitGroup - wg.Add(1) - - go func() { - defer wg.Done() - select { - case msg := <-ch: - receivedMsg = msg - case <-time.After(100 * time.Millisecond): - // Timeout if no message received - } - }() - - err = ms.Publish(tt.publishKey, tt.publishMsg) - if err != nil { - t.Errorf("Publish() error = %v", err) - } - - wg.Wait() - - if tt.shouldMatch { - if receivedMsg == nil { - t.Error("Expected to receive message but got none") - } else if string(receivedMsg) != string(tt.publishMsg) { - t.Errorf("Got message %s, want %s", string(receivedMsg), string(tt.publishMsg)) - } - } else { - if receivedMsg != nil { - t.Errorf("Got unexpected message %s", string(receivedMsg)) - } - } - }) - } -} - -// TestMemoryStore_Unsubscribe tests unsubscription functionality -func TestMemoryStore_Unsubscribe(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - pattern := "test:*" - ch, err := ms.Subscribe(pattern) - if err != nil { - t.Fatalf("Subscribe() error = %v", err) - } - - // Unsubscribe - err = ms.Unsubscribe(pattern) - if err != nil { - t.Errorf("Unsubscribe() error = %v", err) - } - - // Verify channel is closed - select { - case _, ok := <-ch: - if ok { - t.Error("Channel should be closed after unsubscribe") - } - case <-time.After(100 * time.Millisecond): - t.Error("Channel should be closed immediately") - } - - // Verify subscriber count is 0 - if count := ms.SubscriberCount(pattern); count != 0 { - t.Errorf("SubscriberCount() = %v, want 0", count) - } -} - -// TestMemoryStore_MultipleSubscribers tests multiple subscribers to the same pattern -func TestMemoryStore_MultipleSubscribers(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - pattern := "test:*" - subscribers := 5 - message := []byte("test message") - timeout := time.After(2 * time.Second) // Add timeout - - var channels []<-chan []byte - var wg sync.WaitGroup - - // Create subscribers - for i := 0; i < subscribers; i++ { - ch, err := ms.Subscribe(pattern) - if err != nil { - t.Fatalf("Subscribe() error = %v", err) - } - channels = append(channels, ch) - } - - // Verify subscriber count - if count := ms.SubscriberCount(pattern); count != subscribers { - t.Errorf("SubscriberCount() = %v, want %v", count, subscribers) - } - - // Test message delivery to all subscribers - wg.Add(subscribers) - receivedCount := 0 - var mu sync.Mutex - - for i := 0; i < subscribers; i++ { - go func(ch <-chan []byte) { - defer wg.Done() - select { - case msg := <-ch: - if string(msg) == string(message) { - mu.Lock() - receivedCount++ - mu.Unlock() - } - case <-timeout: - // Timeout - don't block forever - t.Error("Timeout waiting for message") - } - }(channels[i]) - } - - // Publish message - err := ms.Publish("test:123", message) - if err != nil { - t.Fatalf("Publish() error = %v", err) - } - - // Wait with timeout - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - // Success - case <-time.After(3 * time.Second): - t.Fatal("Test timed out") - } - - if receivedCount != subscribers { - t.Errorf("Message received by %v subscribers, want %v", receivedCount, subscribers) - } -} - -func TestMemoryStore_PubSub_Concurrent(t *testing.T) { - ms := NewMemoryStore() - defer ms.Stop() - - const publishers = 5 - const subscribers = 5 - const messagesPerPublisher = 20 - - var wg sync.WaitGroup - received := make(map[string]int) - var mu sync.Mutex - - // Channel to signal when all messages have been published - allPublished := make(chan struct{}) - - // Create subscribers - var subWg sync.WaitGroup - for i := 0; i < subscribers; i++ { - ch, err := ms.Subscribe("test:*") - if err != nil { - t.Fatalf("Subscribe() error = %v", err) - } - - subWg.Add(1) - go func() { - defer subWg.Done() - for { - select { - case msg, ok := <-ch: - if !ok { - return - } - mu.Lock() - received[string(msg)]++ - mu.Unlock() - case <-allPublished: - // Give a short time to process any remaining messages - time.Sleep(100 * time.Millisecond) - return - } - } - }() - } - - // Create publishers - wg.Add(publishers) - for i := 0; i < publishers; i++ { - go func(id int) { - defer wg.Done() - for j := 0; j < messagesPerPublisher; j++ { - msg := []byte(fmt.Sprintf("msg-%d-%d", id, j)) - if err := ms.Publish("test:123", msg); err != nil { - t.Errorf("Publish() error = %v", err) - } - time.Sleep(time.Millisecond) // Small delay to prevent message flood - } - }(i) - } - - // Wait for all publishers to finish - wg.Wait() - close(allPublished) - - // Wait for subscribers to finish processing - done := make(chan struct{}) - go func() { - subWg.Wait() - close(done) - }() - - // Wait with timeout - select { - case <-done: - // Success - case <-time.After(2 * time.Second): - t.Fatal("Timeout waiting for subscribers to finish") - } - - // Verify results - mu.Lock() - totalMessages := len(received) - messageCount := 0 - for _, count := range received { - messageCount += count - } - mu.Unlock() - - expectedTotal := publishers * messagesPerPublisher * subscribers - if messageCount != expectedTotal { - t.Errorf("Expected %d total message receipts, got %d", expectedTotal, messageCount) - } - if totalMessages == 0 { - t.Error("No messages were received") - } -} - -// BenchmarkMemoryStore_PubSub benchmarks publish/subscribe operations -func BenchmarkMemoryStore_PubSub(b *testing.B) { - ms := NewMemoryStore() - defer ms.Stop() - - ch, err := ms.Subscribe("bench:*") - if err != nil { - b.Fatalf("Subscribe() error = %v", err) - } - - // Start consumer - go func() { - for range ch { - // Consume messages - } - }() - - message := []byte("benchmark message") - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if err := ms.Publish("bench:test", message); err != nil { - b.Fatalf("Publish() error = %v", err) - } - } -} +// memorystore/pubsub_test.go +package memorystore + +import ( + "fmt" + "sync" + "testing" + "time" +) + +// TestMemoryStore_Subscribe tests basic subscription functionality +func TestMemoryStore_Subscribe(t *testing.T) { + tests := []struct { + name string + pattern string + wantErr bool + publishKey string + publishMsg []byte + shouldMatch bool + }{ + { + name: "exact match subscription", + pattern: "user:123", + wantErr: false, + publishKey: "user:123", + publishMsg: []byte("test message"), + shouldMatch: true, + }, + { + name: "wildcard subscription", + pattern: "user:*", + wantErr: false, + publishKey: "user:123", + publishMsg: []byte("test message"), + shouldMatch: true, + }, + { + name: "non-matching subscription", + pattern: "user:123", + wantErr: false, + publishKey: "user:456", + publishMsg: []byte("test message"), + shouldMatch: false, + }, + { + name: "empty pattern", + pattern: "", + wantErr: true, + publishKey: "", + publishMsg: nil, + shouldMatch: false, + }, + { + name: "multiple wildcards", + pattern: "user:*:status", + wantErr: false, + publishKey: "user:123:status", + publishMsg: []byte("active"), + shouldMatch: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + // Create subscription + ch, err := ms.Subscribe(tt.pattern) + if (err != nil) != tt.wantErr { + t.Errorf("Subscribe() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if err != nil { + return + } + + // Test publishing + var receivedMsg []byte + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + select { + case msg := <-ch: + receivedMsg = msg + case <-time.After(100 * time.Millisecond): + // Timeout if no message received + } + }() + + err = ms.Publish(tt.publishKey, tt.publishMsg) + if err != nil { + t.Errorf("Publish() error = %v", err) + } + + wg.Wait() + + if tt.shouldMatch { + if receivedMsg == nil { + t.Error("Expected to receive message but got none") + } else if string(receivedMsg) != string(tt.publishMsg) { + t.Errorf("Got message %s, want %s", string(receivedMsg), string(tt.publishMsg)) + } + } else { + if receivedMsg != nil { + t.Errorf("Got unexpected message %s", string(receivedMsg)) + } + } + }) + } +} + +// TestMemoryStore_Unsubscribe tests unsubscription functionality +func TestMemoryStore_Unsubscribe(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + pattern := "test:*" + ch, err := ms.Subscribe(pattern) + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + + // Unsubscribe + err = ms.Unsubscribe(pattern) + if err != nil { + t.Errorf("Unsubscribe() error = %v", err) + } + + // Verify channel is closed + select { + case _, ok := <-ch: + if ok { + t.Error("Channel should be closed after unsubscribe") + } + case <-time.After(100 * time.Millisecond): + t.Error("Channel should be closed immediately") + } + + // Verify subscriber count is 0 + if count := ms.SubscriberCount(pattern); count != 0 { + t.Errorf("SubscriberCount() = %v, want 0", count) + } +} + +// TestMemoryStore_MultipleSubscribers tests multiple subscribers to the same pattern +func TestMemoryStore_MultipleSubscribers(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + pattern := "test:*" + subscribers := 5 + message := []byte("test message") + timeout := time.After(2 * time.Second) // Add timeout + + var channels []<-chan []byte + var wg sync.WaitGroup + + // Create subscribers + for i := 0; i < subscribers; i++ { + ch, err := ms.Subscribe(pattern) + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + channels = append(channels, ch) + } + + // Verify subscriber count + if count := ms.SubscriberCount(pattern); count != subscribers { + t.Errorf("SubscriberCount() = %v, want %v", count, subscribers) + } + + // Test message delivery to all subscribers + wg.Add(subscribers) + receivedCount := 0 + var mu sync.Mutex + + for i := 0; i < subscribers; i++ { + go func(ch <-chan []byte) { + defer wg.Done() + select { + case msg := <-ch: + if string(msg) == string(message) { + mu.Lock() + receivedCount++ + mu.Unlock() + } + case <-timeout: + // Timeout - don't block forever + t.Error("Timeout waiting for message") + } + }(channels[i]) + } + + // Publish message + err := ms.Publish("test:123", message) + if err != nil { + t.Fatalf("Publish() error = %v", err) + } + + // Wait with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(3 * time.Second): + t.Fatal("Test timed out") + } + + if receivedCount != subscribers { + t.Errorf("Message received by %v subscribers, want %v", receivedCount, subscribers) + } +} + +func TestMemoryStore_PubSub_Concurrent(t *testing.T) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + const publishers = 5 + const subscribers = 5 + const messagesPerPublisher = 20 + + var wg sync.WaitGroup + received := make(map[string]int) + var mu sync.Mutex + + // Channel to signal when all messages have been published + allPublished := make(chan struct{}) + + // Create subscribers + var subWg sync.WaitGroup + for i := 0; i < subscribers; i++ { + ch, err := ms.Subscribe("test:*") + if err != nil { + t.Fatalf("Subscribe() error = %v", err) + } + + subWg.Add(1) + go func() { + defer subWg.Done() + for { + select { + case msg, ok := <-ch: + if !ok { + return + } + mu.Lock() + received[string(msg)]++ + mu.Unlock() + case <-allPublished: + // Give a short time to process any remaining messages + time.Sleep(100 * time.Millisecond) + return + } + } + }() + } + + // Create publishers + wg.Add(publishers) + for i := 0; i < publishers; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < messagesPerPublisher; j++ { + msg := []byte(fmt.Sprintf("msg-%d-%d", id, j)) + if err := ms.Publish("test:123", msg); err != nil { + t.Errorf("Publish() error = %v", err) + } + time.Sleep(time.Millisecond) // Small delay to prevent message flood + } + }(i) + } + + // Wait for all publishers to finish + wg.Wait() + close(allPublished) + + // Wait for subscribers to finish processing + done := make(chan struct{}) + go func() { + subWg.Wait() + close(done) + }() + + // Wait with timeout + select { + case <-done: + // Success + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for subscribers to finish") + } + + // Verify results + mu.Lock() + totalMessages := len(received) + messageCount := 0 + for _, count := range received { + messageCount += count + } + mu.Unlock() + + expectedTotal := publishers * messagesPerPublisher * subscribers + if messageCount != expectedTotal { + t.Errorf("Expected %d total message receipts, got %d", expectedTotal, messageCount) + } + if totalMessages == 0 { + t.Error("No messages were received") + } +} + +// BenchmarkMemoryStore_PubSub benchmarks publish/subscribe operations +func BenchmarkMemoryStore_PubSub(b *testing.B) { + ms := NewMemoryStore() + defer func() { + _ = ms.Stop() + }() + + ch, err := ms.Subscribe("bench:*") + if err != nil { + b.Fatalf("Subscribe() error = %v", err) + } + + // Start consumer + go func() { + for range ch { + // Consume messages + } + }() + + message := []byte("benchmark message") + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := ms.Publish("bench:test", message); err != nil { + b.Fatalf("Publish() error = %v", err) + } + } +}