Skip to content

Commit c30255c

Browse files
committed
Fix orphaned pvtdata missing entries from stale iterator during elg conversion
Re-create the LevelDB iterator after releasing and re-acquiring purgerLock in processCollElgEvents. The old snapshot-based iterator yields entries deleted by the purger during the unlock window, creating eligible missing data entries with no expiry entry that are never cleaned up. Signed-off-by: Ady0333 <adityashinde1525@gmail.com>
1 parent 1ece732 commit c30255c

2 files changed

Lines changed: 70 additions & 0 deletions

File tree

core/ledger/pvtdatastorage/store.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,13 @@ func (s *Store) processCollElgEvents() error {
10541054
s.purgerLock.Unlock()
10551055
time.Sleep(sleepTime * time.Millisecond)
10561056
s.purgerLock.Lock()
1057+
// Re-create the iterator so it reflects any deletes
1058+
// made by the purger while the lock was released.
1059+
collItr.Release()
1060+
collItr, err = s.db.GetIterator(startKey, endKey)
1061+
if err != nil {
1062+
return err
1063+
}
10571064
}
10581065
} // entry loop
10591066

core/ledger/pvtdatastorage/store_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1765,6 +1765,69 @@ func testutilWaitForCollElgProcToFinish(s *Store) {
17651765
s.collElgProcSync.waitForDone()
17661766
}
17671767

1768+
func TestProcessCollElgEventsIteratorRecreatedAfterPurge(t *testing.T) {
1769+
btlPolicy := btltestutil.SampleBTLPolicy(map[[2]string]uint64{
1770+
{"ns-1", "coll-1"}: 1,
1771+
})
1772+
conf := pvtDataConf()
1773+
conf.MaxBatchSize = 1
1774+
conf.BatchesInterval = 500
1775+
1776+
env := NewTestStoreEnv(t, "TestCollElgStaleIter", btlPolicy, conf)
1777+
defer env.Cleanup()
1778+
s := env.TestStore
1779+
1780+
s.purgeInterval = math.MaxUint64
1781+
1782+
s.purgerLock.Lock()
1783+
s.purgerLock.Unlock() //lint:ignore SA2001 syncpoint
1784+
1785+
require.NoError(t, s.Commit(0, nil, nil, nil))
1786+
1787+
const numBlocks = uint64(10)
1788+
for blk := uint64(1); blk <= numBlocks; blk++ {
1789+
missingData := make(ledger.TxMissingPvtData)
1790+
missingData.Add(1, "ns-1", "coll-1", false)
1791+
require.NoError(t, s.Commit(blk, nil, missingData, nil))
1792+
}
1793+
1794+
const lastBlock = uint64(15)
1795+
for blk := numBlocks + 1; blk <= lastBlock; blk++ {
1796+
require.NoError(t, s.Commit(blk, nil, nil, nil))
1797+
}
1798+
1799+
{
1800+
key := encodeCollElgKey(lastBlock)
1801+
m := newCollElgInfo(map[string][]string{"ns-1": []string{"coll-1"}})
1802+
val, err := encodeCollElgVal(m)
1803+
require.NoError(t, err)
1804+
b := s.db.NewUpdateBatch()
1805+
b.Put(key, val)
1806+
require.NoError(t, s.db.WriteBatch(b, true))
1807+
}
1808+
1809+
procErr := make(chan error, 1)
1810+
go func() {
1811+
procErr <- s.processCollElgEvents()
1812+
}()
1813+
1814+
time.Sleep(100 * time.Millisecond)
1815+
1816+
s.purgerLock.Lock()
1817+
require.NoError(t, s.purgeExpiredData(0, lastBlock))
1818+
s.purgerLock.Unlock()
1819+
1820+
require.NoError(t, <-procErr)
1821+
1822+
for blk := uint64(1); blk <= numBlocks; blk++ {
1823+
k := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: blk}}
1824+
require.False(t,
1825+
testElgPrioMissingDataKeyExists(t, s, k),
1826+
"orphaned eligible prioritized missing data entry must not exist for expired block %d", blk,
1827+
)
1828+
}
1829+
}
1830+
17681831
func produceSamplePvtdata(t *testing.T, txNum uint64, nsColls []string) *ledger.TxPvtData {
17691832
builder := rwsetutil.NewRWSetBuilder()
17701833
for _, nsColl := range nsColls {

0 commit comments

Comments
 (0)