Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flashring/cmd/flashringtest/plan_readthrough_gausian.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ func planReadthroughGaussian() {
)

flag.StringVar(&mountPoint, "mount", "/mnt/disks/nvme/", "data directory for shard files")
flag.IntVar(&numShards, "shards", 50, "number of shards")
flag.IntVar(&numShards, "shards", 10, "number of shards")
flag.IntVar(&keysPerShard, "keys-per-shard", 6_00_000, "keys per shard")
flag.IntVar(&memtableMB, "memtable-mb", 2, "memtable size in MiB")
flag.IntVar(&memtableMB, "memtable-mb", 8, "memtable size in MiB")
flag.Float64Var(&fileSizeMultiplier, "file-size-multiplier", 0.25, "file size in GiB per shard")
flag.IntVar(&readWorkers, "readers", 16, "number of read workers")
flag.IntVar(&writeWorkers, "writers", 16, "number of write workers")
Expand Down Expand Up @@ -214,7 +214,7 @@ func planReadthroughGaussian() {
missedKeyChanList[i] = make(chan int)
}

totalKeys := 30_000_000
totalKeys := 10_000_000
str1kb := strings.Repeat("a", 1024)
str1kb = "%d" + str1kb

Expand Down
30 changes: 18 additions & 12 deletions flashring/internal/indicesV3/delete_manager.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package indicesv2

import (
"errors"
"fmt"

"github.com/Meesho/BharatMLStack/flashring/internal/fs"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/zerolog/log"
)

type DeleteManager struct {
memtableData map[uint32]int
memtableData *xsync.Map[uint32, int]
toBeDeletedMemId uint32
keyIndex *Index
wrapFile *fs.WrapAppendFile
Expand All @@ -20,7 +20,7 @@ type DeleteManager struct {

func NewDeleteManager(keyIndex *Index, wrapFile *fs.WrapAppendFile, deleteAmortizedStep int) *DeleteManager {
return &DeleteManager{
memtableData: make(map[uint32]int),
memtableData: xsync.NewMap[uint32, int](),
toBeDeletedMemId: 0,
keyIndex: keyIndex,
wrapFile: wrapFile,
Expand All @@ -30,7 +30,9 @@ func NewDeleteManager(keyIndex *Index, wrapFile *fs.WrapAppendFile, deleteAmorti
}

func (dm *DeleteManager) IncMemtableKeyCount(memId uint32) {
dm.memtableData[memId]++
dm.memtableData.Compute(memId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) {
return oldValue + 1, xsync.UpdateOp
})
}

func (dm *DeleteManager) ExecuteDeleteIfNeeded() error {
Expand All @@ -40,19 +42,22 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error {
return fmt.Errorf("delete failed")
}
if memtableId != dm.toBeDeletedMemId {
dm.memtableData[dm.toBeDeletedMemId] = dm.memtableData[dm.toBeDeletedMemId] - count
newVal, _ := dm.memtableData.Compute(dm.toBeDeletedMemId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) {
return oldValue - count, xsync.UpdateOp
})
log.Debug().Msgf("memtableId: %d, toBeDeletedMemId: %d", memtableId, dm.toBeDeletedMemId)
if dm.memtableData[dm.toBeDeletedMemId] != 0 {
if newVal != 0 {
return fmt.Errorf("memtableData[dm.toBeDeletedMemId] != 0")
}
delete(dm.memtableData, dm.toBeDeletedMemId)
dm.memtableData.Delete(dm.toBeDeletedMemId)
dm.toBeDeletedMemId = memtableId
dm.deleteInProgress = false
dm.deleteCount = 0
return nil
} else {
dm.memtableData[memtableId] -= count
//log.Debug().Msgf("memtableData[%d] = %d", memtableId, dm.memtableData[memtableId])
dm.memtableData.Compute(memtableId, func(oldValue int, loaded bool) (int, xsync.ComputeOp) {
return oldValue - count, xsync.UpdateOp
})
}
return nil
}
Expand All @@ -62,9 +67,10 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error {

if trimNeeded || nextAddNeedsDelete {
dm.deleteInProgress = true
dm.deleteCount = int(dm.memtableData[dm.toBeDeletedMemId] / dm.deleteAmortizedStep)
val, _ := dm.memtableData.Load(dm.toBeDeletedMemId)
dm.deleteCount = val / dm.deleteAmortizedStep
if dm.deleteCount == 0 {
dm.deleteCount = int(dm.memtableData[dm.toBeDeletedMemId] % dm.deleteAmortizedStep)
dm.deleteCount = val % dm.deleteAmortizedStep
}
memIdAtHead, err := dm.keyIndex.PeekMemIdAtHead()
if err != nil {
Expand All @@ -75,7 +81,7 @@ func (dm *DeleteManager) ExecuteDeleteIfNeeded() error {
}

dm.wrapFile.TrimHead()
return errors.New("trim needed retry this write")
return fmt.Errorf("trim needed or next add needs delete")
}
return nil
}
22 changes: 12 additions & 10 deletions flashring/internal/indicesV3/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/Meesho/BharatMLStack/flashring/internal/maths"
"github.com/cespare/xxhash/v2"
"github.com/puzpuzpuz/xsync/v4"
"github.com/zeebo/xxh3"
)

Expand All @@ -23,7 +22,7 @@ const (

type Index struct {
mu *sync.RWMutex
rm *xsync.Map[uint64, int]
rm map[uint64]int
rb *RingBuffer
mc *maths.MorrisLogCounter
startAt int64
Expand All @@ -36,7 +35,7 @@ func NewIndex(hashBits int, rbInitial, rbMax, deleteAmortizedStep int, mu *sync.
}
return &Index{
mu: mu,
rm: xsync.NewMap[uint64, int](),
rm: make(map[uint64]int),
rb: NewRingBuffer(rbInitial, rbMax),
mc: maths.New(12),
startAt: time.Now().Unix(),
Expand All @@ -53,15 +52,17 @@ func (i *Index) Put(key string, length, ttlInMinutes uint16, memId, offset uint3
delta := uint16(expiryAt - (i.startAt / 60))
encode(key, length, delta, lastAccess, freq, memId, offset, entry)

if headIdx, ok := i.rm.Load(hlo); !ok {
i.mu.Lock()
defer i.mu.Unlock()
if headIdx, ok := i.rm[hlo]; !ok {
encodeHashNextPrev(hhi, hlo, -1, -1, hashNextPrev)
i.rm.Store(hlo, idx)
i.rm[hlo] = idx
return
} else {
_, headHashNextPrev, _ := i.rb.Get(int(headIdx))
encodeUpdatePrev(int32(idx), headHashNextPrev)
encodeHashNextPrev(hhi, hlo, -1, int32(headIdx), hashNextPrev)
i.rm.Store(hlo, idx)
i.rm[hlo] = idx
return
}

Expand All @@ -70,8 +71,9 @@ func (i *Index) Put(key string, length, ttlInMinutes uint16, memId, offset uint3
func (i *Index) Get(key string) (length, lastAccess, remainingTTL uint16, freq uint64, memId, offset uint32, status Status) {
hhi, hlo := hash128(key)

idx, ok := i.rm.Load(hlo)

i.mu.RLock()
idx, ok := i.rm[hlo]
i.mu.RUnlock()
if ok {
for {
entry, hashNextPrev, _ := i.rb.Get(int(idx))
Expand Down Expand Up @@ -110,9 +112,9 @@ func (ix *Index) Delete(count int) (uint32, int) {
}
delMemId, _ := decodeMemIdOffset(deleted)
deletedHlo := decodeHashLo(deletedHashNextPrev)
mapIdx, ok := ix.rm.Load(deletedHlo)
mapIdx, ok := ix.rm[deletedHlo]
if ok && mapIdx == deletedIdx {
ix.rm.Delete(deletedHlo)
delete(ix.rm, deletedHlo)
} else if ok && hasPrev(deletedHashNextPrev) {
prevIdx := decodePrev(deletedHashNextPrev)
_, hashNextPrev, _ := ix.rb.Get(int(prevIdx))
Expand Down
43 changes: 23 additions & 20 deletions flashring/internal/indicesV3/rb.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package indicesv2

import "sync/atomic"

// Entry represents a 16-byte value. Adjust fields as needed.
type Entry [16]byte
// HashNextPrev is three 64-bit words kept alongside each Entry. It is filled
// by encodeHashNextPrev with the key's hash halves and the prev/next chain
// links (exact bit layout is defined by the encode/decode helpers — confirm there).
type HashNextPrev [3]uint64
Expand All @@ -8,14 +10,14 @@ type HashNextPrev [3]uint64
// It maintains a sliding window of the most recent entries. Add returns an
// absolute index which can be used with Get.
type RingBuffer struct {
nextIndex int64 // atomic; must be first for 64-bit alignment on 32-bit platforms
wrapped uint32 // atomic; 0 = false, 1 = true (one-time transition)
buf []Entry
hashTable []HashNextPrev
head int
tail int
size int
nextIndex int
capacity int // Fixed capacity (initial = max)
wrapped bool
capacity int
}

// NewRingBuffer creates a ring buffer with the given initial and maximum
Expand All @@ -30,37 +32,36 @@ func NewRingBuffer(initial, max int) *RingBuffer {
buf: make([]Entry, capacity),
hashTable: make([]HashNextPrev, capacity),
capacity: capacity,
wrapped: false,
}
}

// Add inserts e into the buffer and returns the slot index (its position in
// the backing array, always in [0, capacity)) where it was stored. When the
// buffer is full it wraps around and overwrites the oldest entry.
func (rb *RingBuffer) Add(e *Entry) int {
// Store the entry at current tail position
rb.buf[rb.nextIndex] = *e
idx := rb.nextIndex
rb.nextIndex = (rb.nextIndex + 1) % rb.capacity
if rb.nextIndex == rb.head {
raw := atomic.AddInt64(&rb.nextIndex, 1) - 1
idx := int(raw) % rb.capacity
rb.buf[idx] = *e
nextIdx := int(raw+1) % rb.capacity
if nextIdx == rb.head {
rb.head = (rb.head + 1) % rb.capacity
}

return idx
}

func (rb *RingBuffer) NextAddNeedsDelete() bool {
return rb.nextIndex == rb.head && rb.wrapped
nextIdx := int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
return nextIdx == rb.head && atomic.LoadUint32(&rb.wrapped) == 1
}

func (rb *RingBuffer) GetNextFreeSlot() (*Entry, *HashNextPrev, int, bool) {
idx := rb.nextIndex
rb.nextIndex = (rb.nextIndex + 1) % rb.capacity
raw := atomic.AddInt64(&rb.nextIndex, 1) - 1
idx := int(raw) % rb.capacity

shouldDelete := false
if rb.nextIndex == rb.head {
// rb.head = (rb.head + 1) % rb.capacity
rb.wrapped = true
nextIdx := int(raw+1) % rb.capacity
if nextIdx == rb.head {
atomic.StoreUint32(&rb.wrapped, 1)
shouldDelete = true

}
return &rb.buf[idx], &rb.hashTable[idx], idx, shouldDelete
}
Expand All @@ -85,10 +86,12 @@ func (rb *RingBuffer) Delete() (*Entry, *HashNextPrev, int, *Entry) {
return &deleted, &deletedHashNextPrev, deletedIdx, &rb.buf[rb.head]
}

// TailIndex returns the absolute index that will be assigned to the next Add.
// TailIndex returns the slot index that will be assigned to the next Add.
func (rb *RingBuffer) TailIndex() int {
return rb.nextIndex
return int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
}

func (rb *RingBuffer) ActiveEntries() int {
return (rb.nextIndex - rb.head + rb.capacity) % rb.capacity
next := int(atomic.LoadInt64(&rb.nextIndex)) % rb.capacity
return (next - rb.head + rb.capacity) % rb.capacity
}
59 changes: 32 additions & 27 deletions flashring/internal/memtables/memtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package memtables

import (
"errors"
"sync/atomic"

"github.com/Meesho/BharatMLStack/flashring/internal/fs"
)
Expand All @@ -16,12 +17,12 @@ var (
)

type Memtable struct {
currentOffset int64 // atomic; must be first for 64-bit alignment on 32-bit platforms
flushStarted uint32 // atomic; 0 = not started, 1 = flush triggered
Id uint32
capacity int
currentOffset int
file *fs.WrapAppendFile
page *fs.AlignedPage
readyForFlush bool
next *Memtable
prev *Memtable
ShardIdx uint32
Expand Down Expand Up @@ -49,13 +50,11 @@ func NewMemtable(config MemtableConfig) (*Memtable, error) {
return nil, ErrPageBufferCapacityMismatch
}
return &Memtable{
Id: config.id,
ShardIdx: config.shardIdx,
capacity: config.capacity,
currentOffset: 0,
file: config.file,
page: config.page,
readyForFlush: false,
Id: config.id,
ShardIdx: config.shardIdx,
capacity: config.capacity,
file: config.file,
page: config.page,
}, nil
}

Expand All @@ -67,26 +66,32 @@ func (m *Memtable) Get(offset int, length uint16) ([]byte, error) {
}

func (m *Memtable) Put(buf []byte) (offset int, length uint16, readyForFlush bool) {
offset = m.currentOffset
if offset+len(buf) > m.capacity {
m.readyForFlush = true
return -1, 0, true
sz := int64(len(buf))
newOffset := atomic.AddInt64(&m.currentOffset, sz)
start := newOffset - sz

if newOffset > int64(m.capacity) {
if atomic.CompareAndSwapUint32(&m.flushStarted, 0, 1) {
return -1, 0, true
}
return -1, 0, false
}
copy(m.page.Buf[offset:], buf)
m.currentOffset += len(buf)
return offset, uint16(len(buf)), false
copy(m.page.Buf[start:start+sz], buf)
return int(start), uint16(len(buf)), false
}

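// GetBufForAppend reserves size bytes in the page buffer and returns that
// slice so the caller can write into it directly, avoiding an extra copy.
// When the reservation would exceed capacity it returns a nil buffer and
// offset -1 instead.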
func (m *Memtable) GetBufForAppend(size uint16) (bbuf []byte, offset int, length uint16, readyForFlush bool) {
offset = m.currentOffset
if offset+int(size) > m.capacity {
m.readyForFlush = true
return nil, -1, 0, true
sz := int64(size)
newOffset := atomic.AddInt64(&m.currentOffset, sz)
start := newOffset - sz

if newOffset > int64(m.capacity) {
if atomic.CompareAndSwapUint32(&m.flushStarted, 0, 1) {
return nil, -1, 0, true
}
return nil, -1, 0, false
}
bbuf = m.page.Buf[offset : offset+int(size)]
m.currentOffset += int(size)
return bbuf, offset, size, false
return m.page.Buf[start:newOffset], int(start), size, false
}

func (m *Memtable) GetBufForRead(offset int, length uint16) (bbuf []byte, exists bool) {
Expand All @@ -97,7 +102,7 @@ func (m *Memtable) GetBufForRead(offset int, length uint16) (bbuf []byte, exists
}

func (m *Memtable) Flush() (n int, fileOffset int64, err error) {
if !m.readyForFlush {
if atomic.LoadUint32(&m.flushStarted) == 0 {
return 0, 0, ErrMemtableNotReadyForFlush
}

Expand All @@ -114,8 +119,8 @@ func (m *Memtable) Flush() (n int, fileOffset int64, err error) {
return 0, 0, err
}

m.currentOffset = 0
m.readyForFlush = false
atomic.StoreInt64(&m.currentOffset, 0)
atomic.StoreUint32(&m.flushStarted, 0)
return totalWritten, fileOffset, nil
}

Expand Down
Loading