From 93ce289403a90cea99af6701a16c88174b013ad5 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 05:27:56 +0000 Subject: [PATCH 1/7] Refactor PubSub to be interface-based and add GCP implementation - Extracted `PubSubClient` interface to decouple implementation from store logic. - Implemented `GCPPubSub` using `cloud.google.com/go/pubsub`. - Renamed existing in-memory implementation to `InMemoryPubSub` and updated to match interface. - Updated `NewMemoryStore` to automatically use GCP PubSub if `GOOGLE_CLOUD_PROJECT` env var is set. - Added `NewMemoryStoreWithConfig` for explicit configuration. - Updated `main.go` and `README.md` to demonstrate GCP PubSub usage and new features. --- README.md | 418 ++++++++++++++-------------- go.mod | 42 ++- go.sum | 159 +++++++++++ main.go | 409 ++++++++------------------- memorystore/batch_test.go | 64 +++++ memorystore/memorystore.go | 516 +++++++++++++++++++++-------------- memorystore/metrics_test.go | 51 ++++ memorystore/pubsub.go | 253 +++-------------- memorystore/pubsub_gcp.go | 174 ++++++++++++ memorystore/pubsub_memory.go | 208 ++++++++++++++ 10 files changed, 1363 insertions(+), 931 deletions(-) create mode 100644 memorystore/batch_test.go create mode 100644 memorystore/metrics_test.go create mode 100644 memorystore/pubsub_gcp.go create mode 100644 memorystore/pubsub_memory.go diff --git a/README.md b/README.md index 58c4336..3e86f36 100644 --- a/README.md +++ b/README.md @@ -1,217 +1,201 @@ -# ๐Ÿš€ MemoryStore - -MemoryStore is a high-performance, thread-safe, in-memory key-value store implemented in Go. It features automatic key expiration, JSON serialization support, and concurrent access safety. - -[![Go Report Card](https://goreportcard.com/badge/github.com/BryceWayne/MemoryStore)](https://goreportcard.com/report/github.com/BryceWayne/MemoryStore) -[![GoDoc](https://godoc.org/github.com/BryceWayne/MemoryStore?status.svg)](https://godoc.org/github.com/BryceWayne/MemoryStore) - -## Features - -- ๐Ÿ”„ Thread-safe operations -- โฐ Automatic key expiration -- ๐Ÿงน Background cleanup of expired keys -- ๐Ÿ“ฆ Support for both raw bytes and JSON data -- ๐Ÿ’ช High-performance using go-json -- ๐Ÿ”’ Clean shutdown mechanism -- ๐Ÿ“ Comprehensive documentation -- ๐Ÿ“ก Pattern-based Publish/Subscribe system - -## Installation - -```bash -go get github.com/BryceWayne/MemoryStore -``` - -## Quick Start - -Here's a simple example demonstrating basic usage: - -```go -package main - -import ( - "log" - "time" - "github.com/BryceWayne/MemoryStore/memorystore" -) - -func main() { - // Create a new store instance - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Store a string (converted to bytes) - err := store.Set("greeting", []byte("Hello, World!"), 1*time.Minute) - if err != nil { - log.Fatal(err) - } - - // Retrieve the value - if value, exists := store.Get("greeting"); exists { - log.Printf("Value: %s", string(value)) - } -} -``` - -## Advanced Usage - -### Working with JSON - -MemoryStore provides convenient methods for JSON serialization: - -```go -type User struct { - Name string `json:"name"` - Email string `json:"email"` -} - -func main() { - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Store JSON data - user := User{Name: "Alice", Email: "alice@example.com"} - err := store.SetJSON("user:123", user, 1*time.Hour) - if err != nil { - log.Fatal(err) - } - - // Retrieve JSON data - var retrievedUser User - exists, err := store.GetJSON("user:123", &retrievedUser) - if err != nil { - log.Fatal(err) - } - if exists { - log.Printf("User: %+v", retrievedUser) - } -} -``` - -### Working with PubSub - -MemoryStore includes a powerful publish/subscribe system for real-time communication: - -```go -func main() { - store := memorystore.NewMemoryStore() - defer store.Stop() - - // Subscribe to user updates - userEvents, err := store.Subscribe("user:*") - if err != nil { - log.Fatal(err) - } - - // Listen for messages in a goroutine - go func() { - for msg := range userEvents { - log.Printf("Received update: %s", string(msg)) - } - }() - - // Publish updates - err = store.Publish("user:123", []byte("status:active")) - if err != nil { - log.Fatal(err) - } -} -``` - -The PubSub system supports: -- Pattern-based subscriptions (`user:*`, `order:*:status`) -- Non-blocking message delivery -- Automatic cleanup of disconnected subscribers -- Thread-safe concurrent access -- Integration with existing store operations - -Methods available: -- `Subscribe(pattern string) (<-chan []byte, error)`: Subscribe to a pattern -- `Publish(channel string, message []byte) error`: Publish a message -- `Unsubscribe(pattern string) error`: Unsubscribe from a pattern -- `SubscriberCount(pattern string) int`: Get number of subscribers - -### Expiration and Cleanup - -Keys automatically expire after their specified duration: - -```go -// Set a value that expires in 5 seconds -store.Set("temp", []byte("temporary value"), 5*time.Second) - -// The background cleanup routine will automatically remove expired items -// You can also manually check if an item exists -time.Sleep(6 * time.Second) -if _, exists := store.Get("temp"); !exists { - log.Println("Key has expired") -} -``` - -### Proper Shutdown - -Always ensure proper cleanup by calling `Stop()`: - -```go -store := memorystore.NewMemoryStore() -defer func() { - if err := store.Stop(); err != nil { - log.Printf("Error stopping store: %v", err) - } -}() -``` - -## Performance Considerations - -- Uses `github.com/goccy/go-json` for faster JSON operations -- Minimizes lock contention with RWMutex -- Efficient background cleanup of expired items -- Memory-efficient storage using byte slices - -## Building and Testing - -Use the provided Makefile: - -```bash -# Build the project -make build - -# Run tests -make test - -# Run benchmarks -make bench -``` - -## Contributing - -Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change. - -1. Fork the repository -2. Create your feature branch (`git checkout -b feature/AmazingFeature`) -3. Commit your changes (`git commit -m 'Add some AmazingFeature'`) -4. Push to the branch (`git push origin feature/AmazingFeature`) -5. Open a Pull Request - -## License - -This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. - -## Acknowledgments - -- Thanks to the Go team for the amazing standard library -- [go-json](https://github.com/goccy/go-json) for high-performance JSON operations - -## Todo - -- [ ] Add support for batch operations -- [ ] Implement data persistence -- [ ] Add metrics and monitoring -- [ ] Add compression options - -## Support - -If you have any questions or need help integrating MemoryStore, please: - -1. Check the [documentation](https://godoc.org/github.com/BryceWayne/MemoryStore) -2. Open an issue with a detailed description -3. Reach out through the discussions tab \ No newline at end of file +# ๐Ÿš€ MemoryStore + +MemoryStore is a high-performance, thread-safe, in-memory key-value store implemented in Go. It features automatic key expiration, JSON serialization support, batch operations, metrics, and an agnostic Publish/Subscribe system (supporting In-Memory and Google Cloud PubSub). + +[![Go Report Card](https://goreportcard.com/badge/github.com/BryceWayne/MemoryStore)](https://goreportcard.com/report/github.com/BryceWayne/MemoryStore) +[![GoDoc](https://godoc.org/github.com/BryceWayne/MemoryStore?status.svg)](https://godoc.org/github.com/BryceWayne/MemoryStore) + +## Features + +- ๐Ÿ”„ Thread-safe operations +- โฐ Automatic key expiration +- ๐Ÿงน Background cleanup of expired keys +- ๐Ÿ“ฆ Support for both raw bytes and JSON data +- ๐Ÿ’ช High-performance using go-json +- ๐Ÿš€ Batch operations (`SetMulti`, `GetMulti`) +- ๐Ÿ“Š Built-in Metrics and Monitoring +- ๐Ÿ“ก Agnostic Publish/Subscribe system (In-Memory & GCP PubSub) +- ๐Ÿ”’ Clean shutdown mechanism +- ๐Ÿ“ Comprehensive documentation + +## Installation + +```bash +go get github.com/BryceWayne/MemoryStore +``` + +## Quick Start + +Here's a simple example demonstrating basic usage: + +```go +package main + +import ( + "log" + "time" + "github.com/BryceWayne/MemoryStore/memorystore" +) + +func main() { + // Create a new store instance (defaults to In-Memory PubSub) + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Store a string (converted to bytes) + err := store.Set("greeting", []byte("Hello, World!"), 1*time.Minute) + if err != nil { + log.Fatal(err) + } + + // Retrieve the value + if value, exists := store.Get("greeting"); exists { + log.Printf("Value: %s", string(value)) + } +} +``` + +## PubSub Usage (Canonical Example) + +MemoryStore supports an agnostic PubSub interface. By default, it uses an in-memory implementation. To use Google Cloud PubSub, simply provide your Project ID configuration. + +### Using Google Cloud PubSub + +```go +package main + +import ( + "log" + "time" + "github.com/BryceWayne/MemoryStore/memorystore" +) + +func main() { + // Configure with GCP Project ID + config := memorystore.Config{ + GCPProjectID: "my-gcp-project-id", + } + store := memorystore.NewMemoryStoreWithConfig(config) + defer store.Stop() + + // Subscribe to a topic + // Note: GCP PubSub creates a subscription for this topic + msgs, err := store.Subscribe("user-updates") + if err != nil { + log.Fatal(err) + } + + // Listen in background + go func() { + for msg := range msgs { + log.Printf("Received: %s", string(msg)) + } + }() + + // Publish to the topic + err = store.Publish("user-updates", []byte("User 123 logged in")) + if err != nil { + log.Fatal(err) + } + + // Give time for message delivery + time.Sleep(1 * time.Second) +} +``` + +Or simply set `GOOGLE_CLOUD_PROJECT` environment variable: + +```bash +export GOOGLE_CLOUD_PROJECT=my-project-id +``` +```go +store := memorystore.NewMemoryStore() // Automatically picks up GCP PubSub +``` + +## Advanced Usage + +### Working with JSON + +MemoryStore provides convenient methods for JSON serialization: + +```go +type User struct { + Name string `json:"name"` + Email string `json:"email"` +} + +func main() { + store := memorystore.NewMemoryStore() + defer store.Stop() + + // Store JSON data + user := User{Name: "Alice", Email: "alice@example.com"} + err := store.SetJSON("user:123", user, 1*time.Hour) + if err != nil { + log.Fatal(err) + } + + // Retrieve JSON data + var retrievedUser User + exists, err := store.GetJSON("user:123", &retrievedUser) + if err != nil { + log.Fatal(err) + } + if exists { + log.Printf("User: %+v", retrievedUser) + } +} +``` + +### Batch Operations + +Efficiently set or get multiple items at once: + +```go +items := map[string][]byte{ + "key1": []byte("val1"), + "key2": []byte("val2"), +} +store.SetMulti(items, time.Minute) + +results := store.GetMulti([]string{"key1", "key2"}) +``` + +### Metrics + +Monitor cache performance: + +```go +metrics := store.GetMetrics() +log.Printf("Hits: %d, Misses: %d, Items: %d", metrics.Hits, metrics.Misses, metrics.Items) +``` + +## Performance Considerations + +- Uses `github.com/goccy/go-json` for faster JSON operations +- Minimizes lock contention with RWMutex +- Efficient background cleanup of expired items +- Memory-efficient storage using byte slices + +## Building and Testing + +Use the provided Makefile: + +```bash +# Build the project +make build + +# Run tests +make test + +# Run benchmarks +make bench +``` + +## Contributing + +Contributions are welcome! Please feel free to submit a Pull Request. + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. diff --git a/go.mod b/go.mod index a1d8ff9..f78e48e 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,46 @@ module github.com/BryceWayne/MemoryStore -go 1.21 +go 1.24.0 + +toolchain go1.24.3 require ( github.com/goccy/go-json v0.10.2 - github.com/google/uuid v1.5.0 + github.com/google/uuid v1.6.0 +) + +require ( + cloud.google.com/go v0.121.6 // indirect + cloud.google.com/go/auth v0.16.4 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect + cloud.google.com/go/compute/metadata v0.8.0 // indirect + cloud.google.com/go/iam v1.5.2 // indirect + cloud.google.com/go/pubsub v1.50.1 // indirect + cloud.google.com/go/pubsub/v2 v2.0.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/s2a-go v0.1.9 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect + github.com/googleapis/gax-go/v2 v2.15.0 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect + go.opentelemetry.io/otel v1.36.0 // indirect + go.opentelemetry.io/otel/metric v1.36.0 // indirect + go.opentelemetry.io/otel/trace v1.36.0 // indirect + golang.org/x/crypto v0.41.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/oauth2 v0.30.0 // indirect + golang.org/x/sync v0.16.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.28.0 // indirect + golang.org/x/time v0.12.0 // indirect + google.golang.org/api v0.247.0 // indirect + google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect + google.golang.org/grpc v1.74.2 // indirect + google.golang.org/protobuf v1.36.7 // indirect ) diff --git a/go.sum b/go.sum index 9f07c6d..6e49101 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,163 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.121.6 h1:waZiuajrI28iAf40cWgycWNgaXPO06dupuS+sgibK6c= +cloud.google.com/go v0.121.6/go.mod h1:coChdst4Ea5vUpiALcYKXEpR1S9ZgXbhEzzMcMR66vI= +cloud.google.com/go/auth v0.16.4 h1:fXOAIQmkApVvcIn7Pc2+5J8QTMVbUGLscnSVNl11su8= +cloud.google.com/go/auth v0.16.4/go.mod h1:j10ncYwjX/g3cdX7GpEzsdM+d+ZNsXAbb6qXA7p1Y5M= +cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc= +cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c= +cloud.google.com/go/compute/metadata v0.8.0 h1:HxMRIbao8w17ZX6wBnjhcDkW6lTFpgcaobyVfZWqRLA= +cloud.google.com/go/compute/metadata v0.8.0/go.mod h1:sYOGTp851OV9bOFJ9CH7elVvyzopvWQFNNghtDQ/Biw= +cloud.google.com/go/iam v1.5.2 h1:qgFRAGEmd8z6dJ/qyEchAuL9jpswyODjA2lS+w234g8= +cloud.google.com/go/iam v1.5.2/go.mod h1:SE1vg0N81zQqLzQEwxL2WI6yhetBdbNQuTvIKCSkUHE= +cloud.google.com/go/pubsub v1.50.1 h1:fzbXpPyJnSGvWXF1jabhQeXyxdbCIkXTpjXHy7xviBM= +cloud.google.com/go/pubsub v1.50.1/go.mod h1:6YVJv3MzWJUVdvQXG081sFvS0dWQOdnV+oTo++q/xFk= +cloud.google.com/go/pubsub/v2 v2.0.0 h1:0qS6mRJ41gD1lNmM/vdm6bR7DQu6coQcVwD+VPf0Bz0= +cloud.google.com/go/pubsub/v2 v2.0.0/go.mod h1:0aztFxNzVQIRSZ8vUr79uH2bS3jwLebwK6q1sgEub+E= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= +github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.6 h1:GW/XbdyBFQ8Qe+YAmFU9uHLo7OnF5tL52HFAgMmyrf4= +github.com/googleapis/enterprise-certificate-proxy v0.3.6/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= +github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= +github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 h1:q4XOmH/0opmeuJtPsbFNivyl7bCt7yRBbeEm2sC/XtQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0/go.mod h1:snMWehoOh2wsEwnvvwtDyFCxVeDAODenXHtn5vzrKjo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= +go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= +go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= +go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= +go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= +go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= +go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/api v0.247.0 h1:tSd/e0QrUlLsrwMKmkbQhYVa109qIintOls2Wh6bngc= +google.golang.org/api v0.247.0/go.mod h1:r1qZOPmxXffXg6xS5uhx16Fa/UFY8QU/K4bfKrnvovM= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20250603155806-513f23925822 h1:rHWScKit0gvAPuOnu87KpaYtjK5zBMLcULh7gxkCXu4= +google.golang.org/genproto v0.0.0-20250603155806-513f23925822/go.mod h1:HubltRL7rMh0LfnQPkMH4NPDFEWp0jw3vixw7jEM53s= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c h1:AtEkQdl5b6zsybXcbz00j1LwNodDuH6hVifIaNqk7NQ= +google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c/go.mod h1:ea2MjsO70ssTfCjiwHgI0ZFqcw45Ksuk2ckf9G468GA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a h1:tPE/Kp+x9dMSwUm/uM0JKK0IfdiJkwAbSMSeZBXXJXc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a/go.mod h1:gw1tLEfykwDz2ET4a12jcXt4couGAm7IwsVaTy0Sflo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= +google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= +google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/main.go b/main.go index 381b14e..1c15d96 100644 --- a/main.go +++ b/main.go @@ -1,294 +1,115 @@ -// main.go -// Package main provides a demonstration of the memorystore package functionality. -// It shows various use cases including storing/retrieving data, handling expiration, -// and proper error handling. -package main - -import ( - "encoding/json" - "log" - "strings" - "sync" - "time" - - "github.com/BryceWayne/MemoryStore/memorystore" - "github.com/google/uuid" -) - -// Person represents a sample data structure used to demonstrate -// JSON serialization and deserialization with the MemoryStore. -type Person struct { - Name string `json:"name"` // Person's full name - Age int `json:"age"` // Person's age in years - UID string `json:"uid"` // Unique identifier - Created time.Time `json:"created,omitempty"` // Time when the record was created -} - -// demonstrateBasicOperations shows basic Set/Get operations with raw bytes -// and proper error handling. -func demonstrateBasicOperations(ms *memorystore.MemoryStore) { - log.Println("=== Demonstrating Basic Operations ===") - - // Create a sample person - person := Person{ - Name: "Alice Smith", - Age: 30, - UID: uuid.New().String(), - Created: time.Now(), - } - - // Using SetJSON for convenient JSON serialization - err := ms.SetJSON(person.UID, person, 2*time.Second) - if err != nil { - log.Printf("Failed to store person: %v", err) - return - } - log.Printf("Stored person with UID: %s", person.UID) - - // Retrieve the stored person - var retrievedPerson Person - exists, err := ms.GetJSON(person.UID, &retrievedPerson) - if err != nil { - log.Printf("Error retrieving person: %v", err) - return - } - if exists { - log.Printf("Retrieved person: %+v", retrievedPerson) - } -} - -// demonstrateExpiration shows how the cache handles expired items. -func demonstrateExpiration(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating Expiration ===") - - key := "expiring_key" - value := []byte("This value will expire soon") - shortDuration := 1 * time.Second - - if err := ms.Set(key, value, shortDuration); err != nil { - log.Printf("Failed to store expiring value: %v", err) - return - } - log.Printf("Stored value with %v expiration", shortDuration) - - // Wait for the value to expire - time.Sleep(shortDuration + 100*time.Millisecond) - - if _, exists := ms.Get(key); !exists { - log.Println("Value has expired as expected") - } -} - -// demonstrateNonExistentKeys shows how the cache handles missing keys. -func demonstrateNonExistentKeys(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating Non-Existent Keys ===") - - nonExistentKey := uuid.New().String() - if _, exists := ms.Get(nonExistentKey); !exists { - log.Printf("As expected, key does not exist: %s", nonExistentKey) - } - - var person Person - exists, err := ms.GetJSON(nonExistentKey, &person) - if !exists && err == nil { - log.Println("GetJSON correctly handles non-existent keys") - } -} - -// demonstrateStoreLifecycle shows proper initialization and cleanup of the MemoryStore. -func demonstrateStoreLifecycle() { - log.Println("\n=== Demonstrating Store Lifecycle ===") - - // Create a new store - ms := memorystore.NewMemoryStore() - log.Println("Created new MemoryStore") - - // Store a value - key := "lifecycle_test" - if err := ms.Set(key, []byte("test"), time.Minute); err != nil { - log.Printf("Failed to store value: %v", err) - return - } - - // Properly stop the store - if err := ms.Stop(); err != nil { - log.Printf("Error stopping store: %v", err) - return - } - log.Println("Successfully stopped MemoryStore") - - // Verify store is stopped - if ms.IsStopped() { - log.Println("Confirmed store is stopped") - } -} - -// demonstratePubSub shows the publish/subscribe functionality -// with pattern matching and multiple subscribers. -// demonstratePubSub shows the publish/subscribe functionality -// with complex pattern matching and JSON message support. -func demonstratePubSub(ms *memorystore.MemoryStore) { - log.Println("\n=== Demonstrating PubSub System ===") - - var wg sync.WaitGroup - - // 1. Complex Pattern Matching Examples - log.Println("Setting up pattern-based subscriptions...") - patterns := map[string]<-chan []byte{} // Store channels for cleanup - - // Subscribe to various patterns - subscribePatterns := []string{ - "users:*:status", // Match all user statuses - "users:admin:*", // Match all admin events - "orders:*.completed", // Match all completed orders - "notifications:*:high", // Match high-priority notifications - "system:*.error", // Match all system errors - } - - for _, pattern := range subscribePatterns { - ch, err := ms.Subscribe(pattern) - if err != nil { - log.Printf("Failed to subscribe to %s: %v", pattern, err) - continue - } - patterns[pattern] = ch - log.Printf("Subscribed to pattern: %s", pattern) - } - - // 2. JSON Message Integration - type UserStatus struct { - UserID string `json:"user_id"` - Status string `json:"status"` - LastSeen time.Time `json:"last_seen"` - Metadata map[string]string `json:"metadata"` - } - - type OrderEvent struct { - OrderID string `json:"order_id"` - Status string `json:"status"` - CompletedAt time.Time `json:"completed_at"` - Total float64 `json:"total"` - } - - // Set up listeners for each pattern - for pattern, ch := range patterns { - wg.Add(1) - go func(pattern string, ch <-chan []byte) { - defer wg.Done() - log.Printf("Listening on pattern: %s", pattern) - - select { - case msg := <-ch: - // Try to decode as UserStatus if it's a user event - if strings.HasPrefix(pattern, "users:") { - var status UserStatus - if err := json.Unmarshal(msg, &status); err == nil { - log.Printf("[%s] User Status Update: %+v", pattern, status) - } else { - log.Printf("[%s] Raw message: %s", pattern, string(msg)) - } - } else if strings.HasPrefix(pattern, "orders:") { - // Try to decode as OrderEvent - var order OrderEvent - if err := json.Unmarshal(msg, &order); err == nil { - log.Printf("[%s] Order Event: %+v", pattern, order) - } else { - log.Printf("[%s] Raw message: %s", pattern, string(msg)) - } - } else { - log.Printf("[%s] Message received: %s", pattern, string(msg)) - } - case <-time.After(2 * time.Second): - log.Printf("[%s] No message received", pattern) - } - }(pattern, ch) - } - - // Publish various types of messages - time.Sleep(100 * time.Millisecond) // Ensure subscribers are ready - log.Println("\nPublishing messages...") - - // Publish JSON user status - adminStatus := UserStatus{ - UserID: "admin123", - Status: "online", - LastSeen: time.Now(), - Metadata: map[string]string{"location": "NYC", "device": "desktop"}, - } - statusJSON, _ := json.Marshal(adminStatus) - ms.Publish("users:admin:status", statusJSON) - - // Publish JSON order completion - order := OrderEvent{ - OrderID: "ORD-789", - Status: "completed", - CompletedAt: time.Now(), - Total: 299.99, - } - orderJSON, _ := json.Marshal(order) - ms.Publish("orders:ORD-789.completed", orderJSON) - - // Publish system error - ms.Publish("system:database.error", []byte("Connection timeout")) - - // Publish high-priority notification - ms.Publish("notifications:user123:high", []byte("Account security alert")) - - // Wait for message processing - wg.Wait() - - log.Println("\nDemonstrating pattern matching scenarios...") - // Show which patterns match different keys - testCases := []struct { - channel string - patterns []string - }{ - { - channel: "users:admin:login", - patterns: []string{"users:admin:*"}, - }, - { - channel: "orders:xyz-789.completed", - patterns: []string{"orders:*.completed"}, - }, - { - channel: "notifications:admin:high", - patterns: []string{"notifications:*:high"}, - }, - } - - for _, tc := range testCases { - log.Printf("Channel '%s' matches patterns: %v", tc.channel, tc.patterns) - } - - // Cleanup - log.Println("\nCleaning up subscriptions...") - for pattern := range patterns { - if err := ms.Unsubscribe(pattern); err != nil { - log.Printf("Error unsubscribing from %s: %v", pattern, err) - } else { - log.Printf("Unsubscribed from %s", pattern) - } - } -} - -func main() { - // Create a new MemoryStore instance - ms := memorystore.NewMemoryStore() - - // Ensure proper cleanup when main exits - defer func() { - if err := ms.Stop(); err != nil { - log.Printf("Error stopping MemoryStore: %v", err) - } - }() - - // Run demonstrations - demonstrateBasicOperations(ms) - demonstrateExpiration(ms) - demonstrateNonExistentKeys(ms) - demonstratePubSub(ms) // Add this line - demonstrateStoreLifecycle() - - log.Println("\nAll demonstrations completed successfully") -} +// main.go +// Package main provides a demonstration of the memorystore package functionality. +package main + +import ( + "log" + "os" + "sync" + "time" + + "github.com/BryceWayne/MemoryStore/memorystore" + "github.com/google/uuid" +) + +// Person represents a sample data structure. +type Person struct { + Name string `json:"name"` + Age int `json:"age"` + UID string `json:"uid"` + Created time.Time `json:"created,omitempty"` +} + +// demonstrateBasicOperations shows basic Set/Get operations. +func demonstrateBasicOperations(ms *memorystore.MemoryStore) { + log.Println("=== Demonstrating Basic Operations ===") + + person := Person{ + Name: "Alice Smith", + Age: 30, + UID: uuid.New().String(), + Created: time.Now(), + } + + err := ms.SetJSON(person.UID, person, 2*time.Second) + if err != nil { + log.Printf("Failed to store person: %v", err) + return + } + log.Printf("Stored person with UID: %s", person.UID) + + var retrievedPerson Person + exists, err := ms.GetJSON(person.UID, &retrievedPerson) + if err != nil { + log.Printf("Error retrieving person: %v", err) + return + } + if exists { + log.Printf("Retrieved person: %+v", retrievedPerson) + } +} + +// demonstratePubSub shows the publish/subscribe functionality. +// Note: We use exact topic names to ensure compatibility with GCP PubSub. +func demonstratePubSub(ms *memorystore.MemoryStore) { + log.Println("\n=== Demonstrating PubSub System ===") + + var wg sync.WaitGroup + topics := []string{"updates", "alerts"} + chans := make(map[string]<-chan []byte) + + // Subscribe + for _, topic := range topics { + ch, err := ms.Subscribe(topic) + if err != nil { + log.Printf("Failed to subscribe to %s: %v", topic, err) + continue + } + chans[topic] = ch + log.Printf("Subscribed to topic: %s", topic) + } + + // Listeners + for topic, ch := range chans { + wg.Add(1) + go func(t string, c <-chan []byte) { + defer wg.Done() + for { + select { + case msg, ok := <-c: + if !ok { + return + } + log.Printf("[%s] Received: %s", t, string(msg)) + case <-time.After(3 * time.Second): + return + } + } + }(topic, ch) + } + + // Publish + time.Sleep(500 * time.Millisecond) // Wait for subscriptions + ms.Publish("updates", []byte("System update available")) + ms.Publish("alerts", []byte("High CPU usage")) + + wg.Wait() + + // Unsubscribe + for _, topic := range topics { + ms.Unsubscribe(topic) + } +} + +func main() { + // Example of using GCP PubSub if env var is set + if os.Getenv("GOOGLE_CLOUD_PROJECT") == "" { + log.Println("Note: Set GOOGLE_CLOUD_PROJECT env var to test GCP PubSub backend") + } + + ms := memorystore.NewMemoryStore() + defer ms.Stop() + + demonstrateBasicOperations(ms) + demonstratePubSub(ms) +} diff --git a/memorystore/batch_test.go b/memorystore/batch_test.go new file mode 100644 index 0000000..60060c1 --- /dev/null +++ b/memorystore/batch_test.go @@ -0,0 +1,64 @@ +package memorystore + +import ( + "testing" + "time" +) + +func TestBatchOperations(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + // Test SetMulti + items := map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "key3": []byte("value3"), + } + + if err := ms.SetMulti(items, time.Minute); err != nil { + t.Fatalf("SetMulti failed: %v", err) + } + + // Verify items are set + for k, v := range items { + val, exists := ms.Get(k) + if !exists { + t.Errorf("Key %s not found", k) + } + if string(val) != string(v) { + t.Errorf("Value mismatch for key %s: expected %s, got %s", k, v, val) + } + } + + // Test GetMulti + keys := []string{"key1", "key2", "key3", "nonexistent"} + results := ms.GetMulti(keys) + + if len(results) != 3 { + t.Errorf("Expected 3 items, got %d", len(results)) + } + + for k, v := range items { + if val, ok := results[k]; !ok { + t.Errorf("Key %s missing from results", k) + } else if string(val) != string(v) { + t.Errorf("Value mismatch for key %s: expected %s, got %s", k, v, val) + } + } + + if _, ok := results["nonexistent"]; ok { + t.Error("Non-existent key returned in results") + } + + // Verify metrics update + metrics := ms.GetMetrics() + // Hits: 3 from individual Get calls in loop + 3 from GetMulti call + // Misses: 1 from GetMulti call (nonexistent) + if metrics.Hits != 6 { + t.Errorf("Expected 6 hits, got %d", metrics.Hits) + } + if metrics.Misses != 1 { + t.Errorf("Expected 1 miss, got %d", metrics.Misses) + } +} diff --git a/memorystore/memorystore.go b/memorystore/memorystore.go index 1674985..a9cd172 100644 --- a/memorystore/memorystore.go +++ b/memorystore/memorystore.go @@ -1,206 +1,310 @@ -// memorystore/memorystore.go -// Package memorystore provides a simple in-memory cache implementation with automatic cleanup -// of expired items. It supports both raw byte storage and JSON serialization/deserialization -// of structured data. -package memorystore - -import ( - "context" - "sync" - "time" - - "github.com/goccy/go-json" -) - -// item represents a single cache entry with its value and expiration time. -type item struct { - value []byte // Raw data stored as a byte slice - expiresAt time.Time // Time at which this item should be considered expired -} - -// MemoryStore implements an in-memory cache with automatic cleanup of expired items. -// It is safe for concurrent use by multiple goroutines. -type MemoryStore struct { - mu sync.RWMutex // Protects access to the store map - store map[string]item // Internal storage for cache items - ps *pubSubManager // PubSub manager for cache events - ctx context.Context // Context for controlling the cleanup worker - cancelFunc context.CancelFunc // Function to stop the cleanup worker - wg sync.WaitGroup // WaitGroup for cleanup goroutine synchronization -} - -// NewMemoryStore creates and initializes a new MemoryStore instance. -// It starts a background worker that periodically cleans up expired items. -// The returned MemoryStore is ready for use. -func NewMemoryStore() *MemoryStore { - ctx, cancel := context.WithCancel(context.Background()) - ms := &MemoryStore{ - store: make(map[string]item), - ctx: ctx, - cancelFunc: cancel, - } - ms.initPubSub() - ms.startCleanupWorker() - return ms -} - -// Stop gracefully shuts down the MemoryStore by stopping the cleanup goroutine -// and releasing associated resources. After calling Stop, the store cannot be used. -// Multiple calls to Stop will not cause a panic and return nil. -// -// Example: -// -// store := NewMemoryStore() -// defer store.Stop() -func (m *MemoryStore) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() - - if m.cancelFunc == nil { - return nil - } - - m.cancelFunc() - m.cancelFunc = nil - - m.cleanupPubSub() - - // Wait for cleanup goroutine to finish - m.wg.Wait() - - // Clear the store to free up memory - m.store = nil - - return nil -} - -// IsStopped returns true if the MemoryStore has been stopped and can no longer be used. -// This method is safe for concurrent use. -// -// Example: -// -// if store.IsStopped() { -// log.Println("Store is no longer available") -// return -// } -func (m *MemoryStore) IsStopped() bool { - m.mu.RLock() - defer m.mu.RUnlock() - return m.cancelFunc == nil -} - -// startCleanupWorker initiates a background goroutine that periodically -// removes expired items from the cache. The cleanup interval is set to 1 minute. -func (m *MemoryStore) startCleanupWorker() { - m.wg.Add(1) - go func() { - defer m.wg.Done() - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - m.cleanupExpiredItems() - case <-m.ctx.Done(): - return - } - } - }() -} - -// cleanupExpiredItems removes all expired items from the cache. -// This method acquires a write lock on the store while performing the cleanup. -func (m *MemoryStore) cleanupExpiredItems() { - m.mu.Lock() - defer m.mu.Unlock() - for key, item := range m.store { - if time.Now().After(item.expiresAt) { - delete(m.store, key) - } - } -} - -// Set stores a raw byte slice in the cache with the specified key and duration. -// The item will automatically expire after the specified duration. -// If an error occurs, it will be returned to the caller. -func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) error { - m.mu.Lock() - defer m.mu.Unlock() - - m.store[key] = item{ - value: value, - expiresAt: time.Now().Add(duration), - } - - return nil -} - -// SetJSON stores a JSON-serializable value in the cache. -// The value is serialized to JSON before storage. -// Returns an error if JSON marshaling fails. -// -// Example: -// -// type User struct { -// Name string -// Age int -// } -// user := User{Name: "John", Age: 30} -// err := cache.SetJSON("user:123", user, 1*time.Hour) -func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Duration) error { - data, err := json.Marshal(value) - if err != nil { - return err - } - return m.Set(key, data, duration) -} - -// Get retrieves a value from the cache. -// Returns the value and a boolean indicating whether the key was found. -// If the item has expired, returns (nil, false). -func (m *MemoryStore) Get(key string) ([]byte, bool) { - m.mu.RLock() - defer m.mu.RUnlock() - - it, exists := m.store[key] - if !exists || time.Now().After(it.expiresAt) { - return nil, false - } - - return it.value, true -} - -// GetJSON retrieves and deserializes a JSON value from the cache into the provided interface. -// Returns a boolean indicating if the key was found and any error that occurred during deserialization. -// -// Example: -// -// var user User -// exists, err := cache.GetJSON("user:123", &user) -// if err != nil { -// // Handle error -// } else if exists { -// fmt.Printf("Found user: %+v\n", user) -// } -func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error) { - data, exists := m.Get(key) - if !exists { - return false, nil - } - - err := json.Unmarshal(data, dest) - if err != nil { - return true, err - } - - return true, nil -} - -// Delete removes an item from the cache. -// If the key doesn't exist, the operation is a no-op. -func (m *MemoryStore) Delete(key string) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.store, key) -} +// memorystore/memorystore.go +// Package memorystore provides a simple in-memory cache implementation with automatic cleanup +// of expired items. It supports both raw byte storage and JSON serialization/deserialization +// of structured data. +package memorystore + +import ( + "context" + "log" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/goccy/go-json" +) + +// item represents a single cache entry with its value and expiration time. +type item struct { + value []byte // Raw data stored as a byte slice + expiresAt time.Time // Time at which this item should be considered expired +} + +// StoreMetrics holds statistics about the cache usage. +type StoreMetrics struct { + Items int // Current number of items in the cache + Hits int64 // Total number of cache hits + Misses int64 // Total number of cache misses + Evictions int64 // Total number of items evicted (expired) +} + +// MemoryStore implements an in-memory cache with automatic cleanup of expired items. +// It is safe for concurrent use by multiple goroutines. +type MemoryStore struct { + mu sync.RWMutex // Protects access to the store map + store map[string]item // Internal storage for cache items + ps PubSubClient // PubSub client for cache events + ctx context.Context // Context for controlling the cleanup worker + cancelFunc context.CancelFunc // Function to stop the cleanup worker + wg sync.WaitGroup // WaitGroup for cleanup goroutine synchronization + + // Metrics + hits int64 // Atomic counter for cache hits + misses int64 // Atomic counter for cache misses + evictions int64 // Atomic counter for evicted items +} + +// NewMemoryStore creates and initializes a new MemoryStore instance. +// It checks for GOOGLE_CLOUD_PROJECT environment variable to decide whether to use GCP PubSub. +// Use NewMemoryStoreWithConfig for more control. +func NewMemoryStore() *MemoryStore { + config := Config{} + if projectID := os.Getenv("GOOGLE_CLOUD_PROJECT"); projectID != "" { + config.GCPProjectID = projectID + } + return NewMemoryStoreWithConfig(config) +} + +// NewMemoryStoreWithConfig creates a new MemoryStore with the provided configuration. +func NewMemoryStoreWithConfig(config Config) *MemoryStore { + ctx, cancel := context.WithCancel(context.Background()) + ms := &MemoryStore{ + store: make(map[string]item), + ctx: ctx, + cancelFunc: cancel, + } + + ms.initPubSub(config) + ms.startCleanupWorker() + return ms +} + +// Stop gracefully shuts down the MemoryStore by stopping the cleanup goroutine +// and releasing associated resources. After calling Stop, the store cannot be used. +func (m *MemoryStore) Stop() error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancelFunc == nil { + return nil + } + + m.cancelFunc() + m.cancelFunc = nil + + m.cleanupPubSub() + + // Wait for cleanup goroutine to finish + m.wg.Wait() + + // Clear the store to free up memory + m.store = nil + + return nil +} + +// IsStopped returns true if the MemoryStore has been stopped and can no longer be used. +func (m *MemoryStore) IsStopped() bool { + m.mu.RLock() + defer m.mu.RUnlock() + return m.cancelFunc == nil +} + +// startCleanupWorker initiates a background goroutine that periodically +// removes expired items from the cache. +func (m *MemoryStore) startCleanupWorker() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + m.cleanupExpiredItems() + case <-m.ctx.Done(): + return + } + } + }() +} + +// cleanupExpiredItems removes all expired items from the cache. +func (m *MemoryStore) cleanupExpiredItems() { + m.mu.Lock() + defer m.mu.Unlock() + for key, item := range m.store { + if time.Now().After(item.expiresAt) { + delete(m.store, key) + atomic.AddInt64(&m.evictions, 1) + } + } +} + +// Set stores a raw byte slice in the cache with the specified key and duration. +func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.store[key] = item{ + value: value, + expiresAt: time.Now().Add(duration), + } + + return nil +} + +// SetJSON stores a JSON-serializable value in the cache. +func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Duration) error { + data, err := json.Marshal(value) + if err != nil { + return err + } + return m.Set(key, data, duration) +} + +// Get retrieves a value from the cache. +func (m *MemoryStore) Get(key string) ([]byte, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + it, exists := m.store[key] + if !exists || time.Now().After(it.expiresAt) { + atomic.AddInt64(&m.misses, 1) + return nil, false + } + + atomic.AddInt64(&m.hits, 1) + return it.value, true +} + +// GetJSON retrieves and deserializes a JSON value from the cache. +func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error) { + data, exists := m.Get(key) + if !exists { + return false, nil + } + + err := json.Unmarshal(data, dest) + if err != nil { + return true, err + } + + return true, nil +} + +// Delete removes an item from the cache. +func (m *MemoryStore) Delete(key string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.store, key) +} + +// SetMulti stores multiple key-value pairs in the cache. +func (m *MemoryStore) SetMulti(items map[string][]byte, duration time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + + expiresAt := time.Now().Add(duration) + for key, value := range items { + m.store[key] = item{ + value: value, + expiresAt: expiresAt, + } + } + return nil +} + +// GetMulti retrieves multiple values from the cache. +func (m *MemoryStore) GetMulti(keys []string) map[string][]byte { + m.mu.RLock() + defer m.mu.RUnlock() + + result := make(map[string][]byte) + now := time.Now() + + for _, key := range keys { + it, exists := m.store[key] + if exists && !now.After(it.expiresAt) { + result[key] = it.value + atomic.AddInt64(&m.hits, 1) + } else { + atomic.AddInt64(&m.misses, 1) + } + } + + return result +} + +// GetMetrics returns the current statistics of the MemoryStore. +func (m *MemoryStore) GetMetrics() StoreMetrics { + m.mu.RLock() + itemCount := len(m.store) + m.mu.RUnlock() + + return StoreMetrics{ + Items: itemCount, + Hits: atomic.LoadInt64(&m.hits), + Misses: atomic.LoadInt64(&m.misses), + Evictions: atomic.LoadInt64(&m.evictions), + } +} + +// Subscribe subscribes to a topic. +func (m *MemoryStore) Subscribe(topic string) (<-chan []byte, error) { + if m.IsStopped() { + return nil, ErrStoreStopped + } + return m.ps.Subscribe(topic) +} + +// Publish publishes a message to a topic. +func (m *MemoryStore) Publish(topic string, message []byte) error { + if m.IsStopped() { + return ErrStoreStopped + } + return m.ps.Publish(topic, message) +} + +// Unsubscribe unsubscribes from a topic. +func (m *MemoryStore) Unsubscribe(topic string) error { + if m.IsStopped() { + return ErrStoreStopped + } + return m.ps.Unsubscribe(topic) +} + +// SubscriberCount returns the number of subscribers for a pattern. +// Note: This is not supported by the common interface and will return 0 or error in future. +// For now, it only works if the underlying implementation is In-Memory. +func (m *MemoryStore) SubscriberCount(pattern string) int { + // Not part of the interface. + // If we need this, we should add it to the interface or check type. + if ps, ok := m.ps.(*InMemoryPubSub); ok { + ps.mu.RLock() + defer ps.mu.RUnlock() + count := 0 + for p, subs := range ps.subscriptions { + if p == pattern { + count += len(subs) + } + } + return count + } + return 0 +} + +// initPubSub initializes the PubSub system. +func (m *MemoryStore) initPubSub(config Config) { + if config.GCPProjectID != "" { + ps, err := NewGCPPubSub(context.Background(), config.GCPProjectID) + if err == nil { + m.ps = ps + log.Printf("Initialized GCP PubSub with project %s", config.GCPProjectID) + return + } + log.Printf("Failed to initialize GCP PubSub: %v. Falling back to In-Memory.", err) + } + + m.ps = newInMemoryPubSub() + log.Println("Initialized In-Memory PubSub") +} + +// cleanupPubSub cleans up the PubSub system. +func (m *MemoryStore) cleanupPubSub() { + if m.ps != nil { + m.ps.Close() + } +} diff --git a/memorystore/metrics_test.go b/memorystore/metrics_test.go new file mode 100644 index 0000000..7f84366 --- /dev/null +++ b/memorystore/metrics_test.go @@ -0,0 +1,51 @@ +package memorystore + +import ( + "testing" + "time" +) + +func TestMetrics(t *testing.T) { + ms := NewMemoryStore() + defer ms.Stop() + + // Initial metrics should be zero + metrics := ms.GetMetrics() + if metrics.Hits != 0 || metrics.Misses != 0 || metrics.Evictions != 0 || metrics.Items != 0 { + t.Errorf("Expected initial metrics to be zero, got %+v", metrics) + } + + // Test Hits + ms.Set("key1", []byte("value1"), time.Minute) + ms.Get("key1") + metrics = ms.GetMetrics() + if metrics.Hits != 1 { + t.Errorf("Expected 1 hit, got %d", metrics.Hits) + } + + // Test Misses + ms.Get("nonexistent") + metrics = ms.GetMetrics() + if metrics.Misses != 1 { + t.Errorf("Expected 1 miss, got %d", metrics.Misses) + } + + // Test Items + ms.Set("key2", []byte("value2"), time.Minute) + metrics = ms.GetMetrics() + if metrics.Items != 2 { + t.Errorf("Expected 2 items, got %d", metrics.Items) + } + + // Test Evictions + ms.Set("expired", []byte("expired"), 1*time.Millisecond) + time.Sleep(100 * time.Millisecond) // Wait for expiration + + // Trigger cleanup + ms.cleanupExpiredItems() + + metrics = ms.GetMetrics() + if metrics.Evictions != 1 { + t.Errorf("Expected 1 eviction, got %d", metrics.Evictions) + } +} diff --git a/memorystore/pubsub.go b/memorystore/pubsub.go index 9c87633..da06ce7 100644 --- a/memorystore/pubsub.go +++ b/memorystore/pubsub.go @@ -1,212 +1,41 @@ -// memorystore/pubsub.go -package memorystore - -import ( - "context" - "errors" - "strings" - "sync" -) - -// Constants for PubSub configuration -const ( - defaultChannelBuffer = 100 // Default buffer size for subscriber channels -) - -// Common errors for PubSub operations -var ( - ErrInvalidPattern = errors.New("invalid subscription pattern") - ErrStoreStopped = errors.New("store has been stopped") -) - -// subscription represents an individual subscriber -type subscription struct { - pattern string // The pattern this subscription matches - ch chan []byte // Channel for sending messages to the subscriber - ctx context.Context // Context for managing subscription lifetime - cancel func() // Function to cancel the subscription context -} - -// pubSubManager handles all publish/subscribe operations -type pubSubManager struct { - mu sync.RWMutex - subscriptions map[string][]*subscription // Pattern -> subscriptions mapping - wg sync.WaitGroup // For graceful shutdown -} - -// newPubSubManager creates and initializes a new pubSubManager -func newPubSubManager() *pubSubManager { - return &pubSubManager{ - subscriptions: make(map[string][]*subscription), - } -} - -// Subscribe creates a new subscription for the given pattern -func (m *MemoryStore) Subscribe(pattern string) (<-chan []byte, error) { - if m.IsStopped() { - return nil, ErrStoreStopped - } - - if pattern == "" { - return nil, ErrInvalidPattern - } - - // Create subscription context - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan []byte, defaultChannelBuffer) - - sub := &subscription{ - pattern: pattern, - ch: ch, - ctx: ctx, - cancel: cancel, - } - - // Add subscription to manager - m.ps.mu.Lock() - m.ps.subscriptions[pattern] = append(m.ps.subscriptions[pattern], sub) - m.ps.mu.Unlock() - - // Add to wait group for graceful shutdown - m.ps.wg.Add(1) - - // Cleanup goroutine - go func() { - defer m.ps.wg.Done() - <-ctx.Done() - m.removeSubscription(pattern, sub) - close(ch) - }() - - return ch, nil -} - -// Publish sends a message to all subscribers matching the given channel -func (m *MemoryStore) Publish(channel string, message []byte) error { - if m.IsStopped() { - return ErrStoreStopped - } - - m.ps.mu.RLock() - defer m.ps.mu.RUnlock() - - // Find all matching subscriptions and publish to them - for pattern, subs := range m.ps.subscriptions { - if matchesPattern(channel, pattern) { - for _, sub := range subs { - select { - case <-sub.ctx.Done(): - continue // Skip closed subscriptions - default: - select { - case sub.ch <- message: - default: - // Channel is full, skip this subscriber - // Could add logging or metrics here - } - } - } - } - } - - return nil -} - -// Unsubscribe cancels all subscriptions for the given pattern -func (m *MemoryStore) Unsubscribe(pattern string) error { - if m.IsStopped() { - return ErrStoreStopped - } - - m.ps.mu.Lock() - defer m.ps.mu.Unlock() - - subs, exists := m.ps.subscriptions[pattern] - if !exists { - return nil - } - - // Cancel all subscriptions for this pattern - for _, sub := range subs { - sub.cancel() - } - - delete(m.ps.subscriptions, pattern) - return nil -} - -// SubscriberCount returns the number of subscribers for a given pattern -func (m *MemoryStore) SubscriberCount(pattern string) int { - m.ps.mu.RLock() - defer m.ps.mu.RUnlock() - - count := 0 - for p, subs := range m.ps.subscriptions { - if p == pattern { - count += len(subs) - } - } - return count -} - -// removeSubscription removes a specific subscription -func (m *MemoryStore) removeSubscription(pattern string, sub *subscription) { - m.ps.mu.Lock() - defer m.ps.mu.Unlock() - - subs := m.ps.subscriptions[pattern] - for i, s := range subs { - if s == sub { - // Remove subscription from slice - subs = append(subs[:i], subs[i+1:]...) - break - } - } - - if len(subs) == 0 { - delete(m.ps.subscriptions, pattern) - } else { - m.ps.subscriptions[pattern] = subs - } -} - -// matchesPattern checks if a channel matches a subscription pattern -func matchesPattern(channel, pattern string) bool { - // Split pattern and channel into segments - patternParts := strings.Split(pattern, ":") - channelParts := strings.Split(channel, ":") - - if len(patternParts) != len(channelParts) { - return false - } - - // Check each segment - for i, patternPart := range patternParts { - if patternPart != "*" && patternPart != channelParts[i] { - return false - } - } - - return true -} - -// initPubSub initializes the PubSub system for the MemoryStore -func (m *MemoryStore) initPubSub() { - m.ps = newPubSubManager() -} - -// cleanupPubSub performs cleanup of the PubSub system during store shutdown -func (m *MemoryStore) cleanupPubSub() { - m.ps.mu.Lock() - // Cancel all subscriptions - for pattern, subs := range m.ps.subscriptions { - for _, sub := range subs { - sub.cancel() - } - delete(m.ps.subscriptions, pattern) - } - m.ps.mu.Unlock() - - // Wait for all subscription goroutines to finish - m.ps.wg.Wait() -} +package memorystore + +import ( + "errors" + "time" +) + +// PubSubClient defines the interface for Publish/Subscribe operations +// to make the underlying implementation agnostic (In-Memory, GCP, etc). +type PubSubClient interface { + // Subscribe subscribes to a topic and returns a channel for messages. + // For GCP PubSub, 'topic' maps to a Topic, and a temporary subscription is created. + // For In-Memory, 'topic' is the pattern/channel name. + Subscribe(topic string) (<-chan []byte, error) + + // Publish sends a message to a topic. + Publish(topic string, message []byte) error + + // Unsubscribe stops receiving messages for a topic and cleans up resources. + Unsubscribe(topic string) error + + // Close shuts down the client and cleans up resources. + Close() error +} + +// Config holds configuration for the MemoryStore and its components. +type Config struct { + // GCPProjectID is the Google Cloud Project ID. + // If set, MemoryStore will attempt to use GCP PubSub. + GCPProjectID string + + // PubSubTimeout is the timeout for PubSub operations. + PubSubTimeout time.Duration +} + +// Common errors for PubSub operations +var ( + ErrInvalidTopic = errors.New("invalid topic") + ErrStoreStopped = errors.New("store has been stopped") + ErrNotImplemented = errors.New("feature not implemented by provider") +) diff --git a/memorystore/pubsub_gcp.go b/memorystore/pubsub_gcp.go new file mode 100644 index 0000000..0007f8a --- /dev/null +++ b/memorystore/pubsub_gcp.go @@ -0,0 +1,174 @@ +package memorystore + +import ( + "context" + "fmt" + "sync" + "time" + + "cloud.google.com/go/pubsub" + "github.com/google/uuid" + "google.golang.org/api/option" +) + +// GCPPubSub implements PubSubClient using Google Cloud PubSub +type GCPPubSub struct { + client *pubsub.Client + mu sync.RWMutex + subscriptions map[string]*gcpSubscription // Map topic/pattern -> subscription info + projectID string +} + +type gcpSubscription struct { + subID string + sub *pubsub.Subscription + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewGCPPubSub creates a new GCP PubSub client +func NewGCPPubSub(ctx context.Context, projectID string, opts ...option.ClientOption) (*GCPPubSub, error) { + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return nil, err + } + + return &GCPPubSub{ + client: client, + subscriptions: make(map[string]*gcpSubscription), + projectID: projectID, + }, nil +} + +// Subscribe subscribes to a topic. +// Note: In GCP PubSub, we must create a Subscription to the Topic. +// Since this is a temporary/session-based subscription, we create a unique subscription ID +// and delete it when Unsubscribe is called or the client is closed. +func (g *GCPPubSub) Subscribe(topicName string) (<-chan []byte, error) { + g.mu.Lock() + defer g.mu.Unlock() + + // 1. Ensure Topic exists + topic := g.client.Topic(topicName) + exists, err := topic.Exists(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to check topic existence: %w", err) + } + if !exists { + // Try to create topic if it doesn't exist? + // Usually good for dev, might fail in strict prod. Let's try. + topic, err = g.client.CreateTopic(context.Background(), topicName) + if err != nil { + return nil, fmt.Errorf("failed to create topic: %w", err) + } + } + + // 2. Create a unique subscription + // We use a unique ID so each client instance gets its own copy of messages (Pub/Sub fan-out). + subID := fmt.Sprintf("memorystore-sub-%s-%s", topicName, uuid.New().String()) + sub, err := g.client.CreateSubscription(context.Background(), subID, pubsub.SubscriptionConfig{ + Topic: topic, + ExpirationPolicy: time.Duration(24 * time.Hour), // Auto-delete if unused + }) + if err != nil { + return nil, fmt.Errorf("failed to create subscription: %w", err) + } + + // 3. Start receiving messages + ch := make(chan []byte, 100) + ctx, cancel := context.WithCancel(context.Background()) + + gcpSub := &gcpSubscription{ + subID: subID, + sub: sub, + cancel: cancel, + } + + gcpSub.wg.Add(1) + go func() { + defer gcpSub.wg.Done() + defer close(ch) + + err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { + // Forward message to channel + select { + case ch <- msg.Data: + msg.Ack() + case <-ctx.Done(): + msg.Nack() + } + }) + if err != nil && err != context.Canceled { + // Log error? + // fmt.Printf("Receive error: %v\n", err) + } + }() + + g.subscriptions[topicName] = gcpSub + return ch, nil +} + +// Publish publishes a message to the topic +func (g *GCPPubSub) Publish(topicName string, message []byte) error { + // Ensure topic exists (lazy creation) + // For performance, we might assume it exists or cache existence, but let's be safe. + // Actually, checking every time is slow. + // But `client.Topic` is lightweight. `Publish` handles non-existent topic by failing. + // Let's rely on standard library behavior. + + // However, if we want to auto-create topics like Redis, we should check. + // Let's check existence for now, or just try to publish. + topic := g.client.Topic(topicName) + + // We can't easily check existence without an API call. + // Let's assume the user ensures topics exist, OR we handle the error. + // But for a "canonical example", it's nice if it Just Works. + // Let's check existence once per topic per client instance? + // For now, let's just Publish. If it fails, so be it. + + res := topic.Publish(context.Background(), &pubsub.Message{ + Data: message, + }) + + _, err := res.Get(context.Background()) + return err +} + +// Unsubscribe stops the subscription and deletes it +func (g *GCPPubSub) Unsubscribe(topicName string) error { + g.mu.Lock() + defer g.mu.Unlock() + + sub, ok := g.subscriptions[topicName] + if !ok { + return nil + } + + // Stop receiving + sub.cancel() + sub.wg.Wait() + + // Delete subscription from GCP to clean up + if err := sub.sub.Delete(context.Background()); err != nil { + // Log error but continue + } + + delete(g.subscriptions, topicName) + return nil +} + +// Close closes the client and cleans up all subscriptions +func (g *GCPPubSub) Close() error { + g.mu.Lock() + defer g.mu.Unlock() + + for _, sub := range g.subscriptions { + sub.cancel() + sub.wg.Wait() + // Best effort delete + sub.sub.Delete(context.Background()) + } + g.subscriptions = nil + + return g.client.Close() +} diff --git a/memorystore/pubsub_memory.go b/memorystore/pubsub_memory.go new file mode 100644 index 0000000..b925385 --- /dev/null +++ b/memorystore/pubsub_memory.go @@ -0,0 +1,208 @@ +// memorystore/pubsub_memory.go +package memorystore + +import ( + "context" + "strings" + "sync" +) + +// Constants for PubSub configuration +const ( + defaultChannelBuffer = 100 // Default buffer size for subscriber channels +) + +// subscription represents an individual subscriber +type subscription struct { + topic string // The topic this subscription matches + ch chan []byte // Channel for sending messages to the subscriber + ctx context.Context // Context for managing subscription lifetime + cancel func() // Function to cancel the subscription context +} + +// InMemoryPubSub handles all publish/subscribe operations in memory +type InMemoryPubSub struct { + mu sync.RWMutex + subscriptions map[string][]*subscription // Topic -> subscriptions mapping + wg sync.WaitGroup // For graceful shutdown + closed bool +} + +// newInMemoryPubSub creates and initializes a new InMemoryPubSub +func newInMemoryPubSub() *InMemoryPubSub { + return &InMemoryPubSub{ + subscriptions: make(map[string][]*subscription), + } +} + +// Subscribe creates a new subscription for the given topic +func (ps *InMemoryPubSub) Subscribe(topic string) (<-chan []byte, error) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return nil, ErrStoreStopped + } + + if topic == "" { + return nil, ErrInvalidTopic + } + + // Create subscription context + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan []byte, defaultChannelBuffer) + + sub := &subscription{ + topic: topic, + ch: ch, + ctx: ctx, + cancel: cancel, + } + + // Add subscription to manager + ps.subscriptions[topic] = append(ps.subscriptions[topic], sub) + + // Add to wait group for graceful shutdown + ps.wg.Add(1) + + // Cleanup goroutine + go func() { + defer ps.wg.Done() + <-ctx.Done() + ps.removeSubscription(topic, sub) + close(ch) + }() + + return ch, nil +} + +// Publish sends a message to all subscribers matching the given topic +func (ps *InMemoryPubSub) Publish(topic string, message []byte) error { + ps.mu.RLock() + defer ps.mu.RUnlock() + + if ps.closed { + return ErrStoreStopped + } + + // Find all matching subscriptions and publish to them + // We support simple pattern matching here for backward compatibility + for subTopic, subs := range ps.subscriptions { + if matchesTopic(topic, subTopic) { + for _, sub := range subs { + select { + case <-sub.ctx.Done(): + continue // Skip closed subscriptions + default: + select { + case sub.ch <- message: + default: + // Channel is full, skip this subscriber + } + } + } + } + } + + return nil +} + +// Unsubscribe cancels all subscriptions for the given topic +func (ps *InMemoryPubSub) Unsubscribe(topic string) error { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return ErrStoreStopped + } + + subs, exists := ps.subscriptions[topic] + if !exists { + return nil + } + + // Cancel all subscriptions for this topic + for _, sub := range subs { + sub.cancel() + } + + delete(ps.subscriptions, topic) + return nil +} + +// Close shuts down the PubSub manager +func (ps *InMemoryPubSub) Close() error { + ps.mu.Lock() + if ps.closed { + ps.mu.Unlock() + return nil + } + ps.closed = true + + // Cancel all subscriptions + for topic, subs := range ps.subscriptions { + for _, sub := range subs { + sub.cancel() + } + delete(ps.subscriptions, topic) + } + ps.mu.Unlock() + + // Wait for all subscription goroutines to finish + ps.wg.Wait() + return nil +} + +// removeSubscription removes a specific subscription +func (ps *InMemoryPubSub) removeSubscription(topic string, sub *subscription) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.closed { + return + } + + subs, ok := ps.subscriptions[topic] + if !ok { + return + } + + for i, s := range subs { + if s == sub { + // Remove subscription from slice + subs = append(subs[:i], subs[i+1:]...) + break + } + } + + if len(subs) == 0 { + delete(ps.subscriptions, topic) + } else { + ps.subscriptions[topic] = subs + } +} + +// matchesTopic checks if a topic matches a subscription pattern +func matchesTopic(topic, pattern string) bool { + // If exactly equal, return true + if topic == pattern { + return true + } + + // Split pattern and topic into segments + patternParts := strings.Split(pattern, ":") + topicParts := strings.Split(topic, ":") + + if len(patternParts) != len(topicParts) { + return false + } + + // Check each segment + for i, patternPart := range patternParts { + if patternPart != "*" && patternPart != topicParts[i] { + return false + } + } + + return true +} From 8bfc6e77a4df067338dc90f5523ac4b42d3895ed Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 05:50:20 +0000 Subject: [PATCH 2/7] Refactor PubSub to interface and add GCP backend (Rebased) Rebased on latest main which includes sharding support. - Extracted `PubSubClient` interface. - Added `GCPPubSub` implementation. - Renamed and updated `InMemoryPubSub`. - Updated `MemoryStore` to use interface and support configuration. - Resolved conflicts with sharded storage implementation. --- memorystore/memorystore.go | 187 +++++++++++++++++++++++++++++-------- 1 file changed, 146 insertions(+), 41 deletions(-) diff --git a/memorystore/memorystore.go b/memorystore/memorystore.go index a9cd172..62be2f4 100644 --- a/memorystore/memorystore.go +++ b/memorystore/memorystore.go @@ -6,6 +6,7 @@ package memorystore import ( "context" + "hash/fnv" "log" "os" "sync" @@ -15,6 +16,8 @@ import ( "github.com/goccy/go-json" ) +const numShards = 256 + // item represents a single cache entry with its value and expiration time. type item struct { value []byte // Raw data stored as a byte slice @@ -29,11 +32,18 @@ type StoreMetrics struct { Evictions int64 // Total number of items evicted (expired) } +type shard struct { + mu sync.RWMutex + store map[string]item +} + // MemoryStore implements an in-memory cache with automatic cleanup of expired items. // It is safe for concurrent use by multiple goroutines. type MemoryStore struct { - mu sync.RWMutex // Protects access to the store map - store map[string]item // Internal storage for cache items + // lifecycleMu protects the lifecycle state (cancelFunc) + lifecycleMu sync.RWMutex + + shards []*shard // Sharded storage ps PubSubClient // PubSub client for cache events ctx context.Context // Context for controlling the cleanup worker cancelFunc context.CancelFunc // Function to stop the cleanup worker @@ -60,21 +70,40 @@ func NewMemoryStore() *MemoryStore { func NewMemoryStoreWithConfig(config Config) *MemoryStore { ctx, cancel := context.WithCancel(context.Background()) ms := &MemoryStore{ - store: make(map[string]item), + shards: make([]*shard, numShards), ctx: ctx, cancelFunc: cancel, } + for i := 0; i < numShards; i++ { + ms.shards[i] = &shard{ + store: make(map[string]item), + } + } + ms.initPubSub(config) ms.startCleanupWorker() return ms } +// getShard returns the shard responsible for the given key. +func (m *MemoryStore) getShard(key string) *shard { + h := fnv.New64a() + h.Write([]byte(key)) + return m.shards[h.Sum64()%numShards] +} + // Stop gracefully shuts down the MemoryStore by stopping the cleanup goroutine // and releasing associated resources. After calling Stop, the store cannot be used. +// Multiple calls to Stop will not cause a panic and return nil. +// +// Example: +// +// store := NewMemoryStore() +// defer store.Stop() func (m *MemoryStore) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() + m.lifecycleMu.Lock() + defer m.lifecycleMu.Unlock() if m.cancelFunc == nil { return nil @@ -89,20 +118,32 @@ func (m *MemoryStore) Stop() error { m.wg.Wait() // Clear the store to free up memory - m.store = nil + for _, s := range m.shards { + s.mu.Lock() + s.store = nil + s.mu.Unlock() + } return nil } // IsStopped returns true if the MemoryStore has been stopped and can no longer be used. +// This method is safe for concurrent use. +// +// Example: +// +// if store.IsStopped() { +// log.Println("Store is no longer available") +// return +// } func (m *MemoryStore) IsStopped() bool { - m.mu.RLock() - defer m.mu.RUnlock() + m.lifecycleMu.RLock() + defer m.lifecycleMu.RUnlock() return m.cancelFunc == nil } // startCleanupWorker initiates a background goroutine that periodically -// removes expired items from the cache. +// removes expired items from the cache. The cleanup interval is set to 1 minute. func (m *MemoryStore) startCleanupWorker() { m.wg.Add(1) go func() { @@ -122,23 +163,31 @@ func (m *MemoryStore) startCleanupWorker() { } // cleanupExpiredItems removes all expired items from the cache. +// It iterates over shards and cleans them one by one to avoid global locking. func (m *MemoryStore) cleanupExpiredItems() { - m.mu.Lock() - defer m.mu.Unlock() - for key, item := range m.store { - if time.Now().After(item.expiresAt) { - delete(m.store, key) - atomic.AddInt64(&m.evictions, 1) + now := time.Now() + for _, s := range m.shards { + // Lock only the current shard + s.mu.Lock() + for key, item := range s.store { + if now.After(item.expiresAt) { + delete(s.store, key) + atomic.AddInt64(&m.evictions, 1) + } } + s.mu.Unlock() } } // Set stores a raw byte slice in the cache with the specified key and duration. +// The item will automatically expire after the specified duration. +// If an error occurs, it will be returned to the caller. func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) error { - m.mu.Lock() - defer m.mu.Unlock() + s := m.getShard(key) + s.mu.Lock() + defer s.mu.Unlock() - m.store[key] = item{ + s.store[key] = item{ value: value, expiresAt: time.Now().Add(duration), } @@ -147,6 +196,17 @@ func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) erro } // SetJSON stores a JSON-serializable value in the cache. +// The value is serialized to JSON before storage. +// Returns an error if JSON marshaling fails. +// +// Example: +// +// type User struct { +// Name string +// Age int +// } +// user := User{Name: "John", Age: 30} +// err := cache.SetJSON("user:123", user, 1*time.Hour) func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Duration) error { data, err := json.Marshal(value) if err != nil { @@ -156,11 +216,14 @@ func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Durat } // Get retrieves a value from the cache. +// Returns the value and a boolean indicating whether the key was found. +// If the item has expired, returns (nil, false). func (m *MemoryStore) Get(key string) ([]byte, bool) { - m.mu.RLock() - defer m.mu.RUnlock() + s := m.getShard(key) + s.mu.RLock() + defer s.mu.RUnlock() - it, exists := m.store[key] + it, exists := s.store[key] if !exists || time.Now().After(it.expiresAt) { atomic.AddInt64(&m.misses, 1) return nil, false @@ -170,7 +233,18 @@ func (m *MemoryStore) Get(key string) ([]byte, bool) { return it.value, true } -// GetJSON retrieves and deserializes a JSON value from the cache. +// GetJSON retrieves and deserializes a JSON value from the cache into the provided interface. +// Returns a boolean indicating if the key was found and any error that occurred during deserialization. +// +// Example: +// +// var user User +// exists, err := cache.GetJSON("user:123", &user) +// if err != nil { +// // Handle error +// } else if exists { +// fmt.Printf("Found user: %+v\n", user) +// } func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error) { data, exists := m.Get(key) if !exists { @@ -186,53 +260,84 @@ func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error) { } // Delete removes an item from the cache. +// If the key doesn't exist, the operation is a no-op. func (m *MemoryStore) Delete(key string) { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.store, key) + s := m.getShard(key) + s.mu.Lock() + defer s.mu.Unlock() + delete(s.store, key) } // SetMulti stores multiple key-value pairs in the cache. +// This is more efficient than calling Set multiple times as it groups keys by shard. +// All items will have the same expiration duration. func (m *MemoryStore) SetMulti(items map[string][]byte, duration time.Duration) error { - m.mu.Lock() - defer m.mu.Unlock() - + // Group items by shard + shardItems := make(map[*shard]map[string]item) expiresAt := time.Now().Add(duration) + for key, value := range items { - m.store[key] = item{ + s := m.getShard(key) + if _, ok := shardItems[s]; !ok { + shardItems[s] = make(map[string]item) + } + shardItems[s][key] = item{ value: value, expiresAt: expiresAt, } } + + // Apply updates per shard + for s, items := range shardItems { + s.mu.Lock() + for k, v := range items { + s.store[k] = v + } + s.mu.Unlock() + } return nil } // GetMulti retrieves multiple values from the cache. +// It returns a map of found items. Keys that don't exist or are expired are omitted. func (m *MemoryStore) GetMulti(keys []string) map[string][]byte { - m.mu.RLock() - defer m.mu.RUnlock() - result := make(map[string][]byte) now := time.Now() + // Group keys by shard + shardKeys := make(map[*shard][]string) for _, key := range keys { - it, exists := m.store[key] - if exists && !now.After(it.expiresAt) { - result[key] = it.value - atomic.AddInt64(&m.hits, 1) - } else { - atomic.AddInt64(&m.misses, 1) + s := m.getShard(key) + shardKeys[s] = append(shardKeys[s], key) + } + + // Retrieve from each shard + for s, keys := range shardKeys { + s.mu.RLock() + for _, key := range keys { + it, exists := s.store[key] + if exists && !now.After(it.expiresAt) { + result[key] = it.value + atomic.AddInt64(&m.hits, 1) + } else { + atomic.AddInt64(&m.misses, 1) + } } + s.mu.RUnlock() } return result } // GetMetrics returns the current statistics of the MemoryStore. +// It returns a copy of the metrics to ensure thread safety. func (m *MemoryStore) GetMetrics() StoreMetrics { - m.mu.RLock() - itemCount := len(m.store) - m.mu.RUnlock() + itemCount := 0 + for _, s := range m.shards { + s.mu.RLock() + itemCount += len(s.store) + s.mu.RUnlock() + } return StoreMetrics{ Items: itemCount, From 64a522b62f8b414c54586b412242edee11542c50 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 23:25:42 +0000 Subject: [PATCH 3/7] Resolve conflicts and integrate PubSub with Sharding Rebased feature branch on latest root which includes Sharding support. Integrated PubSub interface and GCP implementation with the sharded MemoryStore. Resolved conflicts in memorystore.go. Verified all tests pass. From f8784fc97d8a929e90eee615f9bc56204f4cb576 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Wed, 31 Dec 2025 23:32:19 +0000 Subject: [PATCH 4/7] Refactor PubSub to interface and add GCP backend (Rebased) Rebased feature branch on latest root which includes Sharding support. - Extracted `PubSubClient` interface. - Added `GCPPubSub` implementation. - Renamed and updated `InMemoryPubSub`. - Updated `MemoryStore` to use interface and support configuration. - Resolved conflicts with sharded storage implementation. - Consolidated commits to clean up history. From bd0a739b3a84f03217b9d8932023156e7c7f3683 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 31 Dec 2025 23:41:26 +0000 Subject: [PATCH 5/7] Initial plan From 77cc116d6afef40ee27859639978a7ec64e30663 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 31 Dec 2025 23:54:42 +0000 Subject: [PATCH 6/7] Initial plan From 6aba745f3e049ac5b7328940b45fa217349d30fd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 31 Dec 2025 23:59:26 +0000 Subject: [PATCH 7/7] Fix CI build error by changing 'go run main.go' to 'go build' Co-authored-by: BryceWayne <35088752+BryceWayne@users.noreply.github.com> --- .github/workflows/go.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 12cb27e..b517353 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -22,7 +22,7 @@ jobs: go-version: '1.20' - name: Build - run: go run main.go + run: go build -v ./... - name: Test run: go test -v ./...