diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 12cb27e..b517353 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -22,7 +22,7 @@ jobs: go-version: '1.20' - name: Build - run: go run main.go + run: go build -v ./... - name: Test run: go test -v ./... diff --git a/README.md b/README.md index 58c4336..ae161c7 100644 --- a/README.md +++ b/README.md @@ -1,217 +1,288 @@ -# ๐Ÿš€ MemoryStore - -MemoryStore is a high-performance, thread-safe, in-memory key-value store implemented in Go. It features automatic key expiration, JSON serialization support, and concurrent access safety. - -[![Go Report Card](https://goreportcard.com/badge/github.com/BryceWayne/MemoryStore)](https://goreportcard.com/report/github.com/BryceWayne/MemoryStore) -[![GoDoc](https://godoc.org/github.com/BryceWayne/MemoryStore?status.svg)](https://godoc.org/github.com/BryceWayne/MemoryStore) - -## Features - -- ๐Ÿ”„ Thread-safe operations -- โฐ Automatic key expiration -- ๐Ÿงน Background cleanup of expired keys -- ๐Ÿ“ฆ Support for both raw bytes and JSON data -- ๐Ÿ’ช High-performance using go-json -- ๐Ÿ”’ Clean shutdown mechanism -- ๐Ÿ“ Comprehensive documentation -- ๐Ÿ“ก Pattern-based Publish/Subscribe system - -## Installation - -```bash -go get github.com/BryceWayne/MemoryStore -``` - -## Quick Start - -Here's a simple example demonstrating basic usage: - -```go -package main - -import ( - "log" - "time" - "github.com/BryceWayne/MemoryStore/memorystore" -) - -func main() { - // Create a new store instance - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Store a string (converted to bytes) - err := store.Set("greeting", []byte("Hello, World!"), 1*time.Minute) - if err != nil { - log.Fatal(err) - } - - // Retrieve the value - if value, exists := store.Get("greeting"); exists { - log.Printf("Value: %s", string(value)) - } -} -``` - -## Advanced Usage - -### Working with JSON - -MemoryStore provides convenient methods for JSON serialization: - -```go -type User struct { - Name string `json:"name"` - Email string `json:"email"` -} - -func main() { - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Store JSON data - user := User{Name: "Alice", Email: "alice@example.com"} - err := store.SetJSON("user:123", user, 1*time.Hour) - if err != nil { - log.Fatal(err) - } - - // Retrieve JSON data - var retrievedUser User - exists, err := store.GetJSON("user:123", &retrievedUser) - if err != nil { - log.Fatal(err) - } - if exists { - log.Printf("User: %+v", retrievedUser) - } -} -``` - -### Working with PubSub - -MemoryStore includes a powerful publish/subscribe system for real-time communication: - -```go -func main() { - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Subscribe to user updates - userEvents, err := store.Subscribe("user:*") - if err != nil { - log.Fatal(err) - } - - // Listen for messages in a goroutine - go func() { - for msg := range userEvents { - log.Printf("Received update: %s", string(msg)) - } - }() - - // Publish updates - err = store.Publish("user:123", []byte("status:active")) - if err != nil { - log.Fatal(err) - } -} -``` - -The PubSub system supports: -- Pattern-based subscriptions (`user:*`, `order:*:status`) -- Non-blocking message delivery -- Automatic cleanup of disconnected subscribers -- Thread-safe concurrent access -- Integration with existing store operations - -Methods available: -- `Subscribe(pattern string) (<-chan []byte, error)`: Subscribe to a pattern -- `Publish(channel string, message []byte) error`: Publish a message -- `Unsubscribe(pattern string) error`: Unsubscribe from a pattern -- `SubscriberCount(pattern string) int`: Get number of subscribers - -### Expiration and Cleanup - -Keys automatically expire after their specified duration: - -```go -// Set a value that expires in 5 seconds -store.Set("temp", []byte("temporary value"), 5*time.Second) - -// The background cleanup routine will automatically remove expired items -// You can also manually check if an item exists -time.Sleep(6 * time.Second) -if _, exists := store.Get("temp"); !exists { - log.Println("Key has expired") -} -``` - -### Proper Shutdown - -Always ensure proper cleanup by calling `Stop()`: - -```go -store := memorystore.NewMemoryStore() -defer func() { - if err := store.Stop(); err != nil { - log.Printf("Error stopping store: %v", err) - } -}() -``` - -## Performance Considerations - -- Uses `github.com/goccy/go-json` for faster JSON operations -- Minimizes lock contention with RWMutex -- Efficient background cleanup of expired items -- Memory-efficient storage using byte slices - -## Building and Testing - -Use the provided Makefile: - -```bash -# Build the project -make build - -# Run tests -make test - -# Run benchmarks -make bench -``` - -## Contributing - -Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change. - -1. Fork the repository -2. Create your feature branch (`git checkout -b feature/AmazingFeature`) -3. Commit your changes (`git commit -m 'Add some AmazingFeature'`) -4. Push to the branch (`git push origin feature/AmazingFeature`) -5. Open a Pull Request - -## License - -This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. - -## Acknowledgments - -- Thanks to the Go team for the amazing standard library -- [go-json](https://github.com/goccy/go-json) for high-performance JSON operations - -## Todo - -- [ ] Add support for batch operations -- [ ] Implement data persistence -- [ ] Add metrics and monitoring -- [ ] Add compression options - -## Support - -If you have any questions or need help integrating MemoryStore, please: - -1. Check the [documentation](https://godoc.org/github.com/BryceWayne/MemoryStore) -2. Open an issue with a detailed description -3. Reach out through the discussions tab \ No newline at end of file +# ๐Ÿš€ MemoryStore + +MemoryStore is a high-performance, thread-safe, in-memory key-value store implemented in Go. It features automatic key expiration, JSON serialization support, batch operations, metrics, and an agnostic Publish/Subscribe system (supporting In-Memory and Google Cloud PubSub). + +[![Go Report Card](https://goreportcard.com/badge/github.com/BryceWayne/MemoryStore)](https://goreportcard.com/report/github.com/BryceWayne/MemoryStore) +[![GoDoc](https://godoc.org/github.com/BryceWayne/MemoryStore?status.svg)](https://godoc.org/github.com/BryceWayne/MemoryStore) + +## Features + +- ๐Ÿ”„ Thread-safe operations +- โฐ Automatic key expiration +- ๐Ÿงน Background cleanup of expired keys +- ๐Ÿ“ฆ Support for both raw bytes and JSON data +- ๐Ÿ’ช High-performance using go-json +- ๐Ÿš€ Batch operations (`SetMulti`, `GetMulti`) +- ๐Ÿ“Š Built-in Metrics and Monitoring +- ๐Ÿ“ก Agnostic Publish/Subscribe system (In-Memory & GCP PubSub) +- ๐Ÿ”’ Clean shutdown mechanism +- ๐Ÿ“ Comprehensive documentation + +## Installation + +```bash +go get github.com/BryceWayne/MemoryStore +``` + +## Quick Start + +Here's a simple example demonstrating basic usage: + +```go +package main + +import ( + "log" + "time" + "github.com/BryceWayne/MemoryStore/memorystore" +) + +func main() { + // Create a new store instance (defaults to In-Memory PubSub) + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Store a string (converted to bytes) + err := store.Set("greeting", []byte("Hello, World!"), 1*time.Minute) + if err != nil { + log.Fatal(err) + } + + // Retrieve the value + if value, exists := store.Get("greeting"); exists { + log.Printf("Value: %s", string(value)) + } +} +``` + +## PubSub Usage + +MemoryStore supports an agnostic PubSub interface. By default, it uses an in-memory implementation. To use Google Cloud PubSub, simply provide your Project ID configuration. + +### Using In-Memory PubSub (Default) + +```go +package main + +import ( + "log" + "time" + "github.com/BryceWayne/MemoryStore/memorystore" +) + +func main() { + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Subscribe to a topic + msgs, err := store.Subscribe("user-updates") + if err != nil { + log.Fatal(err) + } + + // Listen in background + go func() { + for msg := range msgs { + log.Printf("Received: %s", string(msg)) + } + }() + + // Publish to the topic + err = store.Publish("user-updates", []byte("User 123 logged in")) + if err != nil { + log.Fatal(err) + } + + // Give time for message delivery + time.Sleep(1 * time.Second) +} +``` + +### Using Google Cloud PubSub + +```go +package main + +import ( + "log" + "time" + "github.com/BryceWayne/MemoryStore/memorystore" +) + +func main() { + // Configure with GCP Project ID + config := memorystore.Config{ + GCPProjectID: "my-gcp-project-id", + } + store := memorystore.NewMemoryStoreWithConfig(config) + defer store.Stop() + + // Subscribe to a topic + // Note: GCP PubSub creates a subscription for this topic + msgs, err := store.Subscribe("user-updates") + if err != nil { + log.Fatal(err) + } + + // Listen in background + go func() { + for msg := range msgs { + log.Printf("Received: %s", string(msg)) + } + }() + + // Publish to the topic + err = store.Publish("user-updates", []byte("User 123 logged in")) + if err != nil { + log.Fatal(err) + } + + // Give time for message delivery + time.Sleep(1 * time.Second) +} +``` + +Or simply set `GOOGLE_CLOUD_PROJECT` environment variable: + +```bash +export GOOGLE_CLOUD_PROJECT=my-project-id +``` +```go +store := memorystore.NewMemoryStore() // Automatically picks up GCP PubSub +``` + +## Advanced Usage + +### Working with JSON + +MemoryStore provides convenient methods for JSON serialization: + +```go +type User struct { + Name string `json:"name"` + Email string `json:"email"` +} + +func main() { + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Store JSON data + user := User{Name: "Alice", Email: "alice@example.com"} + err := store.SetJSON("user:123", user, 1*time.Hour) + if err != nil { + log.Fatal(err) + } + + // Retrieve JSON data + var retrievedUser User + exists, err := store.GetJSON("user:123", &retrievedUser) + if err != nil { + log.Fatal(err) + } + if exists { + log.Printf("User: %+v", retrievedUser) + } +} +``` + +### Batch Operations + +Efficiently set or get multiple items at once: + +```go +items := map[string][]byte{ + "key1": []byte("val1"), + "key2": []byte("val2"), +} +store.SetMulti(items, time.Minute) + +results := store.GetMulti([]string{"key1", "key2"}) +``` + +### Metrics + +Monitor cache performance: + +```go +metrics := store.GetMetrics() +log.Printf("Hits: %d, Misses: %d, Items: %d", metrics.Hits, metrics.Misses, metrics.Items) +``` + +### Expiration and Cleanup + +Keys automatically expire after their specified duration: + +```go +// Set a value that expires in 5 seconds +store.Set("temp", []byte("temporary value"), 5*time.Second) + +// The background cleanup routine will automatically remove expired items +// You can also manually check if an item exists +time.Sleep(6 * time.Second) +if _, exists := store.Get("temp"); !exists { + log.Println("Key has expired") +} +``` + +### Proper Shutdown + +Always ensure proper cleanup by calling `Stop()`: + +```go +store := memorystore.NewMemoryStore() +defer func() { + if err := store.Stop(); err != nil { + log.Printf("Error stopping store: %v", err) + } +}() +``` + +## Performance Considerations + +- Uses `github.com/goccy/go-json` for faster JSON operations +- Minimizes lock contention with RWMutex +- Efficient background cleanup of expired items +- Memory-efficient storage using byte slices + +## Building and Testing + +Use the provided Makefile: + +```bash +# Build the project +make build + +# Run tests +make test + +# Run benchmarks +make bench +``` + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change. + +1. Fork the repository +2. Create your feature branch (`git checkout -b feature/AmazingFeature`) +3. Commit your changes (`git commit -m 'Add some AmazingFeature'`) +4. Push to the branch (`git push origin feature/AmazingFeature`) +5. Open a Pull Request + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. + +## Acknowledgments + +- Thanks to the Go team for the amazing standard library +- [go-json](https://github.com/goccy/go-json) for high-performance JSON operations + +## Support + +If you have any questions or need help integrating MemoryStore, please: + +1. Check the [documentation](https://godoc.org/github.com/BryceWayne/MemoryStore) +2. Open an issue with a detailed description +3. Reach out through the discussions tab diff --git a/go.mod b/go.mod index a1d8ff9..f78e48e 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,46 @@ module github.com/BryceWayne/MemoryStore -go 1.21 +go 1.24.0 + +toolchain go1.24.3 require ( github.com/goccy/go-json v0.10.2 - github.com/google/uuid v1.5.0 + github.com/google/uuid v1.6.0 +) + +require ( + cloud.google.com/go v0.121.6 // indirect + cloud.google.com/go/auth v0.16.4 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect + cloud.google.com/go/compute/metadata v0.8.0 // indirect + cloud.google.com/go/iam v1.5.2 // indirect + cloud.google.com/go/pubsub v1.50.1 // indirect + cloud.google.com/go/pubsub/v2 v2.0.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/s2a-go v0.1.9 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect + github.com/googleapis/gax-go/v2 v2.15.0 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect + go.opentelemetry.io/otel v1.36.0 // indirect + go.opentelemetry.io/otel/metric v1.36.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + golang.org/x/time v0.12.0 // indirect + google.golang.org/api v0.247.0 // indirect + google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect + google.golang.org/grpc v1.74.2 // indirect + google.golang.org/protobuf v1.36.7 // indirect ) diff --git a/go.sum b/go.sum index 9f07c6d..6e49101 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,163 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.121.6 h1:waZiuajrI28iAf40cWgycWNgaXPO06dupuS+sgibK6c= +cloud.google.com/go v0.121.6/go.mod h1:coChdst4Ea5vUpiALcYKXEpR1S9ZgXbhEzzMcMR66vI= +cloud.google.com/go/auth v0.16.4 h1:fXOAIQmkApVvcIn7Pc2+5J8QTMVbUGLscnSVNl11su8= +cloud.google.com/go/auth v0.16.4/go.mod h1:j10ncYwjX/g3cdX7GpEzsdM+d+ZNsXAbb6qXA7p1Y5M= +cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= +cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= +cloud.google.com/go/compute/metadata v0.8.0 h1:HxMRIbao8w17ZX6wBnjhcDkW6lTFpgcaobyVfZWqRLA= +cloud.google.com/go/compute/metadata v0.8.0/go.mod h1:sYOGTp851OV9bOFJ9CH7elVvyzopvWQFNNghtDQ/Biw= +cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8= +cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE= +cloud.google.com/go/pubsub v1.50.1 h1:fzbXpPyJnSGvWXF1jabhQeXyxdbCIkXTpjXHy7xviBM= +cloud.google.com/go/pubsub v1.50.1/go.mod h1:6YVJv3MzWJUVdvQXG081sFvS0dWQOdnV+oTo++q/xFk= +cloud.google.com/go/pubsub/v2 v2.0.0 h1:0qS6mRJ41gD1lNmM/vdm6bR7DQu6coQcVwD+VPf0Bz0= +cloud.google.com/go/pubsub/v2 v2.0.0/go.mod h1:0aztFxNzVQIRSZ8vUr79uH2bS3jwLebwK6q1sgEub+E= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= +github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4= +github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= +github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= +github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 h1:q4XOmH/0opmeuJtPsbFNivyl7bCt7yRBbeEm2sC/XtQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0/go.mod h1:snMWehoOh2wsEwnvvwtDyFCxVeDAODenXHtn5vzrKjo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/api v0.247.0 h1:tSd/e0QrUlLsrwMKmkbQhYVa109qIintOls2Wh6bngc= +google.golang.org/api v0.247.0/go.mod h1:r1qZOPmxXffXg6xS5uhx16Fa/UFY8QU/K4bfKrnvovM= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20250603155806-513f23925822 h1:rHWScKit0gvAPuOnu87KpaYtjK5zBMLcULh7gxkCXu4= +google.golang.org/genproto v0.0.0-20250603155806-513f23925822/go.mod h1:HubltRL7rMh0LfnQPkMH4NPDFEWp0jw3vixw7jEM53s= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c h1:AtEkQdl5b6zsybXcbz00j1LwNodDuH6hVifIaNqk7NQ= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a h1:tPE/Kp+x9dMSwUm/uM0JKK0IfdiJkwAbSMSeZBXXJXc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/main.go b/main.go index 381b14e..1c15d96 100644 --- a/main.go +++ b/main.go @@ -1,294 +1,115 @@ -// main.go -// Package main provides a demonstration of the memorystore package functionality. -// It shows various use cases including storing/retrieving data, handling expiration, -// and proper error handling. -package main - -import ( - "encoding/json" - "log" - "strings" - "sync" - "time" - - "github.com/BryceWayne/MemoryStore/memorystore" - "github.com/google/uuid" -) - -// Person represents a sample data structure used to demonstrate -// JSON serialization and deserialization with the MemoryStore. -type Person struct { - Name string `json:"name"` // Person's full name - Age int `json:"age"` // Person's age in years - UID string `json:"uid"` // Unique identifier - Created time.Time `json:"created,omitempty"` // Time when the record was created -} - -// demonstrateBasicOperations shows basic Set/Get operations with raw bytes -// and proper error handling. -func demonstrateBasicOperations(ms *memorystore.MemoryStore) { - log.Println("=== Demonstrating Basic Operations ===") - - // Create a sample person - person := Person{ - Name: "Alice Smith", - Age: 30, - UID: uuid.New().String(), - Created: time.Now(), - } - - // Using SetJSON for convenient JSON serialization - err := ms.SetJSON(person.UID, person, 2*time.Second) - if err != nil { - log.Printf("Failed to store person: %v", err) - return - } - log.Printf("Stored person with UID: %s", person.UID) - - // Retrieve the stored person - var retrievedPerson Person - exists, err := ms.GetJSON(person.UID, &retrievedPerson) - if err != nil { - log.Printf("Error retrieving person: %v", err) - return - } - if exists { - log.Printf("Retrieved person: %+v", retrievedPerson) - } -} - -// demonstrateExpiration shows how the cache handles expired items. -func demonstrateExpiration(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating Expiration ===") - - key := "expiring_key" - value := []byte("This value will expire soon") - shortDuration := 1 * time.Second - - if err := ms.Set(key, value, shortDuration); err != nil { - log.Printf("Failed to store expiring value: %v", err) - return - } - log.Printf("Stored value with %v expiration", shortDuration) - - // Wait for the value to expire - time.Sleep(shortDuration + 100*time.Millisecond) - - if _, exists := ms.Get(key); !exists { - log.Println("Value has expired as expected") - } -} - -// demonstrateNonExistentKeys shows how the cache handles missing keys. -func demonstrateNonExistentKeys(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating Non-Existent Keys ===") - - nonExistentKey := uuid.New().String() - if _, exists := ms.Get(nonExistentKey); !exists { - log.Printf("As expected, key does not exist: %s", nonExistentKey) - } - - var person Person - exists, err := ms.GetJSON(nonExistentKey, &person) - if !exists && err == nil { - log.Println("GetJSON correctly handles non-existent keys") - } -} - -// demonstrateStoreLifecycle shows proper initialization and cleanup of the MemoryStore. -func demonstrateStoreLifecycle() { - log.Println("\n=== Demonstrating Store Lifecycle ===") - - // Create a new store - ms := memorystore.NewMemoryStore() - log.Println("Created new MemoryStore") - - // Store a value - key := "lifecycle_test" - if err := ms.Set(key, []byte("test"), time.Minute); err != nil { - log.Printf("Failed to store value: %v", err) - return - } - - // Properly stop the store - if err := ms.Stop(); err != nil { - log.Printf("Error stopping store: %v", err) - return - } - log.Println("Successfully stopped MemoryStore") - - // Verify store is stopped - if ms.IsStopped() { - log.Println("Confirmed store is stopped") - } -} - -// demonstratePubSub shows the publish/subscribe functionality -// with pattern matching and multiple subscribers. -// demonstratePubSub shows the publish/subscribe functionality -// with complex pattern matching and JSON message support. -func demonstratePubSub(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating PubSub System ===") - - var wg sync.WaitGroup - - // 1. Complex Pattern Matching Examples - log.Println("Setting up pattern-based subscriptions...") - patterns := map[string]<-chan []byte{} // Store channels for cleanup - - // Subscribe to various patterns - subscribePatterns := []string{ - "users:*:status", // Match all user statuses - "users:admin:*", // Match all admin events - "orders:*.completed", // Match all completed orders - "notifications:*:high", // Match high-priority notifications - "system:*.error", // Match all system errors - } - - for _, pattern := range subscribePatterns { - ch, err := ms.Subscribe(pattern) - if err != nil { - log.Printf("Failed to subscribe to %s: %v", pattern, err) - continue - } - patterns[pattern] = ch - log.Printf("Subscribed to pattern: %s", pattern) - } - - // 2. JSON Message Integration - type UserStatus struct { - UserID string `json:"user_id"` - Status string `json:"status"` - LastSeen time.Time `json:"last_seen"` - Metadata map[string]string `json:"metadata"` - } - - type OrderEvent struct { - OrderID string `json:"order_id"` - Status string `json:"status"` - CompletedAt time.Time `json:"completed_at"` - Total float64 `json:"total"` - } - - // Set up listeners for each pattern - for pattern, ch := range patterns { - wg.Add(1) - go func(pattern string, ch <-chan []byte) { - defer wg.Done() - log.Printf("Listening on pattern: %s", pattern) - - select { - case msg := <-ch: - // Try to decode as UserStatus if it's a user event - if strings.HasPrefix(pattern, "users:") { - var status UserStatus - if err := json.Unmarshal(msg, &status); err == nil { - log.Printf("[%s] User Status Update: %+v", pattern, status) - } else { - log.Printf("[%s] Raw message: %s", pattern, string(msg)) - } - } else if strings.HasPrefix(pattern, "orders:") { - // Try to decode as OrderEvent - var order OrderEvent - if err := json.Unmarshal(msg, &order); err == nil { - log.Printf("[%s] Order Event: %+v", pattern, order) - } else { - log.Printf("[%s] Raw message: %s", pattern, string(msg)) - } - } else { - log.Printf("[%s] Message received: %s", pattern, string(msg)) - } - case <-time.After(2 * time.Second): - log.Printf("[%s] No message received", pattern) - } - }(pattern, ch) - } - - // Publish various types of messages - time.Sleep(100 * time.Millisecond) // Ensure subscribers are ready - log.Println("\nPublishing messages...") - - // Publish JSON user status - adminStatus := UserStatus{ - UserID: "admin123", - Status: "online", - LastSeen: time.Now(), - Metadata: map[string]string{"location": "NYC", "device": "desktop"}, - } - statusJSON, _ := json.Marshal(adminStatus) - ms.Publish("users:admin:status", statusJSON) - - // Publish JSON order completion - order := OrderEvent{ - OrderID: "ORD-789", - Status: "completed", - CompletedAt: time.Now(), - Total: 299.99, - } - orderJSON, _ := json.Marshal(order) - ms.Publish("orders:ORD-789.completed", orderJSON) - - // Publish system error - ms.Publish("system:database.error", []byte("Connection timeout")) - - // Publish high-priority notification - ms.Publish("notifications:user123:high", []byte("Account security alert")) - - // Wait for message processing - wg.Wait() - - log.Println("\nDemonstrating pattern matching scenarios...") - // Show which patterns match different keys - testCases := []struct { - channel string - patterns []string - }{ - { - channel: "users:admin:login", - patterns: []string{"users:admin:*"}, - }, - { - channel: "orders:xyz-789.completed", - patterns: []string{"orders:*.completed"}, - }, - { - channel: "notifications:admin:high", - patterns: []string{"notifications:*:high"}, - }, - } - - for _, tc := range testCases { - log.Printf("Channel '%s' matches patterns: %v", tc.channel, tc.patterns) - } - - // Cleanup - log.Println("\nCleaning up subscriptions...") - for pattern := range patterns { - if err := ms.Unsubscribe(pattern); err != nil { - log.Printf("Error unsubscribing from %s: %v", pattern, err) - } else { - log.Printf("Unsubscribed from %s", pattern) - } - } -} - -func main() { - // Create a new MemoryStore instance - ms := memorystore.NewMemoryStore() - - // Ensure proper cleanup when main exits - defer func() { - if err := ms.Stop(); err != nil { - log.Printf("Error stopping MemoryStore: %v", err) - } - }() - - // Run demonstrations - demonstrateBasicOperations(ms) - demonstrateExpiration(ms) - demonstrateNonExistentKeys(ms) - demonstratePubSub(ms) // Add this line - demonstrateStoreLifecycle() - - log.Println("\nAll demonstrations completed successfully") -} +// main.go +// Package main provides a demonstration of the memorystore package functionality. +package main + +import ( + "log" + "os" + "sync" + "time" + + "github.com/BryceWayne/MemoryStore/memorystore" + "github.com/google/uuid" +) + +// Person represents a sample data structure. +type Person struct { + Name string `json:"name"` + Age int `json:"age"` + UID string `json:"uid"` + Created time.Time `json:"created,omitempty"` +} + +// demonstrateBasicOperations shows basic Set/Get operations. +func demonstrateBasicOperations(ms *memorystore.MemoryStore) { + log.Println("=== Demonstrating Basic Operations ===") + + person := Person{ + Name: "Alice Smith", + Age: 30, + UID: uuid.New().String(), + Created: time.Now(), + } + + err := ms.SetJSON(person.UID, person, 2*time.Second) + if err != nil { + log.Printf("Failed to store person: %v", err) + return + } + log.Printf("Stored person with UID: %s", person.UID) + + var retrievedPerson Person + exists, err := ms.GetJSON(person.UID, &retrievedPerson) + if err != nil { + log.Printf("Error retrieving person: %v", err) + return + } + if exists { + log.Printf("Retrieved person: %+v", retrievedPerson) + } +} + +// demonstratePubSub shows the publish/subscribe functionality. +// Note: We use exact topic names to ensure compatibility with GCP PubSub. +func demonstratePubSub(ms *memorystore.MemoryStore) { + log.Println("\n=== Demonstrating PubSub System ===") + + var wg sync.WaitGroup + topics := []string{"updates", "alerts"} + chans := make(map[string]<-chan []byte) + + // Subscribe + for _, topic := range topics { + ch, err := ms.Subscribe(topic) + if err != nil { + log.Printf("Failed to subscribe to %s: %v", topic, err) + continue + } + chans[topic] = ch + log.Printf("Subscribed to topic: %s", topic) + } + + // Listeners + for topic, ch := range chans { + wg.Add(1) + go func(t string, c <-chan []byte) { + defer wg.Done() + for { + select { + case msg, ok := <-c: + if !ok { + return + } + log.Printf("[%s] Received: %s", t, string(msg)) + case <-time.After(3 * time.Second): + return + } + } + }(topic, ch) + } + + // Publish + time.Sleep(500 * time.Millisecond) // Wait for subscriptions + ms.Publish("updates", []byte("System update available")) + ms.Publish("alerts", []byte("High CPU usage")) + + wg.Wait() + + // Unsubscribe + for _, topic := range topics { + ms.Unsubscribe(topic) + } +} + +func main() { + // Example of using GCP PubSub if env var is set + if os.Getenv("GOOGLE_CLOUD_PROJECT") == "" { + log.Println("Note: Set GOOGLE_CLOUD_PROJECT env var to test GCP PubSub backend") + } + + ms := memorystore.NewMemoryStore() + defer ms.Stop() + + demonstrateBasicOperations(ms) + demonstratePubSub(ms) +} diff --git a/memorystore/memorystore.go b/memorystore/memorystore.go index 9f76aee..62be2f4 100644 --- a/memorystore/memorystore.go +++ b/memorystore/memorystore.go @@ -7,6 +7,8 @@ package memorystore import ( "context" "hash/fnv" + "log" + "os" "sync" "sync/atomic" "time" @@ -42,7 +44,7 @@ type MemoryStore struct { lifecycleMu sync.RWMutex shards []*shard // Sharded storage - ps *pubSubManager // PubSub manager for cache events + ps PubSubClient // PubSub client for cache events ctx context.Context // Context for controlling the cleanup worker cancelFunc context.CancelFunc // Function to stop the cleanup worker wg sync.WaitGroup // WaitGroup for cleanup goroutine synchronization @@ -54,9 +56,18 @@ type MemoryStore struct { } // NewMemoryStore creates and initializes a new MemoryStore instance. -// It starts a background worker that periodically cleans up expired items. -// The returned MemoryStore is ready for use. +// It checks for GOOGLE_CLOUD_PROJECT environment variable to decide whether to use GCP PubSub. +// Use NewMemoryStoreWithConfig for more control. func NewMemoryStore() *MemoryStore { + config := Config{} + if projectID := os.Getenv("GOOGLE_CLOUD_PROJECT"); projectID != "" { + config.GCPProjectID = projectID + } + return NewMemoryStoreWithConfig(config) +} + +// NewMemoryStoreWithConfig creates a new MemoryStore with the provided configuration. +func NewMemoryStoreWithConfig(config Config) *MemoryStore { ctx, cancel := context.WithCancel(context.Background()) ms := &MemoryStore{ shards: make([]*shard, numShards), @@ -70,7 +81,7 @@ func NewMemoryStore() *MemoryStore { } } - ms.initPubSub() + ms.initPubSub(config) ms.startCleanupWorker() return ms } @@ -335,3 +346,70 @@ func (m *MemoryStore) GetMetrics() StoreMetrics { Evictions: atomic.LoadInt64(&m.evictions), } } + +// Subscribe subscribes to a topic. +func (m *MemoryStore) Subscribe(topic string) (<-chan []byte, error) { + if m.IsStopped() { + return nil, ErrStoreStopped + } + return m.ps.Subscribe(topic) +} + +// Publish publishes a message to a topic. +func (m *MemoryStore) Publish(topic string, message []byte) error { + if m.IsStopped() { + return ErrStoreStopped + } + return m.ps.Publish(topic, message) +} + +// Unsubscribe unsubscribes from a topic. +func (m *MemoryStore) Unsubscribe(topic string) error { + if m.IsStopped() { + return ErrStoreStopped + } + return m.ps.Unsubscribe(topic) +} + +// SubscriberCount returns the number of subscribers for a pattern. +// Note: This is not supported by the common interface and will return 0 or error in future. +// For now, it only works if the underlying implementation is In-Memory. +func (m *MemoryStore) SubscriberCount(pattern string) int { + // Not part of the interface. + // If we need this, we should add it to the interface or check type. + if ps, ok := m.ps.(*InMemoryPubSub); ok { + ps.mu.RLock() + defer ps.mu.RUnlock() + count := 0 + for p, subs := range ps.subscriptions { + if p == pattern { + count += len(subs) + } + } + return count + } + return 0 +} + +// initPubSub initializes the PubSub system. +func (m *MemoryStore) initPubSub(config Config) { + if config.GCPProjectID != "" { + ps, err := NewGCPPubSub(context.Background(), config.GCPProjectID) + if err == nil { + m.ps = ps + log.Printf("Initialized GCP PubSub with project %s", config.GCPProjectID) + return + } + log.Printf("Failed to initialize GCP PubSub: %v. Falling back to In-Memory.", err) + } + + m.ps = newInMemoryPubSub() + log.Println("Initialized In-Memory PubSub") +} + +// cleanupPubSub cleans up the PubSub system. +func (m *MemoryStore) cleanupPubSub() { + if m.ps != nil { + m.ps.Close() + } +} diff --git a/memorystore/pubsub.go b/memorystore/pubsub.go index 9c87633..da06ce7 100644 --- a/memorystore/pubsub.go +++ b/memorystore/pubsub.go @@ -1,212 +1,41 @@ -// memorystore/pubsub.go -package memorystore - -import ( - "context" - "errors" - "strings" - "sync" -) - -// Constants for PubSub configuration -const ( - defaultChannelBuffer = 100 // Default buffer size for subscriber channels -) - -// Common errors for PubSub operations -var ( - ErrInvalidPattern = errors.New("invalid subscription pattern") - ErrStoreStopped = errors.New("store has been stopped") -) - -// subscription represents an individual subscriber -type subscription struct { - pattern string // The pattern this subscription matches - ch chan []byte // Channel for sending messages to the subscriber - ctx context.Context // Context for managing subscription lifetime - cancel func() // Function to cancel the subscription context -} - -// pubSubManager handles all publish/subscribe operations -type pubSubManager struct { - mu sync.RWMutex - subscriptions map[string][]*subscription // Pattern -> subscriptions mapping - wg sync.WaitGroup // For graceful shutdown -} - -// newPubSubManager creates and initializes a new pubSubManager -func newPubSubManager() *pubSubManager { - return &pubSubManager{ - subscriptions: make(map[string][]*subscription), - } -} - -// Subscribe creates a new subscription for the given pattern -func (m *MemoryStore) Subscribe(pattern string) (<-chan []byte, error) { - if m.IsStopped() { - return nil, ErrStoreStopped - } - - if pattern == "" { - return nil, ErrInvalidPattern - } - - // Create subscription context - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan []byte, defaultChannelBuffer) - - sub := &subscription{ - pattern: pattern, - ch: ch, - ctx: ctx, - cancel: cancel, - } - - // Add subscription to manager - m.ps.mu.Lock() - m.ps.subscriptions[pattern] = append(m.ps.subscriptions[pattern], sub) - m.ps.mu.Unlock() - - // Add to wait group for graceful shutdown - m.ps.wg.Add(1) - - // Cleanup goroutine - go func() { - defer m.ps.wg.Done() - <-ctx.Done() - m.removeSubscription(pattern, sub) - close(ch) - }() - - return ch, nil -} - -// Publish sends a message to all subscribers matching the given channel -func (m *MemoryStore) Publish(channel string, message []byte) error { - if m.IsStopped() { - return ErrStoreStopped - } - - m.ps.mu.RLock() - defer m.ps.mu.RUnlock() - - // Find all matching subscriptions and publish to them - for pattern, subs := range m.ps.subscriptions { - if matchesPattern(channel, pattern) { - for _, sub := range subs { - select { - case <-sub.ctx.Done(): - continue // Skip closed subscriptions - default: - select { - case sub.ch <- message: - default: - // Channel is full, skip this subscriber - // Could add logging or metrics here - } - } - } - } - } - - return nil -} - -// Unsubscribe cancels all subscriptions for the given pattern -func (m *MemoryStore) Unsubscribe(pattern string) error { - if m.IsStopped() { - return ErrStoreStopped - } - - m.ps.mu.Lock() - defer m.ps.mu.Unlock() - - subs, exists := m.ps.subscriptions[pattern] - if !exists { - return nil - } - - // Cancel all subscriptions for this pattern - for _, sub := range subs { - sub.cancel() - } - - delete(m.ps.subscriptions, pattern) - return nil -} - -// SubscriberCount returns the number of subscribers for a given pattern -func (m *MemoryStore) SubscriberCount(pattern string) int { - m.ps.mu.RLock() - defer m.ps.mu.RUnlock() - - count := 0 - for p, subs := range m.ps.subscriptions { - if p == pattern { - count += len(subs) - } - } - return count -} - -// removeSubscription removes a specific subscription -func (m *MemoryStore) removeSubscription(pattern string, sub *subscription) { - m.ps.mu.Lock() - defer m.ps.mu.Unlock() - - subs := m.ps.subscriptions[pattern] - for i, s := range subs { - if s == sub { - // Remove subscription from slice - subs = append(subs[:i], subs[i+1:]...) - break - } - } - - if len(subs) == 0 { - delete(m.ps.subscriptions, pattern) - } else { - m.ps.subscriptions[pattern] = subs - } -} - -// matchesPattern checks if a channel matches a subscription pattern -func matchesPattern(channel, pattern string) bool { - // Split pattern and channel into segments - patternParts := strings.Split(pattern, ":") - channelParts := strings.Split(channel, ":") - - if len(patternParts) != len(channelParts) { - return false - } - - // Check each segment - for i, patternPart := range patternParts { - if patternPart != "*" && patternPart != channelParts[i] { - return false - } - } - - return true -} - -// initPubSub initializes the PubSub system for the MemoryStore -func (m *MemoryStore) initPubSub() { - m.ps = newPubSubManager() -} - -// cleanupPubSub performs cleanup of the PubSub system during store shutdown -func (m *MemoryStore) cleanupPubSub() { - m.ps.mu.Lock() - // Cancel all subscriptions - for pattern, subs := range m.ps.subscriptions { - for _, sub := range subs { - sub.cancel() - } - delete(m.ps.subscriptions, pattern) - } - m.ps.mu.Unlock() - - // Wait for all subscription goroutines to finish - m.ps.wg.Wait() -} +package memorystore + +import ( + "errors" + "time" +) + +// PubSubClient defines the interface for Publish/Subscribe operations +// to make the underlying implementation agnostic (In-Memory, GCP, etc). +type PubSubClient interface { + // Subscribe subscribes to a topic and returns a channel for messages. + // For GCP PubSub, 'topic' maps to a Topic, and a temporary subscription is created. + // For In-Memory, 'topic' is the pattern/channel name. + Subscribe(topic string) (<-chan []byte, error) + + // Publish sends a message to a topic. + Publish(topic string, message []byte) error + + // Unsubscribe stops receiving messages for a topic and cleans up resources. + Unsubscribe(topic string) error + + // Close shuts down the client and cleans up resources. + Close() error +} + +// Config holds configuration for the MemoryStore and its components. +type Config struct { + // GCPProjectID is the Google Cloud Project ID. + // If set, MemoryStore will attempt to use GCP PubSub. + GCPProjectID string + + // PubSubTimeout is the timeout for PubSub operations. + PubSubTimeout time.Duration +} + +// Common errors for PubSub operations +var ( + ErrInvalidTopic = errors.New("invalid topic") + ErrStoreStopped = errors.New("store has been stopped") + ErrNotImplemented = errors.New("feature not implemented by provider") +) diff --git a/memorystore/pubsub_gcp.go b/memorystore/pubsub_gcp.go new file mode 100644 index 0000000..0007f8a --- /dev/null +++ b/memorystore/pubsub_gcp.go @@ -0,0 +1,174 @@ +package memorystore + +import ( + "context" + "fmt" + "sync" + "time" + + "cloud.google.com/go/pubsub" + "github.com/google/uuid" + "google.golang.org/api/option" +) + +// GCPPubSub implements PubSubClient using Google Cloud PubSub +type GCPPubSub struct { + client *pubsub.Client + mu sync.RWMutex + subscriptions map[string]*gcpSubscription // Map topic/pattern -> subscription info + projectID string +} + +type gcpSubscription struct { + subID string + sub *pubsub.Subscription + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewGCPPubSub creates a new GCP PubSub client +func NewGCPPubSub(ctx context.Context, projectID string, opts ...option.ClientOption) (*GCPPubSub, error) { + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, err + } + + return &GCPPubSub{ + client: client, + subscriptions: make(map[string]*gcpSubscription), + projectID: projectID, + }, nil +} + +// Subscribe subscribes to a topic. +// Note: In GCP PubSub, we must create a Subscription to the Topic. +// Since this is a temporary/session-based subscription, we create a unique subscription ID +// and delete it when Unsubscribe is called or the client is closed. +func (g *GCPPubSub) Subscribe(topicName string) (<-chan []byte, error) { + g.mu.Lock() + defer g.mu.Unlock() + + // 1. Ensure Topic exists + topic := g.client.Topic(topicName) + exists, err := topic.Exists(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to check topic existence: %w", err) + } + if !exists { + // Try to create topic if it doesn't exist? + // Usually good for dev, might fail in strict prod. Let's try. + topic, err = g.client.CreateTopic(context.Background(), topicName) + if err != nil { + return nil, fmt.Errorf("failed to create topic: %w", err) + } + } + + // 2. Create a unique subscription + // We use a unique ID so each client instance gets its own copy of messages (Pub/Sub fan-out). + subID := fmt.Sprintf("memorystore-sub-%s-%s", topicName, uuid.New().String()) + sub, err := g.client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{ + Topic: topic, + ExpirationPolicy: time.Duration(24 * time.Hour), // Auto-delete if unused + }) + if err != nil { + return nil, fmt.Errorf("failed to create subscription: %w", err) + } + + // 3. Start receiving messages + ch := make(chan []byte, 100) + ctx, cancel := context.WithCancel(context.Background()) + + gcpSub := &gcpSubscription{ + subID: subID, + sub: sub, + cancel: cancel, + } + + gcpSub.wg.Add(1) + go func() { + defer gcpSub.wg.Done() + defer close(ch) + + err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { + // Forward message to channel + select { + case ch <- msg.Data: + msg.Ack() + case <-ctx.Done(): + msg.Nack() + } + }) + if err != nil && err != context.Canceled { + // Log error? + // fmt.Printf("Receive error: %v\n", err) + } + }() + + g.subscriptions[topicName] = gcpSub + return ch, nil +} + +// Publish publishes a message to the topic +func (g *GCPPubSub) Publish(topicName string, message []byte) error { + // Ensure topic exists (lazy creation) + // For performance, we might assume it exists or cache existence, but let's be safe. + // Actually, checking every time is slow. + // But `client.Topic` is lightweight. `Publish` handles non-existent topic by failing. + // Let's rely on standard library behavior. + + // However, if we want to auto-create topics like Redis, we should check. + // Let's check existence for now, or just try to publish. + topic := g.client.Topic(topicName) + + // We can't easily check existence without an API call. + // Let's assume the user ensures topics exist, OR we handle the error. + // But for a "canonical example", it's nice if it Just Works. + // Let's check existence once per topic per client instance? + // For now, let's just Publish. If it fails, so be it. + + res := topic.Publish(context.Background(), &pubsub.Message{ + Data: message, + }) + + _, err := res.Get(context.Background()) + return err +} + +// Unsubscribe stops the subscription and deletes it +func (g *GCPPubSub) Unsubscribe(topicName string) error { + g.mu.Lock() + defer g.mu.Unlock() + + sub, ok := g.subscriptions[topicName] + if !ok { + return nil + } + + // Stop receiving + sub.cancel() + sub.wg.Wait() + + // Delete subscription from GCP to clean up + if err := sub.sub.Delete(context.Background()); err != nil { + // Log error but continue + } + + delete(g.subscriptions, topicName) + return nil +} + +// Close closes the client and cleans up all subscriptions +func (g *GCPPubSub) Close() error { + g.mu.Lock() + defer g.mu.Unlock() + + for _, sub := range g.subscriptions { + sub.cancel() + sub.wg.Wait() + // Best effort delete + sub.sub.Delete(context.Background()) + } + g.subscriptions = nil + + return g.client.Close() +} diff --git a/memorystore/pubsub_memory.go b/memorystore/pubsub_memory.go new file mode 100644 index 0000000..b925385 --- /dev/null +++ b/memorystore/pubsub_memory.go @@ -0,0 +1,208 @@ +// memorystore/pubsub_memory.go +package memorystore + +import ( + "context" + "strings" + "sync" +) + +// Constants for PubSub configuration +const ( + defaultChannelBuffer = 100 // Default buffer size for subscriber channels +) + +// subscription represents an individual subscriber +type subscription struct { + topic string // The topic this subscription matches + ch chan []byte // Channel for sending messages to the subscriber + ctx context.Context // Context for managing subscription lifetime + cancel func() // Function to cancel the subscription context +} + +// InMemoryPubSub handles all publish/subscribe operations in memory +type InMemoryPubSub struct { + mu sync.RWMutex + subscriptions map[string][]*subscription // Topic -> subscriptions mapping + wg sync.WaitGroup // For graceful shutdown + closed bool +} + +// newInMemoryPubSub creates and initializes a new InMemoryPubSub +func newInMemoryPubSub() *InMemoryPubSub { + return &InMemoryPubSub{ + subscriptions: make(map[string][]*subscription), + } +} + +// Subscribe creates a new subscription for the given topic +func (ps *InMemoryPubSub) Subscribe(topic string) (<-chan []byte, error) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return nil, ErrStoreStopped + } + + if topic == "" { + return nil, ErrInvalidTopic + } + + // Create subscription context + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []byte, defaultChannelBuffer) + + sub := &subscription{ + topic: topic, + ch: ch, + ctx: ctx, + cancel: cancel, + } + + // Add subscription to manager + ps.subscriptions[topic] = append(ps.subscriptions[topic], sub) + + // Add to wait group for graceful shutdown + ps.wg.Add(1) + + // Cleanup goroutine + go func() { + defer ps.wg.Done() + <-ctx.Done() + ps.removeSubscription(topic, sub) + close(ch) + }() + + return ch, nil +} + +// Publish sends a message to all subscribers matching the given topic +func (ps *InMemoryPubSub) Publish(topic string, message []byte) error { + ps.mu.RLock() + defer ps.mu.RUnlock() + + if ps.closed { + return ErrStoreStopped + } + + // Find all matching subscriptions and publish to them + // We support simple pattern matching here for backward compatibility + for subTopic, subs := range ps.subscriptions { + if matchesTopic(topic, subTopic) { + for _, sub := range subs { + select { + case <-sub.ctx.Done(): + continue // Skip closed subscriptions + default: + select { + case sub.ch <- message: + default: + // Channel is full, skip this subscriber + } + } + } + } + } + + return nil +} + +// Unsubscribe cancels all subscriptions for the given topic +func (ps *InMemoryPubSub) Unsubscribe(topic string) error { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return ErrStoreStopped + } + + subs, exists := ps.subscriptions[topic] + if !exists { + return nil + } + + // Cancel all subscriptions for this topic + for _, sub := range subs { + sub.cancel() + } + + delete(ps.subscriptions, topic) + return nil +} + +// Close shuts down the PubSub manager +func (ps *InMemoryPubSub) Close() error { + ps.mu.Lock() + if ps.closed { + ps.mu.Unlock() + return nil + } + ps.closed = true + + // Cancel all subscriptions + for topic, subs := range ps.subscriptions { + for _, sub := range subs { + sub.cancel() + } + delete(ps.subscriptions, topic) + } + ps.mu.Unlock() + + // Wait for all subscription goroutines to finish + ps.wg.Wait() + return nil +} + +// removeSubscription removes a specific subscription +func (ps *InMemoryPubSub) removeSubscription(topic string, sub *subscription) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return + } + + subs, ok := ps.subscriptions[topic] + if !ok { + return + } + + for i, s := range subs { + if s == sub { + // Remove subscription from slice + subs = append(subs[:i], subs[i+1:]...) + break + } + } + + if len(subs) == 0 { + delete(ps.subscriptions, topic) + } else { + ps.subscriptions[topic] = subs + } +} + +// matchesTopic checks if a topic matches a subscription pattern +func matchesTopic(topic, pattern string) bool { + // If exactly equal, return true + if topic == pattern { + return true + } + + // Split pattern and topic into segments + patternParts := strings.Split(pattern, ":") + topicParts := strings.Split(topic, ":") + + if len(patternParts) != len(topicParts) { + return false + } + + // Check each segment + for i, patternPart := range patternParts { + if patternPart != "*" && patternPart != topicParts[i] { + return false + } + } + + return true +}