Skip to content

Commit 68e9b2b

Browse files
committed
address comments
1 parent 98dfe65 commit 68e9b2b

5 files changed

Lines changed: 149 additions & 11 deletions

File tree

CMV_SUPPORT.md

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ behavior via the standard `CreateMaterializedView` gRPC method.
1010

1111
### 1. Creating a CMV
1212

13+
There are two ways to create a CMV in the emulator:
14+
15+
#### Option A: gRPC (recommended, matches production)
16+
1317
Use the standard Go admin client, pointed at the emulator:
1418

1519
```go
@@ -32,6 +36,45 @@ ORDER BY region, account_id, ts, typ, item_id, src_key`,
3236
The emulator parses the SQL to extract the key transformation config. The same code works
3337
against both production Bigtable and the emulator.
3438

39+
#### Option B: `--cmv-config` flag (local testing convenience)
40+
41+
For local testing without writing client code, pass a JSON config file at startup:
42+
43+
```bash
44+
./little_bigtable --port 9000 --db-file /tmp/lbt.db --cmv-config /path/to/cmv_config.json
45+
```
46+
47+
The JSON file is an array of CMV config objects:
48+
49+
```json
50+
[
51+
{
52+
"source_table": "events",
53+
"view_id": "events_by_account",
54+
"key_separator": "#",
55+
"key_mapping": [3, 4, 1, 2, 0],
56+
"append_source_key": true,
57+
"include_families": ["cf1"]
58+
}
59+
]
60+
```
61+
62+
| Field | Description |
63+
|---|---|
64+
| `source_table` | Table ID of the source table (not the fully-qualified name) |
65+
| `view_id` | Table ID to use for the CMV shadow table |
66+
| `key_separator` | Delimiter used in the composite row key |
67+
| `key_mapping` | Ordered list of 0-based source key component indices for the CMV key |
68+
| `append_source_key` | If true, appends the full original source key as the final component |
69+
| `include_families` | Column families to copy; omit or leave empty to include all |
70+
71+
> **Note:** The `--cmv-config` approach registers CMVs directly without going through SQL
72+
> parsing. It is intended for local testing only. In production (and for full emulator
73+
> fidelity), use `CreateMaterializedView` via the gRPC client.
74+
75+
> **Persistence:** CMV registrations loaded via either approach are in-memory only. If the
76+
> emulator restarts, CMVs must be re-registered (or the `--cmv-config` flag re-supplied).
77+
3578
### 2. Write-time Sync
3679

