diff --git a/flashring/cmd/flashringtest/plan_readthrough_gausian.go b/flashring/cmd/flashringtest/plan_readthrough_gausian.go index c4e049c1..3b0f7d83 100644 --- a/flashring/cmd/flashringtest/plan_readthrough_gausian.go +++ b/flashring/cmd/flashringtest/plan_readthrough_gausian.go @@ -142,9 +142,9 @@ func planReadthroughGaussian() { ) flag.StringVar(&mountPoint, "mount", "/mnt/disks/nvme/", "data directory for shard files") - flag.IntVar(&numShards, "shards", 50, "number of shards") + flag.IntVar(&numShards, "shards", 10, "number of shards") flag.IntVar(&keysPerShard, "keys-per-shard", 6_00_000, "keys per shard") - flag.IntVar(&memtableMB, "memtable-mb", 2, "memtable size in MiB") + flag.IntVar(&memtableMB, "memtable-mb", 8, "memtable size in MiB") flag.Float64Var(&fileSizeMultiplier, "file-size-multiplier", 0.25, "file size in GiB per shard") flag.IntVar(&readWorkers, "readers", 16, "number of read workers") flag.IntVar(&writeWorkers, "writers", 16, "number of write workers") @@ -214,7 +214,7 @@ func planReadthroughGaussian() { missedKeyChanList[i] = make(chan int) } - totalKeys := 30_000_000 + totalKeys := 10_000_000 str1kb := strings.Repeat("a", 1024) str1kb = "%d" + str1kb diff --git a/flashring/internal/indicesV3/delete_manager.go b/flashring/internal/indicesV3/delete_manager.go index c6e632db..0e529698 100644 --- a/flashring/internal/indicesV3/delete_manager.go +++ b/flashring/internal/indicesV3/delete_manager.go @@ -1,15 +1,15 @@ package indicesv2 import ( - "errors" "fmt" "github.com/Meesho/BharatMLStack/flashring/internal/fs" + "github.com/puzpuzpuz/xsync/v4" "github.com/rs/zerolog/log" ) type DeleteManager struct { - memtableData map[uint32]int + memtableData *xsync.Map[uint32, int] toBeDeletedMemId uint32 keyIndex *Index wrapFile *fs.WrapAppendFile @@ -20,7 +20,7 @@ type DeleteManager struct { func NewDeleteManager(keyIndex *Index, wrapFile *fs.WrapAppendFile, deleteAmortizedStep int) *DeleteManager { return &DeleteManager{ - memtableData: 
make(map[uint32]int), + memtableData: xsync.NewMap[uint32, int](), toBeDeletedMemId: 0, keyIndex: keyIndex, wrapFile: wrapFile, @@ -30,7 +30,9 @@ func NewDeleteManager(keyIndex *Index, wrapFile *fs.WrapAppendFile, deleteAmorti } func (dm *DeleteManager) IncMemtableKeyCount(memId uint32) { - dm.memtableData[memId]++ + dm.memtableData.Compute(memId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) { + return oldValue + 1, xsync.UpdateOp + }) } func (dm *DeleteManager) ExecuteDeleteIfNeeded() error { @@ -40,19 +42,22 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error { return fmt.Errorf("delete failed") } if memtableId != dm.toBeDeletedMemId { - dm.memtableData[dm.toBeDeletedMemId] = dm.memtableData[dm.toBeDeletedMemId] - count + newVal, _ := dm.memtableData.Compute(dm.toBeDeletedMemId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) { + return oldValue - count, xsync.UpdateOp + }) log.Debug().Msgf("memtableId: %d, toBeDeletedMemId: %d", memtableId, dm.toBeDeletedMemId) - if dm.memtableData[dm.toBeDeletedMemId] != 0 { + if newVal != 0 { return fmt.Errorf("memtableData[dm.toBeDeletedMemId] != 0") } - delete(dm.memtableData, dm.toBeDeletedMemId) + dm.memtableData.Delete(dm.toBeDeletedMemId) dm.toBeDeletedMemId = memtableId dm.deleteInProgress = false dm.deleteCount = 0 return nil } else { - dm.memtableData[memtableId] -= count - //log.Debug().Msgf("memtableData[%d] = %d", memtableId, dm.memtableData[memtableId]) + dm.memtableData.Compute(memtableId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) { + return oldValue - count, xsync.UpdateOp + }) } return nil } @@ -62,9 +67,10 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error { if trimNeeded || nextAddNeedsDelete { dm.deleteInProgress = true - dm.deleteCount = int(dm.memtableData[dm.toBeDeletedMemId] / dm.deleteAmortizedStep) + val, _ := dm.memtableData.Load(dm.toBeDeletedMemId) + dm.deleteCount = val / dm.deleteAmortizedStep if dm.deleteCount == 0 { - dm.deleteCount = 
int(dm.memtableData[dm.toBeDeletedMemId] % dm.deleteAmortizedStep) + dm.deleteCount = val % dm.deleteAmortizedStep } memIdAtHead, err := dm.keyIndex.PeekMemIdAtHead() if err != nil { @@ -75,7 +81,7 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error { } dm.wrapFile.TrimHead() - return errors.New("trim needed retry this write") + return fmt.Errorf("trim needed or next add needs delete") } return nil } diff --git a/flashring/internal/indicesV3/index.go b/flashring/internal/indicesV3/index.go index fcafc483..e367c2f7 100644 --- a/flashring/internal/indicesV3/index.go +++ b/flashring/internal/indicesV3/index.go @@ -7,7 +7,6 @@ import ( "github.com/Meesho/BharatMLStack/flashring/internal/maths" "github.com/cespare/xxhash/v2" - "github.com/puzpuzpuz/xsync/v4" "github.com/zeebo/xxh3" ) @@ -23,7 +22,7 @@ const ( type Index struct { mu *sync.RWMutex - rm *xsync.Map[uint64, int] + rm map[uint64]int rb *RingBuffer mc *maths.MorrisLogCounter startAt int64 @@ -36,7 +35,7 @@ func NewIndex(hashBits int, rbInitial, rbMax, deleteAmortizedStep int, mu *sync. 
} return &Index{ mu: mu, - rm: xsync.NewMap[uint64, int](), + rm: make(map[uint64]int), rb: NewRingBuffer(rbInitial, rbMax), mc: maths.New(12), startAt: time.Now().Unix(), @@ -53,15 +52,17 @@ func (i *Index) Put(key string, length, ttlInMinutes uint16, memId, offset uint3 delta := uint16(expiryAt - (i.startAt / 60)) encode(key, length, delta, lastAccess, freq, memId, offset, entry) - if headIdx, ok := i.rm.Load(hlo); !ok { + i.mu.Lock() + defer i.mu.Unlock() + if headIdx, ok := i.rm[hlo]; !ok { encodeHashNextPrev(hhi, hlo, -1, -1, hashNextPrev) - i.rm.Store(hlo, idx) + i.rm[hlo] = idx return } else { _, headHashNextPrev, _ := i.rb.Get(int(headIdx)) encodeUpdatePrev(int32(idx), headHashNextPrev) encodeHashNextPrev(hhi, hlo, -1, int32(headIdx), hashNextPrev) - i.rm.Store(hlo, idx) + i.rm[hlo] = idx return } @@ -70,8 +71,9 @@ func (i *Index) Put(key string, length, ttlInMinutes uint16, memId, offset uint3 func (i *Index) Get(key string) (length, lastAccess, remainingTTL uint16, freq uint64, memId, offset uint32, status Status) { hhi, hlo := hash128(key) - idx, ok := i.rm.Load(hlo) - + i.mu.RLock() + idx, ok := i.rm[hlo] + i.mu.RUnlock() if ok { for { entry, hashNextPrev, _ := i.rb.Get(int(idx)) @@ -110,9 +112,9 @@ func (ix *Index) Delete(count int) (uint32, int) { } delMemId, _ := decodeMemIdOffset(deleted) deletedHlo := decodeHashLo(deletedHashNextPrev) - mapIdx, ok := ix.rm.Load(deletedHlo) + mapIdx, ok := ix.rm[deletedHlo] if ok && mapIdx == deletedIdx { - ix.rm.Delete(deletedHlo) + delete(ix.rm, deletedHlo) } else if ok && hasPrev(deletedHashNextPrev) { prevIdx := decodePrev(deletedHashNextPrev) _, hashNextPrev, _ := ix.rb.Get(int(prevIdx)) diff --git a/flashring/internal/indicesV3/rb.go b/flashring/internal/indicesV3/rb.go index 10850bb3..774684ed 100644 --- a/flashring/internal/indicesV3/rb.go +++ b/flashring/internal/indicesV3/rb.go @@ -1,5 +1,7 @@ package indicesv2 +import "sync/atomic" + // Entry represents a 32-byte value. Adjust fields as needed. 
 type Entry [16]byte
 type HashNextPrev [3]uint64
@@ -8,14 +10,14 @@ type HashNextPrev [3]uint64
 // It maintains a sliding window of the most recent entries. Add returns an
 // absolute index which can be used with Get.
 type RingBuffer struct {
+	nextIndex int64  // atomic; must be first for 64-bit alignment on 32-bit platforms
+	wrapped   uint32 // atomic; 0 = false, 1 = true (one-time transition)
 	buf       []Entry
 	hashTable []HashNextPrev
 	head      int
 	tail      int
 	size      int
-	nextIndex int
-	capacity  int // Fixed capacity (initial = max)
-	wrapped   bool
+	capacity  int
 }
 
 // NewRingBuffer creates a ring buffer with the given initial and maximum
@@ -30,37 +32,37 @@ func NewRingBuffer(initial, max int) *RingBuffer {
 		buf:       make([]Entry, capacity),
 		hashTable: make([]HashNextPrev, capacity),
 		capacity:  capacity,
-		wrapped:   false,
 	}
 }
 
 // Add inserts e into the buffer and returns its absolute index. When the buffer
 // is full it wraps around and overwrites the oldest entry.
 func (rb *RingBuffer) Add(e *Entry) int {
-	// Store the entry at current tail position
-	rb.buf[rb.nextIndex] = *e
-	idx := rb.nextIndex
-	rb.nextIndex = (rb.nextIndex + 1) % rb.capacity
-	if rb.nextIndex == rb.head {
+	raw := atomic.AddInt64(&rb.nextIndex, 1) - 1
+	idx := int(raw) % rb.capacity
+	rb.buf[idx] = *e
+	nextIdx := int(raw+1) % rb.capacity
+	if nextIdx == rb.head {
 		rb.head = (rb.head + 1) % rb.capacity
 	}
-
 	return idx
 }
 
 func (rb *RingBuffer) NextAddNeedsDelete() bool {
-	return rb.nextIndex == rb.head && rb.wrapped
+	nextIdx := int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
+	return nextIdx == rb.head && atomic.LoadUint32(&rb.wrapped) == 1
 }
 
 func (rb *RingBuffer) GetNextFreeSlot() (*Entry, *HashNextPrev, int, bool) {
-	idx := rb.nextIndex
-	rb.nextIndex = (rb.nextIndex + 1) % rb.capacity
+	raw := atomic.AddInt64(&rb.nextIndex, 1) - 1
+	idx := int(raw) % rb.capacity
+
 	shouldDelete := false
-	if rb.nextIndex == rb.head {
-		// rb.head = (rb.head + 1) % rb.capacity
-		rb.wrapped = true
+	nextIdx := int(raw+1) % rb.capacity
+	if nextIdx == rb.head {
+		atomic.StoreUint32(&rb.wrapped, 1)
 		shouldDelete = true
 	}
 	return &rb.buf[idx], &rb.hashTable[idx], idx, shouldDelete
 }
 
@@ -85,10 +86,12 @@ func (rb *RingBuffer) Delete() (*Entry, *HashNextPrev, int, *Entry) {
 	return &deleted, &deletedHashNextPrev, deletedIdx, &rb.buf[rb.head]
 }
 
-// TailIndex returns the absolute index that will be assigned to the next Add.
+// TailIndex returns the slot index that will be assigned to the next Add.
 func (rb *RingBuffer) TailIndex() int {
-	return rb.nextIndex
+	return int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
 }
+
 func (rb *RingBuffer) ActiveEntries() int {
-	return (rb.nextIndex - rb.head + rb.capacity) % rb.capacity
+	next := int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
+	return (next - rb.head + rb.capacity) % rb.capacity
 }
diff --git a/flashring/internal/memtables/memtable.go b/flashring/internal/memtables/memtable.go
index 3be40e4b..8b3d2ca5 100644
--- a/flashring/internal/memtables/memtable.go
+++ b/flashring/internal/memtables/memtable.go
@@ -2,6 +2,7 @@ package memtables
 
 import (
 	"errors"
+	"sync/atomic"
 
 	"github.com/Meesho/BharatMLStack/flashring/internal/fs"
 )
@@ -16,12 +17,12 @@ var (
 )
 
 type Memtable struct {
+	currentOffset int64  // atomic; must be first for 64-bit alignment on 32-bit platforms
+	flushStarted  uint32 // atomic; 0 = not started, 1 = flush triggered
 	Id            uint32
 	capacity      int
-	currentOffset int
 	file          *fs.WrapAppendFile
 	page          *fs.AlignedPage
-	readyForFlush bool
 	next          *Memtable
 	prev          *Memtable
 	ShardIdx      uint32
@@ -49,13 +50,11 @@ func NewMemtable(config MemtableConfig) (*Memtable, error) {
 		return nil, ErrPageBufferCapacityMismatch
 	}
 	return &Memtable{
-		Id:            config.id,
-		ShardIdx:      config.shardIdx,
-		capacity:      config.capacity,
-		currentOffset: 0,
-		file:          config.file,
-		page:          config.page,
-		readyForFlush: false,
+		Id:       config.id,
+		ShardIdx: config.shardIdx,
+		capacity: config.capacity,
+		file:     config.file,
+		page:     config.page,
 	}, nil
 }
 
@@ -67,26 +66,32 @@ func (m 
*Memtable) Get(offset int, length uint16) ([]byte, error) { } func (m *Memtable) Put(buf []byte) (offset int, length uint16, readyForFlush bool) { - offset = m.currentOffset - if offset+len(buf) > m.capacity { - m.readyForFlush = true - return -1, 0, true + sz := int64(len(buf)) + newOffset := atomic.AddInt64(&m.currentOffset, sz) + start := newOffset - sz + + if newOffset > int64(m.capacity) { + if atomic.CompareAndSwapUint32(&m.flushStarted, 0, 1) { + return -1, 0, true + } + return -1, 0, false } - copy(m.page.Buf[offset:], buf) - m.currentOffset += len(buf) - return offset, uint16(len(buf)), false + copy(m.page.Buf[start:start+sz], buf) + return int(start), uint16(len(buf)), false } -// Efforts to make zero copy func (m *Memtable) GetBufForAppend(size uint16) (bbuf []byte, offset int, length uint16, readyForFlush bool) { - offset = m.currentOffset - if offset+int(size) > m.capacity { - m.readyForFlush = true - return nil, -1, 0, true + sz := int64(size) + newOffset := atomic.AddInt64(&m.currentOffset, sz) + start := newOffset - sz + + if newOffset > int64(m.capacity) { + if atomic.CompareAndSwapUint32(&m.flushStarted, 0, 1) { + return nil, -1, 0, true + } + return nil, -1, 0, false } - bbuf = m.page.Buf[offset : offset+int(size)] - m.currentOffset += int(size) - return bbuf, offset, size, false + return m.page.Buf[start:newOffset], int(start), size, false } func (m *Memtable) GetBufForRead(offset int, length uint16) (bbuf []byte, exists bool) { @@ -97,7 +102,7 @@ func (m *Memtable) GetBufForRead(offset int, length uint16) (bbuf []byte, exists } func (m *Memtable) Flush() (n int, fileOffset int64, err error) { - if !m.readyForFlush { + if atomic.LoadUint32(&m.flushStarted) == 0 { return 0, 0, ErrMemtableNotReadyForFlush } @@ -114,8 +119,8 @@ func (m *Memtable) Flush() (n int, fileOffset int64, err error) { return 0, 0, err } - m.currentOffset = 0 - m.readyForFlush = false + atomic.StoreInt64(&m.currentOffset, 0) + atomic.StoreUint32(&m.flushStarted, 0) return 
totalWritten, fileOffset, nil } diff --git a/flashring/internal/memtables/memtable_bench_test.go b/flashring/internal/memtables/memtable_bench_test.go index 40175e62..9ae6c968 100644 --- a/flashring/internal/memtables/memtable_bench_test.go +++ b/flashring/internal/memtables/memtable_bench_test.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "fmt" "path/filepath" + "sync/atomic" "testing" "github.com/Meesho/BharatMLStack/flashring/internal/fs" @@ -87,18 +88,16 @@ func BenchmarkMemtable_Put_Small(b *testing.B) { b.SetBytes(SMALL_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - // Reset memtable for continued benchmarking - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { - // Don't count flush operations in this benchmark b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -115,16 +114,16 @@ func BenchmarkMemtable_Put_Medium(b *testing.B) { b.SetBytes(MEDIUM_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -141,16 +140,16 @@ func BenchmarkMemtable_Put_Large(b *testing.B) { b.SetBytes(LARGE_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - memtable.currentOffset = 0 - memtable.readyForFlush = 
false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -167,16 +166,16 @@ func BenchmarkMemtable_Put_VeryLarge(b *testing.B) { b.SetBytes(VERY_LARGE_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -310,8 +309,7 @@ func BenchmarkMemtable_Flush(b *testing.B) { if err != nil { b.Fatalf("Flush failed: %v", err) } - // Force re-flush same data in each iteration - memtable.readyForFlush = true + atomic.StoreUint32(&memtable.flushStarted, 1) } fs.Unmap(page) } @@ -354,10 +352,9 @@ func BenchmarkMemtable_MixedOperations(b *testing.B) { } } else { // Put operation - if memtable.readyForFlush { - // Reset for continued benchmarking - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } memtable.Put(putData) } @@ -376,16 +373,16 @@ func BenchmarkMemtable_SequentialWrites(b *testing.B) { b.SetBytes(MEDIUM_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + 
atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -439,16 +436,16 @@ func BenchmarkMemtable_MemoryCopy(b *testing.B) { b.SetBytes(int64(size)) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - memtable.currentOffset = 0 - memtable.readyForFlush = false + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } _, _, readyForFlush := memtable.Put(data) if readyForFlush { b.StopTimer() - memtable.currentOffset = 0 - memtable.readyForFlush = false + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) b.StartTimer() } } @@ -490,9 +487,12 @@ func BenchmarkMemtable_FullLifecycle(b *testing.B) { // Add data that will exceed capacity to trigger flush overflowData := generateRandomData(2000) _, _, readyForFlush := memtable.Put(overflowData) - if !readyForFlush { + if !readyForFlush && atomic.LoadUint32(&memtable.flushStarted) == 0 { b.Fatalf("Failed to trigger flush in lifecycle test") } + if atomic.LoadUint32(&memtable.flushStarted) == 0 { + atomic.StoreUint32(&memtable.flushStarted, 1) + } // Flush _, _, err = memtable.Flush() @@ -541,7 +541,7 @@ func BenchmarkMemtable_SingleThreadedWorkload(b *testing.B) { } } else { // Write operation (20%) - only if space available - if !memtable.readyForFlush { + if atomic.LoadUint32(&memtable.flushStarted) == 0 { memtable.Put(data) } } @@ -561,10 +561,9 @@ func BenchmarkMemtable_CPUIntensive(b *testing.B) { b.SetBytes(MEDIUM_DATA_SIZE) for i := 0; i < b.N; i++ { - if memtable.readyForFlush { - // Reset for continued benchmarking - memtable.currentOffset = 0 - memtable.readyForFlush = false + if 
atomic.LoadUint32(&memtable.flushStarted) != 0 { + atomic.StoreInt64(&memtable.currentOffset, 0) + atomic.StoreUint32(&memtable.flushStarted, 0) } // Perform put operation diff --git a/flashring/internal/memtables/memtable_test.go b/flashring/internal/memtables/memtable_test.go index 2d694218..4a3603e1 100644 --- a/flashring/internal/memtables/memtable_test.go +++ b/flashring/internal/memtables/memtable_test.go @@ -2,6 +2,7 @@ package memtables import ( "path/filepath" + "sync/atomic" "testing" "github.com/Meesho/BharatMLStack/flashring/internal/fs" @@ -65,11 +66,11 @@ func TestNewMemtable_Success(t *testing.T) { if memtable.capacity != capacity { t.Errorf("Expected capacity %d, got %d", capacity, memtable.capacity) } - if memtable.currentOffset != 0 { - t.Errorf("Expected currentOffset 0, got %d", memtable.currentOffset) + if atomic.LoadInt64(&memtable.currentOffset) != 0 { + t.Errorf("Expected currentOffset 0, got %d", atomic.LoadInt64(&memtable.currentOffset)) } - if memtable.readyForFlush != false { - t.Errorf("Expected readyForFlush false, got %v", memtable.readyForFlush) + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + t.Errorf("Expected flushStarted 0, got %d", atomic.LoadUint32(&memtable.flushStarted)) } } @@ -256,8 +257,8 @@ func TestMemtable_Put_Success(t *testing.T) { if readyForFlush { t.Errorf("Expected readyForFlush false, got %v", readyForFlush) } - if memtable.currentOffset != len(testData) { - t.Errorf("Expected currentOffset %d, got %d", len(testData), memtable.currentOffset) + if atomic.LoadInt64(&memtable.currentOffset) != int64(len(testData)) { + t.Errorf("Expected currentOffset %d, got %d", len(testData), atomic.LoadInt64(&memtable.currentOffset)) } // Verify data was written to buffer @@ -305,8 +306,8 @@ func TestMemtable_Put_ExceedsCapacity(t *testing.T) { if !readyForFlush { t.Errorf("Expected readyForFlush true, got %v", readyForFlush) } - if !memtable.readyForFlush { - t.Errorf("Expected memtable.readyForFlush true, got %v", 
memtable.readyForFlush) + if atomic.LoadUint32(&memtable.flushStarted) != 1 { + t.Errorf("Expected memtable.flushStarted 1, got %d", atomic.LoadUint32(&memtable.flushStarted)) } } @@ -389,7 +390,7 @@ func TestMemtable_Flush_Success(t *testing.T) { // Put data that exceeds capacity to trigger ready for flush memtable.Put(make([]byte, 200)) - if !memtable.readyForFlush { + if atomic.LoadUint32(&memtable.flushStarted) != 1 { t.Fatalf("Expected memtable to be ready for flush") } @@ -404,8 +405,8 @@ func TestMemtable_Flush_Success(t *testing.T) { if fileOffset < 0 { t.Errorf("Expected positive fileOffset, got %d", fileOffset) } - if memtable.readyForFlush { - t.Errorf("Expected readyForFlush to be false after flush, got %v", memtable.readyForFlush) + if atomic.LoadUint32(&memtable.flushStarted) != 0 { + t.Errorf("Expected flushStarted to be 0 after flush, got %d", atomic.LoadUint32(&memtable.flushStarted)) } } @@ -513,7 +514,7 @@ func TestMemtable_Integration(t *testing.T) { } // Fill up the memtable to trigger ready for flush - for !memtable.readyForFlush { + for atomic.LoadUint32(&memtable.flushStarted) == 0 { memtable.Put([]byte("filler")) } diff --git a/flashring/internal/shard/shard_cache.go b/flashring/internal/shard/shard_cache.go index 5856b2d2..3c463e4a 100644 --- a/flashring/internal/shard/shard_cache.go +++ b/flashring/internal/shard/shard_cache.go @@ -3,6 +3,7 @@ package filecache import ( "fmt" "hash/crc32" + "runtime" "sync" "time" @@ -109,17 +110,31 @@ func NewShardCache(config ShardCacheConfig, sl *sync.RWMutex) *ShardCache { func (fc *ShardCache) Put(key string, value []byte, ttlMinutes uint16) error { size := 4 + len(key) + len(value) - mt, mtId, _ := fc.mm.GetMemtable() err := fc.dm.ExecuteDeleteIfNeeded() if err != nil { return err } - buf, offset, length, readyForFlush := mt.GetBufForAppend(uint16(size)) - if readyForFlush { - fc.mm.Flush() + + var buf []byte + var offset int + var length uint16 + var mtId uint32 + + for { + var mt 
*memtables.Memtable mt, mtId, _ = fc.mm.GetMemtable() - buf, offset, length, _ = mt.GetBufForAppend(uint16(size)) + var readyForFlush bool + buf, offset, length, readyForFlush = mt.GetBufForAppend(uint16(size)) + if buf != nil { + break + } + if readyForFlush { + fc.mm.Flush() + } else { + runtime.Gosched() + } } + copy(buf[4:], key) copy(buf[4+len(key):], value) crc := crc32.ChecksumIEEE(buf[4:]) diff --git a/flashring/pkg/cache/cache.go b/flashring/pkg/cache/cache.go index 5174382b..71168978 100644 --- a/flashring/pkg/cache/cache.go +++ b/flashring/pkg/cache/cache.go @@ -146,9 +146,9 @@ func NewWrapCache(config WrapCacheConfig, mountPoint string) (*WrapCache, error) batchReader, err := fs.NewParallelBatchIoUringReader(fs.BatchIoUringConfig{ RingDepth: 256, MaxBatch: 256, - Window: time.Millisecond * 2, + Window: time.Microsecond * 500, QueueSize: 1024, - }, 2) + }, 1) if err != nil { log.Error().Err(err).Msg("Failed to create batched io_uring reader, falling back to per-shard rings") batchReader = nil @@ -202,9 +202,9 @@ func (wc *WrapCache) Put(key string, value []byte, exptimeInMinutes uint16) erro metrics.Timing(metrics.KEY_PUT_LATENCY, time.Since(start), metrics.GetShardTag(shardIdx)) }() - wc.shardLocks[shardIdx].Lock() - metrics.Timing(metrics.LATENCY_WLOCK, time.Since(start), []string{}) - defer wc.shardLocks[shardIdx].Unlock() + // wc.shardLocks[shardIdx].Lock() + // metrics.Timing(metrics.LATENCY_WLOCK, time.Since(start), []string{}) + // defer wc.shardLocks[shardIdx].Unlock() err := wc.shards[shardIdx].Put(key, value, exptimeInMinutes) if err != nil { diff --git a/flashring/pkg/metrics/metric.go b/flashring/pkg/metrics/metric.go index f6506da8..a145f5a7 100644 --- a/flashring/pkg/metrics/metric.go +++ b/flashring/pkg/metrics/metric.go @@ -114,6 +114,8 @@ func Init() { statsDClient, err = statsd.New( telegrafAddress, statsd.WithTags(globalTags), + statsd.WithChannelMode(), + statsd.WithChannelModeBufferSize(4096), ) if err != nil {