This package provides a generic, database-agnostic abstraction layer with MongoDB as the primary implementation. It enables type-safe database operations with built-in support for change streams, event logging, and high availability configurations.
The package follows a three-tier interface hierarchy:
```
StoreClient (Database Cluster)
        ↓
Store (Database)
        ↓
StoreCollection (Collection/Table)
```
This design allows for easy swapping of database implementations without affecting higher-level code.
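For instance, application code written against these interfaces never names a concrete backend, so a different implementation can be injected without changes. A minimal, self-contained sketch of the pattern (the interfaces here are simplified stand-ins for the package's, with context parameters and most methods omitted):

```go
package main

import "fmt"

// Simplified stand-ins for the package's interfaces, for illustration only.
type Store interface {
	GetCollection(col string) StoreCollection
	Name() string
}

type StoreCollection interface {
	InsertOne(key, data any) error
}

// seed depends only on the interfaces, so any backend
// (MongoDB, in-memory, ...) can be plugged in.
func seed(s Store) error {
	return s.GetCollection("users").InsertOne("alice", "data")
}

// A trivial in-memory backend satisfying the interfaces.
type memCollection struct{ docs map[any]any }

func (c *memCollection) InsertOne(key, data any) error {
	c.docs[key] = data
	return nil
}

type memStore struct{ cols map[string]*memCollection }

func (s *memStore) GetCollection(col string) StoreCollection {
	if s.cols[col] == nil {
		s.cols[col] = &memCollection{docs: map[any]any{}}
	}
	return s.cols[col]
}

func (s *memStore) Name() string { return "memory" }

func main() {
	s := &memStore{cols: map[string]*memCollection{}}
	if err := seed(s); err != nil {
		panic(err)
	}
	fmt.Println(s.cols["users"].docs["alice"]) // data
}
```

Swapping backends then means changing only the line that constructs the `Store`, not the code that uses it.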
The StoreCollection interface provides collection-level operations:
```go
type StoreCollection interface {
	SetKeyType(keyType reflect.Type) error
	InsertOne(ctx context.Context, key any, data any) error
	UpdateOne(ctx context.Context, key any, data any, upsert bool) error
	FindOne(ctx context.Context, key any, data any) error
	FindMany(ctx context.Context, filter any, data any, opts ...any) error
	Count(ctx context.Context, filter any) (int64, error)
	DeleteOne(ctx context.Context, key any) error
	DeleteMany(ctx context.Context, filter any) (int64, error)
	Watch(ctx context.Context, filter any, cb WatchCallbackfn) error
	EnsureIndexes(ctx context.Context, indexes []IndexDefinition) error
	startEventLogger(ctx context.Context, eventType reflect.Type, timestamp *bson.Timestamp) error
}
```

Key Features:
- Type-safe operations through reflection-based key type registration
- Context-aware operations for cancellation and timeout support
- Change monitoring via `Watch()` with callback notifications
- Bulk operations with `FindMany()` and `DeleteMany()`
- Index management via `EnsureIndexes()` for idempotent index creation
- Event logging for audit trails and debugging
The Store interface represents a database:
```go
type Store interface {
	GetCollection(col string) StoreCollection
	Name() string
}
```

The StoreClient interface represents a database cluster/client:
```go
type StoreClient interface {
	GetDataStore(dbName string) Store
	GetCollection(dbName, col string) StoreCollection
	HealthCheck(ctx context.Context) error
}
```

Defines operation constants and default source identifiers:

- `MongoAddOp`: Constant for insert operations
- `MongoUpdateOp`: Constant for update/replace operations
- `MongoDeleteOp`: Constant for delete operations
- `DefaultSourceIdentifier`: Default identifier for MongoDB connections
Thread-safe source identifier management using sync.RWMutex:
```go
func SetSourceIdentifier(source string)
func GetSourceIdentifier() string
```

Important: The source identifier can only be set once, before the first `GetSourceIdentifier()` call. Attempts to change it after use will cause a panic.
Use Case: Allows tracking which service instance made changes to the database, useful in multi-replica deployments.
Generic event structures for change stream monitoring and event logging:
```go
type Event[K any, E any] struct {
	Doc     DocumentKey[K]        // Document key that changed
	Op      string                // Operation type (add/update/delete)
	Time    bson.Timestamp        // Timestamp of change
	Ns      *Namespace            // Database and collection information
	Entry   *E                    // Full document (for inserts/updates)
	Updates *UpdateDescription[E] // Update details (for updates)
}

type EventLogger[K, E any] struct {
	col StoreCollection
	ts  *bson.Timestamp
}

func NewEventLogger[K, E any](col StoreCollection) *EventLogger[K, E]
func (e *EventLogger[K, E]) StartLogger(ctx context.Context, ts *bson.Timestamp) error
```

Features:
- Generic event types matching your data structures
- Automatic invocation of a `LogEvent()` method if implemented on the entry type
- Resume capability using BSON timestamps
- Full document capture with change details
Core interface definitions, callback types, and index types:
```go
type WatchCallbackfn func(op string, key any)
```

Defines the callback signature for change notifications delivered through `Watch()`.
```go
type IndexType int

const (
	IndexAscending  IndexType = 1
	IndexDescending IndexType = -1
)

type IndexField struct {
	Field     string
	IndexType IndexType
}

type IndexDefinition struct {
	Fields []IndexField
	Unique bool
	Sparse bool
	TTL    time.Duration
	Name   string
}
```

Used with `EnsureIndexes()` to declaratively create indexes on a collection. Supports unique constraints, sparse indexes (which only index documents containing the field), and TTL indexes (automatic document expiration). The operation is idempotent: calling it multiple times with the same definitions is safe.
Complete MongoDB implementation of all interfaces.
Connection Management:
```go
type MongoConfig struct {
	Host     string // MongoDB host (default: localhost)
	Port     string // MongoDB port (default: 27017)
	Uri      string // Full MongoDB URI (overrides Host/Port)
	Username string // Authentication username
	Password string // Authentication password
}

func NewMongoClient(config MongoConfig) (StoreClient, error)
```

High Availability Configuration:
- Majority write concern for replica set safety
- Journal writes enabled for durability
- SCRAM-SHA-256 authentication
- Connection pooling with retries
Error Interpretation:
The `interpretMongoError()` function translates MongoDB errors into library-specific error codes:

- Duplicate key errors → `errors.AlreadyExists`
- Not found errors → `errors.NotFound`
- Context cancellation → `errors.Canceled`
- Timeout errors → `errors.DeadlineExceeded`
Change Streams:
The Watch() implementation:
- Runs in a separate goroutine
- Supports pipeline filters
- Automatic cleanup on context cancellation
- Type-safe key extraction and marshaling
- Panics on unexpected errors (not context cancellation)
Event Logging:
The startEventLogger() implementation:
- Uses reflection to invoke `LogEvent()` on entry types
- Supports resume tokens for crash recovery
- Captures full documents with the `updateLookup` option
- Filters events by operation type
```go
// Configure connection
config := db.MongoConfig{
	Host:     "localhost",
	Port:     "27017",
	Username: "admin",
	Password: "secret",
}

// Create client
client, err := db.NewMongoClient(config)
if err != nil {
	log.Fatal(err)
}

ctx := context.Background()

// Get collection
col := client.GetCollection("mydb", "users")

// Set key type (required before operations)
col.SetKeyType(reflect.TypeOf(""))

// Insert document
type User struct {
	Name  string
	Email string
}
user := User{Name: "Alice", Email: "alice@example.com"}
err = col.InsertOne(ctx, "alice", user)

// Find document
var found User
err = col.FindOne(ctx, "alice", &found)

// Update document
user.Email = "newemail@example.com"
err = col.UpdateOne(ctx, "alice", user, false)

// Delete document
err = col.DeleteOne(ctx, "alice")
```

```go
// Watch for changes
callback := func(op string, key any) {
	log.Printf("Operation %s on key: %v", op, key)
}

// Start watching (runs in background)
err := col.Watch(ctx, nil, callback)

// With filter (only watch specific documents)
filter := bson.M{"fullDocument.status": "active"}
err = col.Watch(ctx, filter, callback)
```

```go
type MyEntry struct {
	Data string
}

// Implement LogEvent method for automatic logging
func (e *MyEntry) LogEvent() {
	log.Printf("Event logged: %s", e.Data)
}

// Start event logger
eventLogger := db.NewEventLogger[string, MyEntry](col)
err := eventLogger.StartLogger(ctx, nil) // nil = start from now

// Resume from timestamp
ts := bson.Timestamp{T: 1234567890, I: 1}
err = eventLogger.StartLogger(ctx, &ts)
```

```go
// Create a unique index on a single field
err := col.EnsureIndexes(ctx, []db.IndexDefinition{
	{
		Fields: []db.IndexField{
			{Field: "email", IndexType: db.IndexAscending},
		},
		Unique: true,
	},
})

// Create a compound index with a custom name
err = col.EnsureIndexes(ctx, []db.IndexDefinition{
	{
		Fields: []db.IndexField{
			{Field: "status", IndexType: db.IndexAscending},
			{Field: "created_at", IndexType: db.IndexDescending},
		},
		Name: "status_created_idx",
	},
})

// Create a sparse index (only indexes documents where the field exists)
err = col.EnsureIndexes(ctx, []db.IndexDefinition{
	{
		Fields: []db.IndexField{
			{Field: "optional_tag", IndexType: db.IndexAscending},
		},
		Sparse: true,
	},
})

// Create a TTL index (auto-expire documents after 30 days)
err = col.EnsureIndexes(ctx, []db.IndexDefinition{
	{
		Fields: []db.IndexField{
			{Field: "completed_at", IndexType: db.IndexAscending},
		},
		TTL: 30 * 24 * time.Hour,
	},
})
```

```go
// Set before creating any clients (one-time only)
db.SetSourceIdentifier("service-replica-1")

// Later, when change events occur, you can track which replica made changes
identifier := db.GetSourceIdentifier() // Returns "service-replica-1"
```

The package abstracts database operations behind interfaces, enabling:
- Database implementation swapping
- Easy mocking for tests
- Clean separation of concerns
Uses Go generics for type-safe event handling:
```go
type Event[K any, E any] struct { ... }
type EventLogger[K, E any] struct { ... }
```

Change streams with callbacks allow reactive programming:

```go
col.Watch(ctx, filter, func(op string, key any) {
	// React to changes
})
```

Consistent error handling with context:

```go
return errors.Wrap(errors.NotFound, "document not found")
```

- mongoClient: Safe for concurrent use across goroutines
- mongoStore: Safe for concurrent use across goroutines
- mongoCollection: Safe for concurrent use across goroutines
- Source Identifier: Thread-safe with `sync.RWMutex`, but one-time write semantics
- Always set key type before performing operations on a collection
- Use contexts for cancellation and timeout control
- Handle error codes instead of comparing error messages
- Set source identifier early if tracking multi-replica changes
- Use Watch with filters to reduce unnecessary callback invocations
- Implement LogEvent() on entry types for automatic audit logging
- Clean up resources by canceling contexts when done
See mongo_test.go for comprehensive unit tests covering:
- Connection establishment
- CRUD operations
- Error handling
- Change stream monitoring
- Event logging
- Index management
- Connection Pooling: MongoDB driver handles connection pooling automatically
- Write Concern: Majority write concern adds latency but ensures durability
- Change Streams: Run in separate goroutines to avoid blocking operations
- Bulk Operations: Use `FindMany()` and `DeleteMany()` for batch processing
The interface-based design allows for future implementations:
- PostgreSQL backend
- Redis backend
- In-memory backend for testing
- Custom backends with specific requirements