3780
When data is written to a source table (via MutateRow, MutateRows, CheckAndMutateRow,
@@ -59,16 +102,18 @@ row, err := table.ReadRow(ctx, "region-a#account-42#...")
59102
## What's Changed
60103

61104
### New Files
62-
- `bttest/cmv.go` — CMV config types, SQL parser, key transformation logic
105+
- `bttest/cmv.go` — CMV config types, registry, key transformation logic
106+
- `bttest/sql_parse.go` — SQL parser for extracting CMV config from a `CreateMaterializedView` query
63107
- `bttest/cmv_test.go` — Tests for key transformation, write sync, delete propagation
108+
- `bttest/sql_parse_test.go` — Tests for the SQL parser
64109

65110
### Modified Files
66-
- `little_bigtable.go` — Version bump to 0.2.0
111+
- `little_bigtable.go` — Version bump to 0.2.0; added `--cmv-config` flag
67112
- `bttest/inmem.go` — Added `cmvs` field to server struct, CMV registration,
68113
shadow table creation, write-time sync hooks in MutateRow/MutateRows/
69114
CheckAndMutateRow/ReadModifyWriteRow/DropRowRange
70115
- `bttest/instance_server.go` — Implemented CreateMaterializedView, GetMaterializedView,
71-
ListMaterializedViews, DeleteMaterializedView
116+
ListMaterializedViews, UpdateMaterializedView (DeletionProtection only), DeleteMaterializedView
72117

73118
## Known Limitations
74119

@@ -80,8 +125,8 @@ row, err := table.ReadRow(ctx, "region-a#account-42#...")
80125
creation are not reflected in the CMV table.
81126
- **Backfill**: Data written to the source table before the CMV is registered is not
82127
retroactively copied.
83-
- **Persistence**: CMV registrations are in-memory only. If the emulator restarts, CMVs
84-
must be re-registered via `CreateMaterializedView`.
128+
- **Persistence**: CMV registrations are in-memory only and do not survive a restart.
129+
Re-register via `CreateMaterializedView` or re-supply `--cmv-config` on each startup.
85130

86131
## Example: Key Transformation
87132

bttest/cmv.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package bttest
22

33
import (
4+
"encoding/json"
5+
"fmt"
46
"log"
7+
"os"
58
"strings"
9+
"sync"
610
)
711

812
// CMVConfig defines a Continuous Materialized View for the emulator.
@@ -28,7 +32,9 @@ type CMVConfig struct {
2832

2933
// cmvRegistry maps plain source table IDs to CMV definitions.
3034
// Lookups match by table ID suffix against fully-qualified table names.
35+
// Its own mu protects concurrent reads/writes to configs independently of s.mu.
3136
type cmvRegistry struct {
37+
mu sync.RWMutex
3238
configs map[string][]CMVConfig
3339
}
3440

@@ -39,10 +45,14 @@ func newCMVRegistry() *cmvRegistry {
3945
}
4046

4147
func (r *cmvRegistry) register(cfg CMVConfig) {
48+
r.mu.Lock()
49+
defer r.mu.Unlock()
4250
r.configs[cfg.SourceTable] = append(r.configs[cfg.SourceTable], cfg)
4351
}
4452

4553
func (r *cmvRegistry) deregister(viewID string) {
54+
r.mu.Lock()
55+
defer r.mu.Unlock()
4656
for src, cfgs := range r.configs {
4757
filtered := cfgs[:0]
4858
for _, c := range cfgs {
@@ -58,16 +68,36 @@ func (r *cmvRegistry) deregister(viewID string) {
5868
}
5969
}
6070

71+
// deregisterBySource removes all CMV configs for a given source table and
72+
// returns the view IDs that were registered against it.
73+
func (r *cmvRegistry) deregisterBySource(sourceTable string) []string {
74+
r.mu.Lock()
75+
defer r.mu.Unlock()
76+
cfgs := r.configs[sourceTable]
77+
if len(cfgs) == 0 {
78+
return nil
79+
}
80+
viewIDs := make([]string, len(cfgs))
81+
for i, c := range cfgs {
82+
viewIDs[i] = c.ViewID
83+
}
84+
delete(r.configs, sourceTable)
85+
return viewIDs
86+
}
87+
6188
func (r *cmvRegistry) cmvsForTable(fqTable string) []*cmvInstance {
6289
parent, tableID := splitFQTable(fqTable)
90+
r.mu.RLock()
6391
cfgs, ok := r.configs[tableID]
6492
if !ok {
93+
r.mu.RUnlock()
6594
return nil
6695
}
6796
result := make([]*cmvInstance, len(cfgs))
6897
for i := range cfgs {
6998
result[i] = &cmvInstance{config: cfgs[i], parent: parent}
7099
}
100+
r.mu.RUnlock()
71101
return result
72102
}
73103

@@ -145,3 +175,18 @@ func (c *cmvInstance) buildCMVRow(sourceRow *row) *row {
145175
func (c *cmvInstance) deriveCMVKey(sourceKey string) string {
146176
return c.transformKey(sourceKey)
147177
}
178+
179+
// LoadCMVConfigs reads a JSON array of CMVConfig from the given file path.
180+
// This is used with the --cmv-config flag for local testing convenience.
181+
func LoadCMVConfigs(path string) ([]CMVConfig, error) {
182+
f, err := os.Open(path)
183+
if err != nil {
184+
return nil, fmt.Errorf("opening cmv config: %w", err)
185+
}
186+
defer f.Close()
187+
var configs []CMVConfig
188+
if err := json.NewDecoder(f).Decode(&configs); err != nil {
189+
return nil, fmt.Errorf("parsing cmv config: %w", err)
190+
}
191+
return configs, nil
192+
}

bttest/inmem.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,17 @@ func NewServer(laddr string, db *sql.DB, opt ...grpc.ServerOption) (*Server, err
140140
return s, nil
141141
}
142142

143+
// RegisterCMVs pre-loads CMV configs for local testing (used with --cmv-config).
144+
// This is equivalent to calling CreateMaterializedView for each config but does
145+
// not require a project/instance path and bypasses SQL parsing.
146+
func (s *Server) RegisterCMVs(configs []CMVConfig) {
147+
s.s.mu.Lock()
148+
defer s.s.mu.Unlock()
149+
for _, cfg := range configs {
150+
s.s.cmvs.register(cfg)
151+
}
152+
}
153+
143154
// Close shuts down the server.
144155
func (s *Server) Close() {
145156
s.s.mu.Lock()
@@ -361,12 +372,33 @@ func (s *server) UpdateTable(ctx context.Context, req *btapb.UpdateTableRequest)
361372
func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) {
362373
s.mu.Lock()
363374
defer s.mu.Unlock()
364-
if tbl, ok := s.tables[req.Name]; !ok {
375+
tbl, ok := s.tables[req.Name]
376+
if !ok {
365377
return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
366-
} else {
367-
s.tableBackend.Delete(tbl)
368-
tbl.rows.DeleteAll()
369-
delete(s.tables, req.Name)
378+
}
379+
s.tableBackend.Delete(tbl)
380+
tbl.rows.DeleteAll()
381+
delete(s.tables, req.Name)
382+
383+
// Clean up any CMVs that use this table as their source.
384+
idx := strings.LastIndex(req.Name, "/tables/")
385+
if idx >= 0 {
386+
parent := req.Name[:idx]
387+
tableID := req.Name[idx+len("/tables/"):]
388+
for _, viewID := range s.cmvs.deregisterBySource(tableID) {
389+
fqShadow := parent + "/tables/" + viewID
390+
if shadowTbl, exists := s.tables[fqShadow]; exists {
391+
s.tableBackend.Delete(shadowTbl)
392+
shadowTbl.rows.DeleteAll()
393+
delete(s.tables, fqShadow)
394+
}
395+
for mvName := range s.materializedViews {
396+
if strings.HasSuffix(mvName, "/materializedViews/"+viewID) {
397+
delete(s.materializedViews, mvName)
398+
break
399+
}
400+
}
401+
}
370402
}
371403
return &emptypb.Empty{}, nil
372404
}

bttest/instance_server.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,16 @@ func (s *server) DeleteMaterializedView(ctx context.Context, req *btapb.DeleteMa
260260
return nil, status.Errorf(codes.FailedPrecondition, "materialized view %q is protected against deletion", req.Name)
261261
}
262262

263-
// Extract view ID from the full resource name.
263+
// Extract parent and view ID from the full resource name.
264264
parts := strings.Split(mv.Name, "/materializedViews/")
265265
if len(parts) == 2 {
266266
s.cmvs.deregister(parts[1])
267+
fqShadow := parts[0] + "/tables/" + parts[1]
268+
if shadowTbl, exists := s.tables[fqShadow]; exists {
269+
s.tableBackend.Delete(shadowTbl)
270+
shadowTbl.rows.DeleteAll()
271+
delete(s.tables, fqShadow)
272+
}
267273
}
268274
delete(s.materializedViews, req.Name)
269275
return new(empty.Empty), nil

little_bigtable.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func main() {
2626
host := flag.String("host", "localhost", "the address to bind to on the local machine")
2727
port := flag.Int("port", 9000, "the port number to bind to on the local machine")
2828
dbFile := flag.String("db-file", "little_bigtable.db", "path to data file")
29+
cmvConfig := flag.String("cmv-config", "", "optional path to a JSON file pre-loading CMV configs for local testing")
2930
showVersion := flag.Bool("version", false, "show version")
3031

3132
ctx := context.Background()
@@ -62,6 +63,15 @@ func main() {
6263
log.Fatalf("failed to start emulator: %v", err)
6364
}
6465

66+
if *cmvConfig != "" {
67+
configs, err := bttest.LoadCMVConfigs(*cmvConfig)
68+
if err != nil {
69+
log.Fatalf("failed to load cmv config: %v", err)
70+
}
71+
srv.RegisterCMVs(configs)
72+
log.Printf("loaded %d CMV config(s) from %s", len(configs), *cmvConfig)
73+
}
74+
6575
log.Printf("\"little\" Bigtable emulator running. DB:%s Connect with environment variable BIGTABLE_EMULATOR_HOST=%q", *dbFile, srv.Addr)
6676
select {}
6777
}

0 commit comments

Comments
 (0)