From aa66befaaee472482391ceb01fad93f417bf6af2 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 5 Jul 2021 13:42:10 +0800 Subject: [PATCH 01/12] feat: add time serise pool --- src/base/time_serise_pool.h | 113 ++++++++++++++++++++++++++++++ src/base/time_serise_pool_test.cc | 57 +++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 src/base/time_serise_pool.h create mode 100644 src/base/time_serise_pool_test.cc diff --git a/src/base/time_serise_pool.h b/src/base/time_serise_pool.h new file mode 100644 index 00000000000..6aada6db51c --- /dev/null +++ b/src/base/time_serise_pool.h @@ -0,0 +1,113 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_BASE_TIME_SERISE_POOL_H_ +#define SRC_BASE_TIME_SERISE_POOL_H_ + +#include +#include +#include + +namespace openmldb { +namespace base { + + +class TimeBucket { + public: + TimeBucket(uint32_t block_size) : block_size_(block_size), current_offset_(block_size + 1), object_num_(0) { + head_ = (Block*)malloc(sizeof(Block*)); //empty block at end + head_->next = NULL; + } + void Clear() + { + auto p = head_; + while (p) + { + auto q = p->next; + free(p); + p = q; + } + } + void* Alloc(uint32_t size) { + object_num_++; + if (current_offset_ + size <= block_size_ - sizeof(Block)) { + void* addr = head_->data + current_offset_; + current_offset_ += size; + return addr; + } else { + auto block = (Block*)malloc(block_size_); + current_offset_ = size; + block->next = head_->next; + head_ = block; + return head_->data; + } + } + bool Free() // ret if fully freed + { + if (!--object_num_) + { + Clear(); + return true; + } + else return false; + } + private: + uint32_t block_size_; + uint32_t current_offset_; + uint32_t object_num_; + struct Block { + Block* next; + char data[]; + } *head_; +}; + +class TimeSerisePool { + public: + explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) { + + } + void* Alloc(uint32_t size, uint64_t time) { + auto pair = pool_.find(keyof(time)); + if (pair == pool_.end()){ + auto bucket = new TimeBucket(block_size_); + pool_.insert(std::pair>(keyof(time), std::unique_ptr(bucket))); + return bucket->Alloc(size); + } + + return pair->second->Alloc(size); + } + void Free(uint64_t time) { + auto pair = pool_.find(keyof(time)); + if (pair->second->Free()) { + pool_.erase(pair); + } + } + bool Empty(){ + return pool_.empty(); + } + private: + // key is the time / (60 * 60 * 1000) + uint32_t block_size_; + std::map> pool_; + inline static uint32_t keyof(uint64_t time) { + return time / (60 * 60 * 1000); + } +}; + +} +} + +#endif // SRC_BASE_TIME_SERISE_POOL_H_ \ No newline at end of file diff --git a/src/base/time_serise_pool_test.cc b/src/base/time_serise_pool_test.cc new file mode 100644 index 00000000000..e9a9e3576f0 --- /dev/null +++ b/src/base/time_serise_pool_test.cc @@ -0,0 +1,57 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "base/time_serise_pool.h" + +#include "gtest/gtest.h" + +#include + +namespace openmldb { +namespace base { + +class TimeSerisePoolTest : public ::testing::Test { + public: + TimeSerisePoolTest() {} + ~TimeSerisePoolTest() {} +}; + +TEST_F(TimeSerisePoolTest, FreeToEmpty) { + TimeSerisePool pool(1024); + std::vector times; + const int datasize = 1024/2; + char data[datasize]; + for (int i = 0; i < datasize; ++i) data[i] = i * i * i; + for (int i = 0; i < 1000; ++i){ + auto time = (i * i % 7) * (60 * 60 * 1000); + auto ptr = pool.Alloc(datasize, time); + memcpy(ptr, data, datasize); + times.push_back(time); + } + + for (auto time : times) pool.Free(time); + + ASSERT_TRUE(pool.Empty()); +} + +} // namespace base +} // namespace openmldb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From c83753491c1f58f0c4cd87f8d7e9233f9dd87356 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 5 Jul 2021 14:27:44 +0800 Subject: [PATCH 02/12] fix code style --- src/base/time_serise_pool.h | 84 +++++++++++++++---------------- src/base/time_serise_pool_test.cc | 11 ++-- 2 files changed, 45 insertions(+), 50 deletions(-) diff --git a/src/base/time_serise_pool.h b/src/base/time_serise_pool.h index 6aada6db51c..82d6956abe2 100644 --- a/src/base/time_serise_pool.h +++ b/src/base/time_serise_pool.h @@ -18,24 +18,24 @@ #define SRC_BASE_TIME_SERISE_POOL_H_ #include -#include + #include +#include +#include namespace openmldb { namespace base { - class TimeBucket { - public: - TimeBucket(uint32_t block_size) : block_size_(block_size), current_offset_(block_size + 1), object_num_(0) { - head_ = (Block*)malloc(sizeof(Block*)); //empty block at end + public: + explicit TimeBucket(uint32_t block_size) + : block_size_(block_size), current_offset_(block_size + 1), object_num_(0) { + head_ = reinterpret_cast(malloc(sizeof(Block*))); // empty block at end head_->next = NULL; } - void Clear() - { + void Clear() { auto p = head_; - while (p) - { + while (p) { auto q = p->next; free(p); p = q; @@ -48,66 +48,62 @@ class TimeBucket { current_offset_ += size; return addr; } else { - auto block = (Block*)malloc(block_size_); + auto block = reinterpret_cast(malloc(block_size_)); current_offset_ = size; block->next = head_->next; head_ = block; return head_->data; } } - bool Free() // ret if fully freed - { - if (!--object_num_) - { - Clear(); - return true; + bool Free() { // ret if fully freed + if (!--object_num_) { + Clear(); + return true; + } else { + return false; } - else return false; } - private: + + private: uint32_t block_size_; uint32_t current_offset_; uint32_t object_num_; struct Block { Block* next; char data[]; - } *head_; + } * head_; }; class TimeSerisePool { - public: - explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) { - - } + public: + explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) {} void* Alloc(uint32_t size, uint64_t time) { - auto pair = pool_.find(keyof(time)); - if (pair == pool_.end()){ - auto bucket = new TimeBucket(block_size_); - pool_.insert(std::pair>(keyof(time), std::unique_ptr(bucket))); - return bucket->Alloc(size); - } + auto pair = pool_.find(keyof(time)); + if (pair == pool_.end()) { + auto bucket = new TimeBucket(block_size_); + pool_.insert( + std::pair>(keyof(time), std::unique_ptr(bucket))); + return bucket->Alloc(size); + } - return pair->second->Alloc(size); + return pair->second->Alloc(size); } void Free(uint64_t time) { - auto pair = pool_.find(keyof(time)); - if (pair->second->Free()) { - pool_.erase(pair); - } - } - bool Empty(){ - return pool_.empty(); + auto pair = pool_.find(keyof(time)); + if (pair->second->Free()) { + pool_.erase(pair); + } } - private: + bool Empty() { return pool_.empty(); } + + private: // key is the time / (60 * 60 * 1000) uint32_t block_size_; std::map> pool_; - inline static uint32_t keyof(uint64_t time) { - return time / (60 * 60 * 1000); - } + inline static uint32_t keyof(uint64_t time) { return time / (60 * 60 * 1000); } }; -} -} +} // namespace base +} // namespace openmldb -#endif // SRC_BASE_TIME_SERISE_POOL_H_ \ No newline at end of file +#endif // SRC_BASE_TIME_SERISE_POOL_H_ diff --git a/src/base/time_serise_pool_test.cc b/src/base/time_serise_pool_test.cc index e9a9e3576f0..2c5dc13ddac 100644 --- a/src/base/time_serise_pool_test.cc +++ b/src/base/time_serise_pool_test.cc @@ -14,13 +14,12 @@ * limitations under the License. */ - #include "base/time_serise_pool.h" -#include "gtest/gtest.h" - #include +#include "gtest/gtest.h" + namespace openmldb { namespace base { @@ -33,10 +32,10 @@ class TimeSerisePoolTest : public ::testing::Test { TEST_F(TimeSerisePoolTest, FreeToEmpty) { TimeSerisePool pool(1024); std::vector times; - const int datasize = 1024/2; - char data[datasize]; + const int datasize = 1024 / 2; + char *data = new char[datasize]; for (int i = 0; i < datasize; ++i) data[i] = i * i * i; - for (int i = 0; i < 1000; ++i){ + for (int i = 0; i < 1000; ++i) { auto time = (i * i % 7) * (60 * 60 * 1000); auto ptr = pool.Alloc(datasize, time); memcpy(ptr, data, datasize); From 5c9857f4ff3eabffee6b654f4d302ddbbe4d5df1 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 5 Jul 2021 14:53:48 +0800 Subject: [PATCH 03/12] fix in reviews --- src/base/time_serise_pool.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/base/time_serise_pool.h b/src/base/time_serise_pool.h index 82d6956abe2..10a103f3230 100644 --- a/src/base/time_serise_pool.h +++ b/src/base/time_serise_pool.h @@ -78,19 +78,20 @@ class TimeSerisePool { public: explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) {} void* Alloc(uint32_t size, uint64_t time) { - auto pair = pool_.find(keyof(time)); + auto key = ComputeTimeSlot(time); + auto pair = pool_.find(key); if (pair == pool_.end()) { auto bucket = new TimeBucket(block_size_); pool_.insert( - std::pair>(keyof(time), std::unique_ptr(bucket))); + std::pair>(key, std::unique_ptr(bucket))); return bucket->Alloc(size); } return pair->second->Alloc(size); } void Free(uint64_t time) { - auto pair = pool_.find(keyof(time)); - if (pair->second->Free()) { + auto pair = pool_.find(ComputeTimeSlot(time)); + if (pair != pool_.end() && pair->second->Free()) { pool_.erase(pair); } } @@ -100,7 +101,7 @@ class TimeSerisePool { // key is the time / (60 * 60 * 1000) uint32_t block_size_; std::map> pool_; - inline static uint32_t keyof(uint64_t time) { return time / (60 * 60 * 1000); } + inline static uint32_t ComputeTimeSlot(uint64_t time) { return time / (60 * 60 * 1000); } }; } // namespace base From 6080f0030c9bcba4b56f99a0ab0d1821f207cea0 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 5 Jul 2021 19:54:00 +0800 Subject: [PATCH 04/12] add destructor for timebucket --- src/base/time_serise_pool.h | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/base/time_serise_pool.h b/src/base/time_serise_pool.h index 10a103f3230..d3a1c2a0773 100644 --- a/src/base/time_serise_pool.h +++ b/src/base/time_serise_pool.h @@ -33,7 +33,7 @@ class TimeBucket { head_ = reinterpret_cast(malloc(sizeof(Block*))); // empty block at end head_->next = NULL; } - void Clear() { + ~TimeBucket() { auto p = head_; while (p) { auto q = p->next; @@ -56,12 +56,7 @@ class TimeBucket { } } bool Free() { // ret if fully freed - if (!--object_num_) { - Clear(); - return true; - } else { - return false; - } + return !--object_num_; } private: From 96b9a289116e84aa576ec8b282d10cd08a70077c Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Wed, 7 Jul 2021 11:18:23 +0800 Subject: [PATCH 05/12] add pool alloc based method for skiplist --- src/base/skiplist.h | 79 +++++++++++++++++++++++++++++++++++++++++- src/storage/segment.cc | 2 +- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/base/skiplist.h b/src/base/skiplist.h index f7753f72d8d..8939c768f9b 100644 --- a/src/base/skiplist.h +++ b/src/base/skiplist.h @@ -24,6 +24,7 @@ #include #include "base/random.h" +#include "base/time_serise_pool.h" namespace openmldb { namespace base { @@ -43,6 +44,16 @@ struct DefaultComparator { template class Node { public: + // Set data reference and Node height + Node(const K& key, V& value, uint8_t height, std::atomic*>* preAlloc) // NOLINT + : height_(height), key_(key), value_(value) { + nexts_ = preAlloc; + } + + Node(uint8_t height, std::atomic*>* preAlloc) : height_(height), key_(), value_() { // NOLINT + nexts_ = preAlloc; + } + // Set data reference and Node height Node(const K& key, V& value, uint8_t height) // NOLINT : height_(height), key_(key), value_(value) { @@ -98,8 +109,8 @@ class Skiplist { Branch(branch), max_height_(0), compare_(compare), + pool_(NULL), rand_(0xdeadbeef), - head_(NULL), tail_(NULL) { head_ = new Node(MaxHeight); for (uint8_t i = 0; i < head_->Height(); i++) { @@ -107,6 +118,12 @@ class Skiplist { } max_height_.store(1, std::memory_order_relaxed); } + Skiplist(TimeSerisePool pool, uint8_t max_height, uint8_t branch, const Comparator& compare) + : Skiplist(max_height, branch, compare) + { + pool_ = pool; + } + ~Skiplist() { delete head_; } // Insert need external synchronized @@ -131,6 +148,29 @@ class Skiplist { return height; } + // Insert need external synchronized + // use iif skiplist is using a pool + uint8_t Insert(const K& key, V& value, uint64_t time) { // NOLINT + uint8_t height = RandomHeight(); + Node* pre[MaxHeight]; + FindLessOrEqual(key, pre); + if (height > GetMaxHeight()) { + for (uint8_t i = GetMaxHeight(); i < height; i++) { + pre[i] = head_; + } + max_height_.store(height, std::memory_order_relaxed); + } + Node* node = NewNode(key, value, height, time); + if (pre[0]->GetNext(0) == NULL) { + tail_.store(node, std::memory_order_release); + } + for (uint8_t i = 0; i < height; i++) { + node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i)); + pre[i]->SetNext(i, node); + } + return height; + } + bool IsEmpty() { if (head_->GetNextNoBarrier(0) == NULL) { return true; @@ -269,6 +309,7 @@ class Skiplist { } // Need external synchronized + // called iif skiplist is using tcalloc uint64_t Clear() { uint64_t cnt = 0; Node* node = head_->GetNext(0); @@ -291,6 +332,34 @@ class Skiplist { return cnt; } + // Need external synchronized + // use iif skiplist is using a pool + bool AddToFirst(const K& key, V& value, uint64_t time) { // NOLINT + { + Node* node = head_->GetNext(0); + if (node != NULL && compare_(key, node->GetKey()) > 0) { + return false; + } + } + uint8_t height = RandomHeight(); + Node* pre[MaxHeight]; + for (uint8_t i = 0; i < height; i++) { + pre[i] = head_; + } + if (height > GetMaxHeight()) { + max_height_.store(height, std::memory_order_relaxed); + } + Node* node = NewNode(key, value, height, time); + if (pre[0]->GetNext(0) == NULL) { + tail_.store(node, std::memory_order_release); + } + for (uint8_t i = 0; i < height; i++) { + node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i)); + pre[i]->SetNext(i, node); + } + return true; + } + // Need external synchronized bool AddToFirst(const K& key, V& value) { // NOLINT { @@ -363,6 +432,13 @@ class Skiplist { Iterator* NewIterator() { return new Iterator(this); } private: + Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time) { // NOLINT + auto arrmem = pool_.Alloc(sizeof(std::atomic*>) * height, time); + auto nodemem = pool_.Alloc(sizeof(Node), time); + Node* node = new (nodemem)Node(key, value, height, arrmem); + return node; + } + Node* NewNode(const K& key, V& value, uint8_t height) { // NOLINT Node* node = new Node(key, value, height); return node; @@ -464,6 +540,7 @@ class Skiplist { uint8_t const Branch; std::atomic max_height_; Comparator const compare_; + TimeSerisePool pool_; Random rand_; Node* head_; std::atomic*> tail_; diff --git a/src/storage/segment.cc b/src/storage/segment.cc index c39e0b69779..0c185ab0967 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -104,7 +104,7 @@ uint64_t Segment::Release() { } it->Next(); } - entries_->Clear(); + //entries_->Clear(); delete it; KeyEntryNodeList::Iterator* f_it = entry_free_list_->NewIterator(); From 729b1937e8460571e926a8839e738d13607d6560 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Wed, 7 Jul 2021 08:35:59 +0000 Subject: [PATCH 06/12] change mem alloc backend of time entry to TimeSerisePool --- src/base/skiplist.h | 33 ++++++++---------------- src/base/skiplist_test.cc | 51 +++++++++++++++++++++++++++++++++++++ src/base/time_serise_pool.h | 4 +-- src/flags.cc | 1 + src/storage/segment.cc | 24 +++++++++++++---- src/storage/segment.h | 7 +++-- 6 files changed, 88 insertions(+), 32 deletions(-) diff --git a/src/base/skiplist.h b/src/base/skiplist.h index 8939c768f9b..b989403bdf1 100644 --- a/src/base/skiplist.h +++ b/src/base/skiplist.h @@ -21,7 +21,6 @@ #include #include -#include #include "base/random.h" #include "base/time_serise_pool.h" @@ -105,25 +104,13 @@ template class Skiplist { public: Skiplist(uint8_t max_height, uint8_t branch, const Comparator& compare) - : MaxHeight(max_height), - Branch(branch), - max_height_(0), - compare_(compare), - pool_(NULL), - rand_(0xdeadbeef), - tail_(NULL) { + : MaxHeight(max_height), Branch(branch), max_height_(0), compare_(compare), rand_(0xdeadbeef), tail_(NULL) { head_ = new Node(MaxHeight); for (uint8_t i = 0; i < head_->Height(); i++) { head_->SetNext(i, NULL); } max_height_.store(1, std::memory_order_relaxed); } - Skiplist(TimeSerisePool pool, uint8_t max_height, uint8_t branch, const Comparator& compare) - : Skiplist(max_height, branch, compare) - { - pool_ = pool; - } - ~Skiplist() { delete head_; } // Insert need external synchronized @@ -150,7 +137,7 @@ class Skiplist { // Insert need external synchronized // use iif skiplist is using a pool - uint8_t Insert(const K& key, V& value, uint64_t time) { // NOLINT + uint8_t Insert(const K& key, V& value, uint64_t time, TimeSerisePool& pool) { // NOLINT uint8_t height = RandomHeight(); Node* pre[MaxHeight]; FindLessOrEqual(key, pre); @@ -160,7 +147,7 @@ class Skiplist { } max_height_.store(height, std::memory_order_relaxed); } - Node* node = NewNode(key, value, height, time); + Node* node = NewNode(key, value, height, time, pool); if (pre[0]->GetNext(0) == NULL) { tail_.store(node, std::memory_order_release); } @@ -334,7 +321,7 @@ class Skiplist { // Need external synchronized // use iif skiplist is using a pool - bool AddToFirst(const K& key, V& value, uint64_t time) { // NOLINT + bool AddToFirst(const K& key, V& value, uint64_t time, TimeSerisePool& pool) { // NOLINT { Node* node = head_->GetNext(0); if (node != NULL && compare_(key, node->GetKey()) > 0) { @@ -349,7 +336,7 @@ class Skiplist { if (height > GetMaxHeight()) { max_height_.store(height, std::memory_order_relaxed); } - Node* node = NewNode(key, value, height, time); + Node* node = NewNode(key, value, height, time, pool); if (pre[0]->GetNext(0) == NULL) { tail_.store(node, std::memory_order_release); } @@ -432,10 +419,11 @@ class Skiplist { Iterator* NewIterator() { return new Iterator(this); } private: - Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time) { // NOLINT - auto arrmem = pool_.Alloc(sizeof(std::atomic*>) * height, time); - auto nodemem = pool_.Alloc(sizeof(Node), time); - Node* node = new (nodemem)Node(key, value, height, arrmem); + Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time, TimeSerisePool& pool) { // NOLINT + auto arrmemvptr = pool.Alloc(sizeof(std::atomic*>) * height, time); + auto arrmem = reinterpret_cast*>*>(arrmemvptr); + auto nodemem = pool.Alloc(sizeof(Node), time); + Node* node = new (nodemem) Node(key, value, height, arrmem); return node; } @@ -540,7 +528,6 @@ class Skiplist { uint8_t const Branch; std::atomic max_height_; Comparator const compare_; - TimeSerisePool pool_; Random rand_; Node* head_; std::atomic*> tail_; diff --git a/src/base/skiplist_test.cc b/src/base/skiplist_test.cc index 4d816a53ba8..41204a53d03 100644 --- a/src/base/skiplist_test.cc +++ b/src/base/skiplist_test.cc @@ -20,6 +20,7 @@ #include #include "base/slice.h" +#include "base/time_serise_pool.h" #include "gtest/gtest.h" namespace openmldb { @@ -178,6 +179,45 @@ TEST_F(SkiplistTest, InsertAndIterator) { } } +TEST_F(SkiplistTest, InsertAndIteratorWithPool) { + Comparator cmp; + TimeSerisePool pool(1024); + for (auto height : vec) { + Skiplist sl(height, 4, cmp); + uint32_t key1 = 1; + uint32_t value1 = 2; + sl.Insert(key1, value1, 1, pool); + uint32_t key2 = 2; + uint32_t value2 = 4; + sl.Insert(key2, value2, 2, pool); + uint32_t key3 = 2; + uint32_t value3 = 5; + sl.Insert(key3, value3, 1, pool); + uint32_t key4 = 3; + uint32_t value4 = 6; + sl.Insert(key4, value4, 1, pool); + Skiplist::Iterator* it = sl.NewIterator(); + it->Seek(0); + ASSERT_EQ(1, (signed)it->GetKey()); + ASSERT_EQ(2, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(5, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(4, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(3, (signed)it->GetKey()); + ASSERT_EQ(6, (signed)it->GetValue()); + it->Next(); + ASSERT_FALSE(it->Valid()); + it->Seek(2); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(5, (signed)it->GetValue()); + delete it; + } +} + TEST_F(SkiplistTest, GetSize) { Comparator cmp; Skiplist sl(12, 4, cmp); @@ -663,6 +703,17 @@ TEST_F(SkiplistTest, Duplicate) { ASSERT_FALSE(it->Valid()); } +TEST_F(SkiplistTest, DuplicateWithPool) { + TimeSerisePool pool(1024); + DescComparator cmp; + Skiplist sl(12, 4, cmp); + uint32_t val = 1; + sl.Insert(1, val, 111, pool); + sl.Insert(2, val, 111, pool); + val = 2; + sl.Insert(1, val, 112, pool); +} + } // namespace base } // namespace openmldb diff --git a/src/base/time_serise_pool.h b/src/base/time_serise_pool.h index d3a1c2a0773..76f48918239 100644 --- a/src/base/time_serise_pool.h +++ b/src/base/time_serise_pool.h @@ -42,6 +42,7 @@ class TimeBucket { } } void* Alloc(uint32_t size) { + // return new char[size]; object_num_++; if (current_offset_ + size <= block_size_ - sizeof(Block)) { void* addr = head_->data + current_offset_; @@ -77,8 +78,7 @@ class TimeSerisePool { auto pair = pool_.find(key); if (pair == pool_.end()) { auto bucket = new TimeBucket(block_size_); - pool_.insert( - std::pair>(key, std::unique_ptr(bucket))); + pool_.insert(std::pair>(key, std::unique_ptr(bucket))); return bucket->Alloc(size); } diff --git a/src/flags.cc b/src/flags.cc index 4c33b0fdc9c..b7121cea6f7 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -119,6 +119,7 @@ DEFINE_uint32(recycle_ttl, 0, "ttl of recycle in minute"); DEFINE_uint32(latest_ttl_max, 1000, "the max ttl of latest"); DEFINE_uint32(absolute_ttl_max, 60 * 24 * 365 * 30, "the max ttl of absolute time"); +DEFINE_uint32(time_serise_pool_block_size, 4 * 1024, "the block size of time serise pool"); DEFINE_uint32(skiplist_max_height, 12, "the max height of skiplist"); DEFINE_uint32(key_entry_max_height, 8, "the max height of key entry"); DEFINE_uint32(latest_default_skiplist_height, 1, "the default height of skiplist for latest table"); diff --git a/src/storage/segment.cc b/src/storage/segment.cc index 0c185ab0967..c6a6af75966 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -24,6 +24,7 @@ #include "storage/record.h" DECLARE_int32(gc_safe_offset); +DECLARE_uint32(time_serise_pool_block_size); DECLARE_uint32(skiplist_max_height); DECLARE_uint32(gc_deleted_pk_version_delta); @@ -39,6 +40,7 @@ Segment::Segment() pk_cnt_(0), ts_cnt_(1), gc_version_(0), + pool_(FLAGS_time_serise_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); key_entry_max_height_ = (uint8_t)FLAGS_skiplist_max_height; @@ -54,6 +56,7 @@ Segment::Segment(uint8_t height) key_entry_max_height_(height), ts_cnt_(1), gc_version_(0), + pool_(FLAGS_time_serise_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -68,6 +71,7 @@ Segment::Segment(uint8_t height, const std::vector& ts_idx_vec) key_entry_max_height_(height), ts_cnt_(ts_idx_vec.size()), gc_version_(0), + pool_(FLAGS_time_serise_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -104,7 +108,7 @@ uint64_t Segment::Release() { } it->Next(); } - //entries_->Clear(); + entries_->Clear(); delete it; KeyEntryNodeList::Iterator* f_it = entry_free_list_->NewIterator(); @@ -177,13 +181,14 @@ void Segment::Put(const Slice& key, uint64_t time, DataBlock* row) { // need to delete memory when free node Slice skey(pk, key.size()); entry = (void*)new KeyEntry(key_entry_max_height_); // NOLINT + // Key entry do not use pool uint8_t height = entries_->Insert(skey, entry); byte_size += GetRecordPkIdxSize(height, key.size(), key_entry_max_height_); pk_cnt_.fetch_add(1, std::memory_order_relaxed); } idx_cnt_.fetch_add(1, std::memory_order_relaxed); - uint8_t height = ((KeyEntry*)entry)->entries.Insert(time, row); // NOLINT - ((KeyEntry*)entry) // NOLINT + uint8_t height = ((KeyEntry*)entry)->entries.Insert(time, row, time, pool_); // NOLINT + ((KeyEntry*)entry) // NOLINT ->count_.fetch_add(1, std::memory_order_relaxed); byte_size += GetRecordTsIdxSize(height); idx_byte_size_.fetch_add(byte_size, std::memory_order_relaxed); @@ -227,13 +232,14 @@ void Segment::Put(const Slice& key, const TSDimensions& ts_dimension, DataBlock* entry_arr_tmp[i] = new KeyEntry(key_entry_max_height_); } entry_arr = (void*)entry_arr_tmp; // NOLINT + // key entry do not use pool uint8_t height = entries_->Insert(skey, entry_arr); byte_size += GetRecordPkMultiIdxSize(height, key.size(), key_entry_max_height_, ts_cnt_); pk_cnt_.fetch_add(1, std::memory_order_relaxed); } } uint8_t height = ((KeyEntry**)entry_arr)[pos->second]->entries.Insert( // NOLINT - cur_ts.ts(), row); + cur_ts.ts(), row, cur_ts.ts(), pool_); ((KeyEntry**)entry_arr)[pos->second]->count_.fetch_add( // NOLINT 1, std::memory_order_relaxed); byte_size += GetRecordTsIdxSize(height); @@ -284,6 +290,7 @@ bool Segment::Delete(const Slice& key) { } { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } return true; @@ -305,7 +312,9 @@ void Segment::FreeList(::openmldb::base::Node* node, uint6 delete tmp->GetValue(); gc_record_cnt++; } - delete tmp; + // changed to pool free (time entry) + pool_.Free(tmp->GetKey()); + // delete tmp; } } @@ -365,9 +374,11 @@ void Segment::GcEntryFreeList(uint64_t version, uint64_t& gc_idx_cnt, uint64_t& while (node != NULL) { ::openmldb::base::Node* entry_node = node->GetValue(); FreeEntry(entry_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); + // no change for key entry not using pool delete entry_node; ::openmldb::base::Node*>* tmp = node; node = node->GetNextNoBarrier(0); + // no change for gc not using pool delete tmp; pk_cnt_.fetch_sub(1, std::memory_order_relaxed); } @@ -584,6 +595,7 @@ void Segment::GcAllType(const std::map& ttl_st_map, uint64_t& g } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } } @@ -632,6 +644,7 @@ void Segment::Gc4TTL(const uint64_t time, uint64_t& gc_idx_cnt, uint64_t& gc_rec } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } uint64_t entry_gc_idx_cnt = 0; @@ -725,6 +738,7 @@ void Segment::Gc4TTLOrHead(const uint64_t time, const uint64_t keep_cnt, uint64_ } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } uint64_t entry_gc_idx_cnt = 0; diff --git a/src/storage/segment.h b/src/storage/segment.h index 165a8aa9578..0ab52cb9381 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -25,6 +25,7 @@ #include "base/skiplist.h" #include "base/slice.h" +#include "base/time_serise_pool.h" #include "proto/tablet.pb.h" #include "storage/iterator.h" #include "storage/schema.h" @@ -110,7 +111,8 @@ class KeyEntry { } it->Next(); } - entries.Clear(); + // not clearing for using pool for time entry + // entries.Clear(); delete it; return cnt; } @@ -251,11 +253,12 @@ class Segment { std::atomic idx_byte_size_; std::atomic pk_cnt_; uint8_t key_entry_max_height_; - KeyEntryNodeList* entry_free_list_; + KeyEntryNodeList* entry_free_list_; // NOTE: entry free list DO NOT use pool uint32_t ts_cnt_; std::atomic gc_version_; std::map ts_idx_map_; std::vector>> idx_cnt_vec_; + ::openmldb::base::TimeSerisePool pool_; uint64_t ttl_offset_; }; From eea8202b6e5288eac01a60cb2c0a340f2f0057ce Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Wed, 7 Jul 2021 18:22:38 +0800 Subject: [PATCH 07/12] fix memory leak --- src/storage/segment.cc | 8 ++++---- src/storage/segment.h | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/storage/segment.cc b/src/storage/segment.cc index c6a6af75966..e3d60fa696e 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -96,13 +96,13 @@ uint64_t Segment::Release() { if (ts_cnt_ > 1) { KeyEntry** entry_arr = (KeyEntry**)it->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { - cnt += entry_arr[i]->Release(); + cnt += entry_arr[i]->Release(pool_); delete entry_arr[i]; } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)it->GetValue(); // NOLINT - cnt += entry->Release(); + cnt += entry->Release(pool_); delete entry; } } @@ -119,13 +119,13 @@ uint64_t Segment::Release() { if (ts_cnt_ > 1) { KeyEntry** entry_arr = (KeyEntry**)node->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { - entry_arr[i]->Release(); + entry_arr[i]->Release(pool_); delete entry_arr[i]; } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)node->GetValue(); // NOLINT - entry->Release(); + entry->Release(pool_); delete entry; } delete node; diff --git a/src/storage/segment.h b/src/storage/segment.h index 0ab52cb9381..0ece55ce361 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -96,7 +96,7 @@ class KeyEntry { ~KeyEntry() {} // just return the count of datablock - uint64_t Release() { + uint64_t Release(::openmldb::base::TimeSerisePool pool) { uint64_t cnt = 0; TimeEntries::Iterator* it = entries.NewIterator(); it->SeekToFirst(); @@ -109,6 +109,7 @@ class KeyEntry { } else { delete block; } + pool.Free(it->GetKey()); it->Next(); } // not clearing for using pool for time entry From adbdc7a99138b974a4cfe2460f26f945350755c6 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 12 Jul 2021 14:55:56 +0800 Subject: [PATCH 08/12] fix in compile --- src/storage/segment.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/segment.h b/src/storage/segment.h index 0ece55ce361..7e549f378ee 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -96,7 +96,7 @@ class KeyEntry { ~KeyEntry() {} // just return the count of datablock - uint64_t Release(::openmldb::base::TimeSerisePool pool) { + uint64_t Release(::openmldb::base::TimeSerisePool& pool) { uint64_t cnt = 0; TimeEntries::Iterator* it = entries.NewIterator(); it->SeekToFirst(); From cc4f8046efe2d35dedae911db91377ffb40962d7 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 12 Jul 2021 17:57:55 +0800 Subject: [PATCH 09/12] fix typo --- src/base/skiplist.h | 8 ++++---- src/base/skiplist_test.cc | 6 +++--- src/base/{time_serise_pool.h => time_series_pool.h} | 10 +++++----- ..._serise_pool_test.cc => time_series_pool_test.cc} | 12 ++++++------ src/flags.cc | 2 +- src/storage/segment.cc | 8 ++++---- src/storage/segment.h | 6 +++--- 7 files changed, 26 insertions(+), 26 deletions(-) rename src/base/{time_serise_pool.h => time_series_pool.h} (93%) rename src/base/{time_serise_pool_test.cc => time_series_pool_test.cc} (85%) diff --git a/src/base/skiplist.h b/src/base/skiplist.h index b989403bdf1..bc4e4417ece 100644 --- a/src/base/skiplist.h +++ b/src/base/skiplist.h @@ -23,7 +23,7 @@ #include #include "base/random.h" -#include "base/time_serise_pool.h" +#include "base/time_series_pool.h" namespace openmldb { namespace base { @@ -137,7 +137,7 @@ class Skiplist { // Insert need external synchronized // use iif skiplist is using a pool - uint8_t Insert(const K& key, V& value, uint64_t time, TimeSerisePool& pool) { // NOLINT + uint8_t Insert(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT uint8_t height = RandomHeight(); Node* pre[MaxHeight]; FindLessOrEqual(key, pre); @@ -321,7 +321,7 @@ class Skiplist { // Need external synchronized // use iif skiplist is using a pool - bool AddToFirst(const K& key, V& value, uint64_t time, TimeSerisePool& pool) { // NOLINT + bool AddToFirst(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT { Node* node = head_->GetNext(0); if (node != NULL && compare_(key, node->GetKey()) > 0) { @@ -419,7 +419,7 @@ class Skiplist { Iterator* NewIterator() { return new Iterator(this); } private: - Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time, TimeSerisePool& pool) { // NOLINT + Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time, TimeSeriesPool& pool) { // NOLINT auto arrmemvptr = pool.Alloc(sizeof(std::atomic*>) * height, time); auto arrmem = reinterpret_cast*>*>(arrmemvptr); auto nodemem = pool.Alloc(sizeof(Node), time); diff --git a/src/base/skiplist_test.cc b/src/base/skiplist_test.cc index 41204a53d03..fb5c584f356 100644 --- a/src/base/skiplist_test.cc +++ b/src/base/skiplist_test.cc @@ -20,7 +20,7 @@ #include #include "base/slice.h" -#include "base/time_serise_pool.h" +#include "base/time_series_pool.h" #include "gtest/gtest.h" namespace openmldb { @@ -181,7 +181,7 @@ TEST_F(SkiplistTest, InsertAndIterator) { TEST_F(SkiplistTest, InsertAndIteratorWithPool) { Comparator cmp; - TimeSerisePool pool(1024); + TimeSeriesPool pool(1024); for (auto height : vec) { Skiplist sl(height, 4, cmp); uint32_t key1 = 1; @@ -704,7 +704,7 @@ TEST_F(SkiplistTest, Duplicate) { } TEST_F(SkiplistTest, DuplicateWithPool) { - TimeSerisePool pool(1024); + TimeSeriesPool pool(1024); DescComparator cmp; Skiplist sl(12, 4, cmp); uint32_t val = 1; diff --git a/src/base/time_serise_pool.h b/src/base/time_series_pool.h similarity index 93% rename from src/base/time_serise_pool.h rename to src/base/time_series_pool.h index 76f48918239..89168090a4d 100644 --- a/src/base/time_serise_pool.h +++ b/src/base/time_series_pool.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef SRC_BASE_TIME_SERISE_POOL_H_ -#define SRC_BASE_TIME_SERISE_POOL_H_ +#ifndef SRC_BASE_TIME_Series_POOL_H_ +#define SRC_BASE_TIME_Series_POOL_H_ #include @@ -70,9 +70,9 @@ class TimeBucket { } * head_; }; -class TimeSerisePool { +class TimeSeriesPool { public: - explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) {} + explicit TimeSeriesPool(uint32_t block_size) : block_size_(block_size) {} void* Alloc(uint32_t size, uint64_t time) { auto key = ComputeTimeSlot(time); auto pair = pool_.find(key); @@ -102,4 +102,4 @@ class TimeSerisePool { } // namespace base } // namespace openmldb -#endif // SRC_BASE_TIME_SERISE_POOL_H_ +#endif // SRC_BASE_TIME_Series_POOL_H_ diff --git a/src/base/time_serise_pool_test.cc b/src/base/time_series_pool_test.cc similarity index 85% rename from src/base/time_serise_pool_test.cc rename to src/base/time_series_pool_test.cc index 2c5dc13ddac..14090be16f7 100644 --- a/src/base/time_serise_pool_test.cc +++ b/src/base/time_series_pool_test.cc @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "base/time_serise_pool.h" +#include "base/time_series_pool.h" #include @@ -23,14 +23,14 @@ namespace openmldb { namespace base { -class TimeSerisePoolTest : public ::testing::Test { +class TimeSeriesPoolTest : public ::testing::Test { public: - TimeSerisePoolTest() {} - ~TimeSerisePoolTest() {} + TimeSeriesPoolTest() {} + ~TimeSeriesPoolTest() {} }; -TEST_F(TimeSerisePoolTest, FreeToEmpty) { - TimeSerisePool pool(1024); +TEST_F(TimeSeriesPoolTest, FreeToEmpty) { + TimeSeriesPool pool(1024); std::vector times; const int datasize = 1024 / 2; char *data = new char[datasize]; diff --git a/src/flags.cc b/src/flags.cc index b7121cea6f7..982166e66d1 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -119,7 +119,7 @@ DEFINE_uint32(recycle_ttl, 0, "ttl of recycle in minute"); DEFINE_uint32(latest_ttl_max, 1000, "the max ttl of latest"); DEFINE_uint32(absolute_ttl_max, 60 * 24 * 365 * 30, "the max ttl of absolute time"); -DEFINE_uint32(time_serise_pool_block_size, 4 * 1024, "the block size of time serise pool"); +DEFINE_uint32(time_series_pool_block_size, 4 * 1024, "the block size of time series pool"); DEFINE_uint32(skiplist_max_height, 12, "the max height of skiplist"); DEFINE_uint32(key_entry_max_height, 8, "the max height of key entry"); DEFINE_uint32(latest_default_skiplist_height, 1, "the default height of skiplist for latest table"); diff --git a/src/storage/segment.cc b/src/storage/segment.cc index e3d60fa696e..9db687b9ca0 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -24,7 +24,7 @@ #include "storage/record.h" DECLARE_int32(gc_safe_offset); -DECLARE_uint32(time_serise_pool_block_size); +DECLARE_uint32(time_series_pool_block_size); DECLARE_uint32(skiplist_max_height); DECLARE_uint32(gc_deleted_pk_version_delta); @@ -40,7 +40,7 @@ Segment::Segment() pk_cnt_(0), ts_cnt_(1), gc_version_(0), - pool_(FLAGS_time_serise_pool_block_size), + pool_(FLAGS_time_series_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); key_entry_max_height_ = (uint8_t)FLAGS_skiplist_max_height; @@ -56,7 +56,7 @@ Segment::Segment(uint8_t height) key_entry_max_height_(height), ts_cnt_(1), gc_version_(0), - pool_(FLAGS_time_serise_pool_block_size), + pool_(FLAGS_time_series_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -71,7 +71,7 @@ Segment::Segment(uint8_t height, const std::vector& ts_idx_vec) key_entry_max_height_(height), ts_cnt_(ts_idx_vec.size()), gc_version_(0), - pool_(FLAGS_time_serise_pool_block_size), + pool_(FLAGS_time_series_pool_block_size), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); diff --git a/src/storage/segment.h b/src/storage/segment.h index 7e549f378ee..bd1223a702f 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -25,7 +25,7 @@ #include "base/skiplist.h" #include "base/slice.h" -#include "base/time_serise_pool.h" +#include "base/time_series_pool.h" #include "proto/tablet.pb.h" #include "storage/iterator.h" #include "storage/schema.h" @@ -96,7 +96,7 @@ class KeyEntry { ~KeyEntry() {} // just return the count of datablock - uint64_t Release(::openmldb::base::TimeSerisePool& pool) { + uint64_t Release(::openmldb::base::TimeSeriesPool& pool) { uint64_t cnt = 0; TimeEntries::Iterator* it = entries.NewIterator(); it->SeekToFirst(); @@ -259,7 +259,7 @@ class Segment { std::atomic gc_version_; std::map ts_idx_map_; std::vector>> idx_cnt_vec_; - ::openmldb::base::TimeSerisePool pool_; + ::openmldb::base::TimeSeriesPool pool_; uint64_t ttl_offset_; }; From 6218a9c1053bad247ce83e907d8373eea85c68db Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Tue, 13 Jul 2021 14:20:28 +0800 Subject: [PATCH 10/12] fix typo --- src/base/skiplist.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/base/skiplist.h b/src/base/skiplist.h index bc4e4417ece..e46eb72d1d9 100644 --- a/src/base/skiplist.h +++ b/src/base/skiplist.h @@ -136,7 +136,7 @@ class Skiplist { } // Insert need external synchronized - // use iif skiplist is using a pool + // use if skiplist is using a pool uint8_t Insert(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT uint8_t height = RandomHeight(); Node* pre[MaxHeight]; @@ -296,7 +296,7 @@ class Skiplist { } // Need external synchronized - // called iif skiplist is using tcalloc + // called if skiplist is using tcalloc uint64_t Clear() { uint64_t cnt = 0; Node* node = head_->GetNext(0); @@ -320,7 +320,7 @@ class Skiplist { } // Need external synchronized - // use iif skiplist is using a pool + // use if skiplist is using a pool bool AddToFirst(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT { Node* node = head_->GetNext(0); From 8364e73a148970273a8157a2a0c5a302b115a3ef Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Tue, 13 Jul 2021 16:38:09 +0800 Subject: [PATCH 11/12] use boost pool to alloc mem for keyentry --- src/storage/segment.cc | 19 +++++++++++-------- src/storage/segment.h | 3 +++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/storage/segment.cc b/src/storage/segment.cc index 9db687b9ca0..8a4ddc4064a 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -41,6 +41,7 @@ Segment::Segment() ts_cnt_(1), gc_version_(0), pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); key_entry_max_height_ = (uint8_t)FLAGS_skiplist_max_height; @@ -57,6 +58,7 @@ Segment::Segment(uint8_t height) ts_cnt_(1), gc_version_(0), pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -72,6 +74,7 @@ Segment::Segment(uint8_t height, const std::vector& ts_idx_vec) ts_cnt_(ts_idx_vec.size()), gc_version_(0), pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -97,13 +100,13 @@ uint64_t Segment::Release() { KeyEntry** entry_arr = (KeyEntry**)it->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { cnt += entry_arr[i]->Release(pool_); - delete entry_arr[i]; + boost_pool_.free(entry_arr[i]); } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)it->GetValue(); // NOLINT cnt += entry->Release(pool_); - delete entry; + boost_pool_.free(entry); } } it->Next(); @@ -120,13 +123,13 @@ uint64_t Segment::Release() { KeyEntry** entry_arr = (KeyEntry**)node->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { entry_arr[i]->Release(pool_); - delete entry_arr[i]; + boost_pool_.free(entry_arr[i]); } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)node->GetValue(); // NOLINT entry->Release(pool_); - delete entry; + boost_pool_.free(entry); } delete node; f_it->Next(); @@ -180,7 +183,7 @@ void Segment::Put(const Slice& key, uint64_t time, DataBlock* row) { memcpy(pk, key.data(), key.size()); // need to delete memory when free node Slice skey(pk, key.size()); - entry = (void*)new KeyEntry(key_entry_max_height_); // NOLINT + entry = new (boost_pool_.malloc())KeyEntry(key_entry_max_height_); // Key entry do not use pool uint8_t height = entries_->Insert(skey, entry); byte_size += GetRecordPkIdxSize(height, key.size(), key_entry_max_height_); @@ -229,7 +232,7 @@ void Segment::Put(const Slice& key, const TSDimensions& ts_dimension, DataBlock* Slice skey(pk, key.size()); KeyEntry** entry_arr_tmp = new KeyEntry*[ts_cnt_]; for (uint32_t i = 0; i < ts_cnt_; i++) { - entry_arr_tmp[i] = new KeyEntry(key_entry_max_height_); + entry_arr_tmp[i] = new (boost_pool_.malloc())KeyEntry(key_entry_max_height_); } entry_arr = (void*)entry_arr_tmp; // NOLINT // key entry do not use pool @@ -338,7 +341,7 @@ void Segment::FreeEntry(::openmldb::base::Node* entry_node, uint64 FreeList(data_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); } delete it; - delete entry; + boost_pool_.free(entry); idx_cnt_vec_[i]->fetch_sub(gc_idx_cnt - old, std::memory_order_relaxed); } delete[] entry_arr; @@ -356,7 +359,7 @@ void Segment::FreeEntry(::openmldb::base::Node* entry_node, uint64 FreeList(data_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); } delete it; - delete entry; + boost_pool_.free(entry); uint64_t byte_size = GetRecordPkIdxSize(entry_node->Height(), entry_node->GetKey().size(), key_entry_max_height_); idx_byte_size_.fetch_sub(byte_size, std::memory_order_relaxed); diff --git a/src/storage/segment.h b/src/storage/segment.h index bd1223a702f..53a65c3ab43 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -23,6 +23,8 @@ #include // NOLINT #include +#include "boost/pool/pool.hpp" + #include "base/skiplist.h" #include "base/slice.h" #include "base/time_series_pool.h" @@ -260,6 +262,7 @@ class Segment { std::map ts_idx_map_; std::vector>> idx_cnt_vec_; ::openmldb::base::TimeSeriesPool pool_; + boost::pool<> boost_pool_; uint64_t ttl_offset_; }; From 3c66c5f5a22d2dbc9641212d6b9e4567ff2a3e33 Mon Sep 17 00:00:00 2001 From: cc004 <1176321897@qq.com> Date: Mon, 19 Jul 2021 10:59:54 +0800 Subject: [PATCH 12/12] fix lint --- src/base/time_series_pool.h | 6 +++--- src/storage/segment.cc | 4 ++-- src/storage/segment.h | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/base/time_series_pool.h b/src/base/time_series_pool.h index 89168090a4d..4c5e4330787 100644 --- a/src/base/time_series_pool.h +++ b/src/base/time_series_pool.h @@ -14,8 +14,8 @@ * limitations under the License. */ -#ifndef SRC_BASE_TIME_Series_POOL_H_ -#define SRC_BASE_TIME_Series_POOL_H_ +#ifndef SRC_BASE_TIME_SERIES_POOL_H_ +#define SRC_BASE_TIME_SERIES_POOL_H_ #include @@ -102,4 +102,4 @@ class TimeSeriesPool { } // namespace base } // namespace openmldb -#endif // SRC_BASE_TIME_Series_POOL_H_ +#endif // SRC_BASE_TIME_SERIES_POOL_H_ diff --git a/src/storage/segment.cc b/src/storage/segment.cc index 8a4ddc4064a..6fd7f879d41 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -183,7 +183,7 @@ void Segment::Put(const Slice& key, uint64_t time, DataBlock* row) { memcpy(pk, key.data(), key.size()); // need to delete memory when free node Slice skey(pk, key.size()); - entry = new (boost_pool_.malloc())KeyEntry(key_entry_max_height_); + entry = new (boost_pool_.malloc()) KeyEntry(key_entry_max_height_); // Key entry do not use pool uint8_t height = entries_->Insert(skey, entry); byte_size += GetRecordPkIdxSize(height, key.size(), key_entry_max_height_); @@ -232,7 +232,7 @@ void Segment::Put(const Slice& key, const TSDimensions& ts_dimension, DataBlock* Slice skey(pk, key.size()); KeyEntry** entry_arr_tmp = new KeyEntry*[ts_cnt_]; for (uint32_t i = 0; i < ts_cnt_; i++) { - entry_arr_tmp[i] = new (boost_pool_.malloc())KeyEntry(key_entry_max_height_); + entry_arr_tmp[i] = new (boost_pool_.malloc()) KeyEntry(key_entry_max_height_); } entry_arr = (void*)entry_arr_tmp; // NOLINT // key entry do not use pool diff --git a/src/storage/segment.h b/src/storage/segment.h index 53a65c3ab43..2cc4cd57cfc 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -23,11 +23,10 @@ #include // NOLINT #include -#include "boost/pool/pool.hpp" - #include "base/skiplist.h" #include "base/slice.h" #include "base/time_series_pool.h" +#include "boost/pool/pool.hpp" #include "proto/tablet.pb.h" #include "storage/iterator.h" #include "storage/schema.h"