diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 000000000000..8bafe46addf2 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "third-party/PhotonLibOS"] + path = third-party/PhotonLibOS + url = https://github.com/alibaba/PhotonLibOS.git + branch = release/0.8 diff --git a/CMakeLists.txt b/CMakeLists.txt index 132d3b04e965..869c7614852f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -44,6 +44,17 @@ endif() list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake/modules/") +# Photon +set(PHOTON_ENABLE_URING OFF CACHE INTERNAL "Enable iouring") +add_subdirectory(third-party/PhotonLibOS) +if (PHOTON_ENABLE_URING) + add_compile_definitions("PHOTON_ENABLE_URING") +endif () +option(INIT_PHOTON_IN_ENV "INIT PHOTON IN ROCKSDB" OFF) +if(INIT_PHOTON_IN_ENV) + add_compile_definitions("INIT_PHOTON_IN_ENV") +endif() + option(WITH_JEMALLOC "build with JeMalloc" OFF) option(WITH_SNAPPY "build with SNAPPY" OFF) option(WITH_LZ4 "build with lz4" OFF) @@ -174,11 +185,11 @@ if(MSVC) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /FC /d2Zi+ /W4 /wd4127 /wd4800 /wd4996 /wd4351 /wd4100 /wd4204 /wd4324") else() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -W -Wextra -Wall") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wsign-compare -Wshadow -Wno-unused-parameter -Wno-unused-variable -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers -Wno-strict-aliasing") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wsign-compare -Wno-unused-parameter -Wno-unused-variable -Woverloaded-virtual -Wnon-virtual-dtor -Wno-missing-field-initializers -Wno-strict-aliasing") if(MINGW) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-format") endif() - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14") if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-omit-frame-pointer") include(CheckCXXCompilerFlag) @@ -458,7 +469,7 @@ endif() include_directories(${PROJECT_SOURCE_DIR}) include_directories(${PROJECT_SOURCE_DIR}/include) -include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src) +include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src ${PROJECT_SOURCE_DIR}/third-party/PhotonLibOS/include) find_package(Threads REQUIRED) add_subdirectory(third-party/gtest-1.7.0/fused-src/gtest) @@ -742,18 +753,18 @@ else() add_library(${ROCKSDB_SHARED_LIB} SHARED ${SOURCES}) target_link_libraries(${ROCKSDB_SHARED_LIB} - ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) + ${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $) set_target_properties(${ROCKSDB_SHARED_LIB} PROPERTIES LINKER_LANGUAGE CXX VERSION ${ROCKSDB_VERSION} SOVERSION ${ROCKSDB_VERSION_MAJOR} - CXX_STANDARD 11 + CXX_STANDARD 14 OUTPUT_NAME "rocksdb") endif() add_library(${ROCKSDB_STATIC_LIB} STATIC ${SOURCES}) target_link_libraries(${ROCKSDB_STATIC_LIB} - ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) + ${THIRDPARTY_LIBS} ${SYSTEM_LIBS} $) if(WIN32) add_library(${ROCKSDB_IMPORT_LIB} SHARED ${SOURCES}) @@ -1067,3 +1078,5 @@ option(WITH_TOOLS "build with tools" ON) if(WITH_TOOLS) add_subdirectory(tools) endif() + +add_subdirectory(examples) diff --git a/db/db_write_test.cc b/db/db_write_test.cc index e6bab8751148..110bd794212a 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -61,6 +61,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { leader_count++; while (ready_count < kNumThreads) { // busy waiting + std::this_thread::yield(); } } }); diff --git a/db/dbformat_test.cc b/db/dbformat_test.cc index 0b16c13f5737..a0471ba7e809 100644 --- a/db/dbformat_test.cc +++ b/db/dbformat_test.cc @@ -202,6 +202,7 @@ TEST_F(FormatTest, RangeTombstoneSerializeEndKey) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/db/prefix_test.cc b/db/prefix_test.cc index ac854cb3dbd4..affb2ffe4bd7 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -43,13 +43,17 @@ DEFINE_uint64(num_locks, 10001, "number of locks"); DEFINE_bool(random_prefix, false, "randomize prefix"); DEFINE_uint64(total_prefixes, 100000, "total number of prefixes"); DEFINE_uint64(items_per_prefix, 1, "total number of values per prefix"); -DEFINE_int64(write_buffer_size, 33554432, ""); -DEFINE_int32(max_write_buffer_number, 2, ""); -DEFINE_int32(min_write_buffer_number_to_merge, 1, ""); +// DEFINE_int64(write_buffer_size, 33554432, ""); +DECLARE_int64(write_buffer_size); +// DEFINE_int32(max_write_buffer_number, 2, ""); +DECLARE_int32(max_write_buffer_number); +// DEFINE_int32(min_write_buffer_number_to_merge, 1, ""); +DECLARE_int32(min_write_buffer_number_to_merge); DEFINE_int32(skiplist_height, 4, ""); DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, ""); DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, ""); -DEFINE_int32(value_size, 40, ""); +// DEFINE_int32(value_size, 40, ""); +DECLARE_int32(value_size); DEFINE_bool(enable_print, false, "Print options generated to console."); // Path to the database on file system @@ -876,6 +880,10 @@ TEST_F(PrefixTest, PrefixSeekModePrev3) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ParseCommandLineFlags(&argc, &argv, true); + FLAGS_write_buffer_size = 33554432; + FLAGS_max_write_buffer_number = 2; + FLAGS_min_write_buffer_number_to_merge = 1; + FLAGS_value_size = 40; return RUN_ALL_TESTS(); } diff --git a/db/range_del_aggregator.h b/db/range_del_aggregator.h index 712ae458390a..469f58f41fa4 100644 --- a/db/range_del_aggregator.h +++ b/db/range_del_aggregator.h @@ -12,6 +12,7 @@ #include #include #include +#include #include "db/compaction_iteration_stats.h" #include "db/dbformat.h" diff --git a/db/range_del_aggregator_test.cc b/db/range_del_aggregator_test.cc index 28c8129ecb01..82a62dd8d840 100644 --- a/db/range_del_aggregator_test.cc +++ b/db/range_del_aggregator_test.cc @@ -704,6 +704,7 @@ TEST_F(RangeDelAggregatorTest, } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/db/range_tombstone_fragmenter_test.cc b/db/range_tombstone_fragmenter_test.cc index ddd3f774176a..2b76a9de9e99 100644 --- a/db/range_tombstone_fragmenter_test.cc +++ b/db/range_tombstone_fragmenter_test.cc @@ -547,6 +547,7 @@ TEST_F(RangeTombstoneFragmenterTest, SeekOutOfBounds) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/db/version_edit.h b/db/version_edit.h index ee6499cdc3b5..d707dd35e717 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -52,14 +52,14 @@ struct FileDescriptor { smallest_seqno(_smallest_seqno), largest_seqno(_largest_seqno) {} - FileDescriptor& operator=(const FileDescriptor& fd) { - table_reader = fd.table_reader; - packed_number_and_path_id = fd.packed_number_and_path_id; - file_size = fd.file_size; - smallest_seqno = fd.smallest_seqno; - largest_seqno = fd.largest_seqno; - return *this; - } + // FileDescriptor& operator=(const FileDescriptor& fd) { + // table_reader = fd.table_reader; + // packed_number_and_path_id = fd.packed_number_and_path_id; + // file_size = fd.file_size; + // smallest_seqno = fd.smallest_seqno; + // largest_seqno = fd.largest_seqno; + // return *this; + // } uint64_t GetNumber() const { return packed_number_and_path_id & kFileNumberMask; diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index cb880560efc9..8860938525e2 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -185,6 +185,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { // This allows us to confidently detect the first writer // who increases threads_linked as the leader. while (threads_linked.load() < cur_threads_joining) { + std::this_thread::yield(); } }); @@ -258,11 +259,13 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { // leaders gotta lead while (i > 0 && threads_verified.load() < 1) { + std::this_thread::yield(); } // loser has to lose while (i == write_group.size() - 1 && threads_verified.load() < write_group.size() - 1) { + std::this_thread::yield(); } auto& write_op = write_group.at(i); diff --git a/env/env_posix.cc b/env/env_posix.cc index 387c0279397c..088129eae8ea 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -12,7 +12,6 @@ #if defined(OS_LINUX) #include #endif -#include #include #include #include @@ -120,8 +119,9 @@ class PosixEnv : public Env { PosixEnv(); ~PosixEnv() override { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + LOG_INFO("global PosixEnv destruct: Join thread pools"); + for (auto& tid : threads_to_join_) { + tid.join(); } for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].JoinAllThreads(); @@ -255,7 +255,7 @@ class PosixEnv : public Env { result->reset(); Status s; int fd = -1; - int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC); + int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC | O_APPEND); // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) if (options.use_direct_writes && !options.use_mmap_writes) { // Note: we should avoid O_APPEND here due to ta the following bug: @@ -760,18 +760,11 @@ class PosixEnv : public Env { return thread_status_updater_->GetThreadList(thread_list); } - static uint64_t gettid(pthread_t tid) { - uint64_t thread_id = 0; - memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); - return thread_id; - } - static uint64_t gettid() { - pthread_t tid = pthread_self(); - return gettid(tid); + return (uint64_t) photon::CURRENT; } - uint64_t GetThreadID() const override { return gettid(pthread_self()); } + uint64_t GetThreadID() const override { return gettid(); } Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override { struct statvfs sbuf; @@ -847,7 +840,7 @@ class PosixEnv : public Env { return 0; } - void SleepForMicroseconds(int micros) override { usleep(micros); } + void SleepForMicroseconds(int micros) override { std::this_thread::sleep_for(std::chrono::microseconds(micros)); } Status GetHostName(char* name, uint64_t len) override { int ret = gethostname(name, static_cast(len)); @@ -1008,8 +1001,8 @@ class PosixEnv : public Env { size_t page_size_; std::vector thread_pools_; - pthread_mutex_t mu_; - std::vector threads_to_join_; + std::mutex mu_; + std::vector threads_to_join_; // If true, allow non owner read access for db files. Otherwise, non-owner // has no access to db files. bool allow_non_owner_access_; @@ -1021,7 +1014,7 @@ PosixEnv::PosixEnv() page_size_(getpagesize()), thread_pools_(Priority::TOTAL), allow_non_owner_access_(true) { - ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + LOG_INFO("global PosixEnv construct: Create thread pools"); for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); @@ -1059,20 +1052,16 @@ static void* StartThreadWrapper(void* arg) { } void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { - pthread_t t; StartThreadState* state = new StartThreadState; state->user_function = function; state->arg = arg; - ThreadPoolImpl::PthreadCall( - "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); - ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); - threads_to_join_.push_back(t); - ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + std::lock_guard lock(mu_); + threads_to_join_.emplace_back(std::thread(&StartThreadWrapper, state)); } void PosixEnv::WaitForJoin() { - for (const auto tid : threads_to_join_) { - pthread_join(tid, nullptr); + for (auto& tid : threads_to_join_) { + tid.join(); } threads_to_join_.clear(); } @@ -1104,6 +1093,29 @@ std::string Env::GenerateUniqueId() { return uuid2; } +PhotonEnv::PhotonEnv(int vcpu_num, int ev_engine) { + LOG_INFO("Begin init Photon Env"); + set_log_output_level(ALOG_INFO); + int ret = photon::init(ev_engine, photon::INIT_IO_NONE); + if (ret != 0) { + LOG_FATAL("Photon init failed"); + abort(); + } + ret = photon_std::work_pool_init(vcpu_num, ev_engine, photon::INIT_IO_NONE); + if (ret != 0) { + LOG_FATAL("Work-pool init failed"); + abort(); + } + LOG_INFO("End init Photon Env"); +} + +PhotonEnv::~PhotonEnv() { + LOG_INFO("Begin destruct Photon Env"); + photon_std::work_pool_fini(); + photon::fini(); + LOG_INFO("End destruct Photon Env"); +} + // // Default Posix Env // @@ -1118,6 +1130,9 @@ Env* Env::Default() { // of their construction, having this call here guarantees that // the destructor of static PosixEnv will go first, then the // the singletons of ThreadLocalPtr. +#ifdef INIT_PHOTON_IN_ENV + PhotonEnv::Singleton(); +#endif ThreadLocalPtr::InitSingletons(); CompressionContextCache::InitSingleton(); INIT_SYNC_POINT_SINGLETONS(); diff --git a/env/io_posix.cc b/env/io_posix.cc index 628ed8413002..123980d3740f 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -40,6 +40,46 @@ #define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12) #endif +#ifdef PHOTON_ENABLE_URING +ssize_t photon_read(int fd, void* buf, size_t count) { + return photon::iouring_pread(fd, buf, count, -1); +} +ssize_t photon_write(int fd, const void* buf, size_t count) { + return photon::iouring_pwrite(fd, buf, count, -1); +} +ssize_t photon_pread(int fd, void* buf, size_t count, off_t offset) { + return photon::iouring_pread(fd, buf, count, offset); +} +ssize_t photon_pwrite(int fd, const void* buf, size_t count, off_t offset) { + return photon::iouring_pwrite(fd, buf, count, offset); +} +int photon_fsync(int fd) { + return photon::iouring_fsync(fd); +} +int photon_fdatasync(int fd) { + return photon::iouring_fdatasync(fd); +} +#else +ssize_t photon_read(int fd, void* buf, size_t count) { + return read(fd, buf, count); +} +ssize_t photon_write(int fd, const void* buf, size_t count) { + return write(fd, buf, count); +} +ssize_t photon_pread(int fd, void* buf, size_t count, off_t offset) { + return pread(fd, buf, count, offset); +} +ssize_t photon_pwrite(int fd, const void* buf, size_t count, off_t offset) { + return pwrite(fd, buf, count, offset); +} +int photon_fsync(int fd) { + return fsync(fd); +} +int photon_fdatasync(int fd) { + return fdatasync(fd); +} +#endif + namespace rocksdb { // A wrapper for fadvise, if the platform doesn't support fadvise, @@ -201,11 +241,8 @@ Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n, size_t left = n; char* ptr = scratch; while (left > 0) { - r = pread(fd_, ptr, left, static_cast(offset)); + r = photon_pread(fd_, ptr, left, (off_t)offset); if (r <= 0) { - if (r == -1 && errno == EINTR) { - continue; - } break; } ptr += r; @@ -335,11 +372,8 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, size_t left = n; char* ptr = scratch; while (left > 0) { - r = pread(fd_, ptr, left, static_cast(offset)); + r = photon_pread(fd_, ptr, left, offset); if (r <= 0) { - if (r == -1 && errno == EINTR) { - continue; - } break; } ptr += r; @@ -760,11 +794,8 @@ Status PosixWritableFile::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); while (left != 0) { - ssize_t done = write(fd_, src, left); + ssize_t done = photon_write(fd_, src, left); if (done < 0) { - if (errno == EINTR) { - continue; - } return IOError("While appending to file", filename_, errno); } left -= done; @@ -784,11 +815,8 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { const char* src = data.data(); size_t left = data.size(); while (left != 0) { - ssize_t done = pwrite(fd_, src, left, static_cast(offset)); + ssize_t done = photon_pwrite(fd_, src, left, (off_t)offset);; if (done < 0) { - if (errno == EINTR) { - continue; - } return IOError("While pwrite to file at offset " + ToString(offset), filename_, errno); } @@ -870,14 +898,14 @@ Status PosixWritableFile::Close() { Status PosixWritableFile::Flush() { return Status::OK(); } Status PosixWritableFile::Sync() { - if (fdatasync(fd_) < 0) { + if (photon_fdatasync(fd_) < 0) { return IOError("While fdatasync", filename_, errno); } return Status::OK(); } Status PosixWritableFile::Fsync() { - if (fsync(fd_) < 0) { + if (photon_fsync(fd_) < 0) { return IOError("While fsync", filename_, errno); } return Status::OK(); @@ -984,13 +1012,9 @@ Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) { const char* src = data.data(); size_t left = data.size(); while (left != 0) { - ssize_t done = pwrite(fd_, src, left, offset); + ssize_t done = photon_pwrite(fd_, src, left, (off_t)offset); if (done < 0) { // error while writing to file - if (errno == EINTR) { - // write was interrupted, try again. - continue; - } return IOError( "While write random read/write file at offset " + ToString(offset), filename_, errno); @@ -1010,13 +1034,9 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, size_t left = n; char* ptr = scratch; while (left > 0) { - ssize_t done = pread(fd_, ptr, left, offset); + ssize_t done = photon_pread(fd_, ptr, left, (off_t)offset); if (done < 0) { // error while reading from file - if (errno == EINTR) { - // read was interrupted, try again. - continue; - } return IOError("While reading random read/write file offset " + ToString(offset) + " len " + ToString(n), filename_, errno); @@ -1038,14 +1058,14 @@ Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, Status PosixRandomRWFile::Flush() { return Status::OK(); } Status PosixRandomRWFile::Sync() { - if (fdatasync(fd_) < 0) { + if (photon_fdatasync(fd_) < 0) { return IOError("While fdatasync random read/write file", filename_, errno); } return Status::OK(); } Status PosixRandomRWFile::Fsync() { - if (fsync(fd_) < 0) { + if (photon_fsync(fd_) < 0) { return IOError("While fsync random read/write file", filename_, errno); } return Status::OK(); diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 000000000000..6383fd9c964c --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(perf-client perf/perf-client.cpp) +target_include_directories(perf-client PRIVATE ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(perf-client ${ROCKSDB_STATIC_LIB}) + +add_executable(perf-server perf/perf-server.cpp) +target_include_directories(perf-server PRIVATE ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(perf-server ${ROCKSDB_STATIC_LIB}) \ No newline at end of file diff --git a/examples/perf/perf-client.cpp b/examples/perf/perf-client.cpp new file mode 100644 index 000000000000..fe0d1ecc9833 --- /dev/null +++ b/examples/perf/perf-client.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "protocol.h" + +DEFINE_int32(port, 9527, "server port"); +DEFINE_string(host, "127.0.0.1", "server ip"); +DEFINE_string(type, "fill", "fill/get/put"); +DEFINE_int32(concurrency, 32, "concurrency"); +DEFINE_int32(key_num, 100'000, "key num"); +DEFINE_int32(value_size, 256 * 1024, "value size"); + +static std::string random_value(size_t size) { + static std::random_device rd; + static thread_local std::mt19937_64 gen(rd()); + static const char alphabet[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + std::string s; + s.resize(size); + for (size_t i = 0; i < size; ++i) { + s[i] = alphabet[gen() % (sizeof(alphabet) - 1)]; + } + return s; +} + +static std::string random_key() { + static std::random_device rd; + static thread_local std::mt19937_64 gen(rd()); + return std::to_string(gen() % FLAGS_key_num); +} + +void run_put(photon::net::EndPoint ep, photon::rpc::StubPool* pool) { + int ret; + auto stub = pool->get_stub(ep, false); + DEFER(pool->put_stub(ep, ret < 0)); + + while (true) { + KvPut::Request req; + auto key = random_key(); + req.key.assign(key); + auto val = random_value(FLAGS_value_size); + req.value.assign(val); + + KvPut::Response resp; + ret = stub->call(req, resp); + if (ret < 0 || resp.ret != 0) abort(); + } +} + +void run_get(photon::net::EndPoint ep, photon::rpc::StubPool* pool) { + int ret; + auto stub = pool->get_stub(ep, false); + DEFER(pool->put_stub(ep, ret < 0)); + + while (true) { + KvGet::Request req; + std::string key = random_key(); + req.key.assign(key); + + KvGet::Response resp; + ret = stub->call(req, resp); + if (ret < 0 || resp.ret != 0 || resp.value.size() != (uint64_t) FLAGS_value_size) { + abort(); + } + } +} + +void run_fill(photon::net::EndPoint ep, photon::rpc::StubPool* pool) { + int ret; + auto stub = pool->get_stub(ep, false); + DEFER(pool->put_stub(ep, ret < 0)); + + for (int i = 0; i < FLAGS_key_num; ++i) { + KvPut::Request req; + auto key = random_key(); + req.key.assign(key); + auto val = random_value(FLAGS_value_size); + req.value.assign(val); + + KvPut::Response resp; + ret = stub->call(req, resp); + if (ret < 0 || resp.ret != 0) { + abort(); + } + } +} + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + set_log_output_level(ALOG_INFO); + if (photon::init(photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE)) { + LOG_ERROR_RETURN(0, -1, "fail to init photon"); + } + DEFER(photon::fini()); + + auto ep = photon::net::EndPoint(photon::net::IPAddr(FLAGS_host.c_str()), + FLAGS_port); + + auto pool = photon::rpc::new_stub_pool(-1, -1); + DEFER(delete pool); + + if (FLAGS_type == "fill") { + run_fill(ep, pool); + } else if (FLAGS_type == "put") { + for (int i = 0; i < FLAGS_concurrency; ++i) { + photon::thread_create11(run_put, ep, pool); + } + photon::thread_sleep(-1); + } else { + for (int i = 0; i < FLAGS_concurrency; ++i) { + photon::thread_create11(run_get, ep, pool); + } + photon::thread_sleep(-1); + } +} diff --git a/examples/perf/perf-server.cpp b/examples/perf/perf-server.cpp new file mode 100644 index 000000000000..8af035c49246 --- /dev/null +++ b/examples/perf/perf-server.cpp @@ -0,0 +1,214 @@ +// Photon版本RocksDB server,在接收RPC的vCPU直接查询DB +// 原生多线程版本RocksDB server,使用WorkPool派发任务到多线程 + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "protocol.h" + +DEFINE_int32(port, 9527, "Server listen port"); +DEFINE_int32(show_qps_interval, 1, "Interval seconds to show qps"); +DEFINE_int32(vcpu_num, 8, "vCPU number"); +DEFINE_bool(use_photon, false, "Use photon rocksdb instead of the native"); +DEFINE_string(db_dir, "perf-db", "DB dir"); +DEFINE_bool(clean_db, false, "Clean db before tests"); + +static std::atomic qps{0}; + +static void show_qps_loop() { + while (true) { + photon::thread_sleep(FLAGS_show_qps_interval); + LOG_INFO("QPS: `", qps.load() / FLAGS_show_qps_interval); + qps = 0; + } +} + +class IOHandler { +public: + IOHandler(rocksdb::DB* db, rocksdb::WriteOptions* writeOptions, + rocksdb::ReadOptions* readOptions, photon::WorkPool* work_pool) : + skeleton_(photon::rpc::new_skeleton(65536U)), + socket_server_(photon::net::new_tcp_socket_server()), + db_(db), + writeOptions_(writeOptions), + readOptions_(readOptions), + work_pool_(work_pool) { + skeleton_->register_service(this); + } + + int serve(photon::net::ISocketStream* stream) { + return skeleton_->serve(stream); + } + + int run() { + socket_server_->set_handler({this, &IOHandler::serve}); + socket_server_->setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); + if (socket_server_->bind(FLAGS_port) < 0) { + LOG_ERRNO_RETURN(0, -1, "Failed to bind port `", FLAGS_port) + } + if (socket_server_->listen() < 0) { + LOG_ERRNO_RETURN(0, -1, "Failed to listen"); + } + LOG_INFO("Started rpc server at `", socket_server_->getsockname()); + return socket_server_->start_loop(true); + } + + int do_rpc_service(KvPut::Request* req, KvPut::Response* resp, IOVector*, IStream*) { + if (FLAGS_use_photon) { + do_put(req); + } else { + photon::semaphore sem; + auto func = new auto([&]() { + do_put(req); + sem.signal(1); + }); + work_pool_->async_call(func); + sem.wait(1); + } + resp->ret = 0; + qps++; + return 0; + } + + int do_rpc_service(KvGet::Request* req, KvGet::Response* resp, IOVector*, IStream*) { + std::string val; + if (FLAGS_use_photon) { + do_get(req, &val); + } else { + photon::semaphore sem; + auto func = new auto([&]() { + do_get(req, &val); + sem.signal(1); + }); + work_pool_->async_call(func); + sem.wait(1); + } + resp->ret = 0; + resp->value.assign(val); + qps++; + return 0; + } + +private: + std::unique_ptr skeleton_; + std::unique_ptr socket_server_; + rocksdb::DB* db_; // Owned by others + rocksdb::WriteOptions* writeOptions_; // Owned by others + rocksdb::ReadOptions* readOptions_; // Owned by others + photon::WorkPool* work_pool_; // Owned by others + + void do_put(KvPut::Request* req) { + rocksdb::Slice key(req->key.c_str(), req->key.size()); + rocksdb::Slice val(req->value.c_str(), req->value.size()); + rocksdb::Status s = db_->Put(*writeOptions_, key, val); + if (!s.ok()) { + LOG_ERROR("db write error"); + abort(); + } + } + + void do_get(KvGet::Request* req, std::string* val) { + rocksdb::Slice key(req->key.c_str(), req->key.size()); + rocksdb::Status s = db_->Get(*readOptions_, key, val); + if (!s.ok()) { + LOG_ERROR("db read error"); + abort(); + } + } +}; + +class ExampleServer { +public: + ExampleServer() { + writeOptions.sync = false; + pool = new photon::WorkPool(FLAGS_vcpu_num, photon::INIT_EVENT_IOURING, 0); + } + + int run() { + // Optimize RocksDB + if (FLAGS_use_photon) { + options.IncreaseParallelism(256); + options.allow_concurrent_memtable_write = false; + options.enable_pipelined_write = false; + } else { + options.IncreaseParallelism(8); + } + options.OptimizeLevelStyleCompaction(); + options.compression = rocksdb::CompressionType::kNoCompression; + // create the DB if it's not already present + options.create_if_missing = true; + + if (open_db()) { + return -1; + } + + for (int i = 0; i < FLAGS_vcpu_num; ++i) { + std::thread([&] { + int ret = photon::init(photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE); + if (ret) { + abort(); + } + DEFER(photon::fini()); + IOHandler handler(db, &writeOptions, &readOptions, pool); + handler.run(); + }).detach(); + } + return 0; + } + +private: + rocksdb::DB* db = nullptr; + rocksdb::Options options; + rocksdb::WriteOptions writeOptions; + rocksdb::ReadOptions readOptions; + photon::WorkPool* pool = nullptr; + + int open_db() { + auto path = std::string(get_current_dir_name()) + "/" + FLAGS_db_dir; + if (FLAGS_clean_db) { + system((std::string("rm -rf ") + path).c_str()); + LOG_INFO("Create new db at `", path.c_str()); + } else { + LOG_INFO("Open db at `", path.c_str()); + } + rocksdb::Status s = rocksdb::DB::Open(options, path, &db); + if (!s.ok()) { + LOG_ERROR_RETURN(0, -1, "open db failed:`", s.ToString()); + } + return 0; + } +}; + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + set_log_output_level(ALOG_INFO); + if (photon::init(photon::INIT_EVENT_IOURING, photon::INIT_IO_NONE)) { + LOG_ERROR_RETURN(0, -1, "fail to init photon"); + } + DEFER(photon::fini()); + + photon::thread_create11(show_qps_loop); + + auto server = new ExampleServer(); + if (server->run()) { + return -1; + } + photon::thread_sleep(-1UL); +} diff --git a/examples/perf/protocol.h b/examples/perf/protocol.h new file mode 100644 index 000000000000..b31c123295d9 --- /dev/null +++ b/examples/perf/protocol.h @@ -0,0 +1,41 @@ +#pragma once + +#include + +#include + +struct KvPut { + const static uint32_t IID = 1; + const static uint32_t FID = 1; + + struct Request : public photon::rpc::Message { + photon::rpc::string key; + photon::rpc::string value; + + PROCESS_FIELDS(key, value); + }; + + struct Response : public photon::rpc::Message { + int32_t ret; + + PROCESS_FIELDS(ret); + }; +}; + +struct KvGet { + const static uint32_t IID = 1; + const static uint32_t FID = 2; + + struct Request : public photon::rpc::Message { + photon::rpc::string key; + + PROCESS_FIELDS(key); + }; + + struct Response : public photon::rpc::Message { + int32_t ret; + photon::rpc::string value; + + PROCESS_FIELDS(ret, value); + }; +}; \ No newline at end of file diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 4d3a96fe288d..588bb8247571 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -25,6 +25,7 @@ #include #include "rocksdb/status.h" #include "rocksdb/thread_status.h" +#include "port/port.h" #ifdef _WIN32 // Windows API macro interference @@ -1454,4 +1455,26 @@ Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname); // This is a factory method for TimedEnv defined in utilities/env_timed.cc. Env* NewTimedEnv(Env* base_env); +class PhotonEnv { +public: + static PhotonEnv& Singleton() { + // 8 vCPU. Hardcoded for now. +#ifdef PHOTON_ENABLE_URING + static PhotonEnv instance(8, photon::INIT_EVENT_IOURING); +#else + static PhotonEnv instance(8, photon::INIT_EVENT_EPOLL); +#endif + return instance; + } + + PhotonEnv(PhotonEnv const&) = delete; + PhotonEnv(PhotonEnv&&) = delete; + PhotonEnv& operator=(PhotonEnv const&) = delete; + PhotonEnv& operator=(PhotonEnv&&) = delete; + +private: + PhotonEnv(int vcpu_num, int ev_engine); + ~PhotonEnv(); +}; + } // namespace rocksdb diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 2b01e6d9a66e..931f2731e160 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #ifdef __cpp_lib_string_view diff --git a/monitoring/iostats_context.cc b/monitoring/iostats_context.cc index 3d102f912034..5534853d64f4 100644 --- a/monitoring/iostats_context.cc +++ b/monitoring/iostats_context.cc @@ -10,12 +10,12 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread IOStatsContext iostats_context; +photon::thread_local_ptr iostats_context; #endif IOStatsContext* get_iostats_context() { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL - return &iostats_context; + return iostats_context.operator->(); #else return nullptr; #endif diff --git a/monitoring/iostats_context_imp.h b/monitoring/iostats_context_imp.h index 23c2088cab24..80a84d98502f 100644 --- a/monitoring/iostats_context_imp.h +++ b/monitoring/iostats_context_imp.h @@ -9,38 +9,38 @@ #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL namespace rocksdb { -extern __thread IOStatsContext iostats_context; +extern photon::thread_local_ptr iostats_context; } // namespace rocksdb // increment a specific counter by the specified value -#define IOSTATS_ADD(metric, value) (iostats_context.metric += value) +#define IOSTATS_ADD(metric, value) (iostats_context->metric += value) // Increase metric value only when it is positive #define IOSTATS_ADD_IF_POSITIVE(metric, value) \ if (value > 0) { IOSTATS_ADD(metric, value); } // reset a specific counter to zero -#define IOSTATS_RESET(metric) (iostats_context.metric = 0) +#define IOSTATS_RESET(metric) (iostats_context->metric = 0) // reset all counters to zero -#define IOSTATS_RESET_ALL() (iostats_context.Reset()) +#define IOSTATS_RESET_ALL() (iostats_context->Reset()) #define IOSTATS_SET_THREAD_POOL_ID(value) \ - (iostats_context.thread_pool_id = value) + (iostats_context->thread_pool_id = value) -#define IOSTATS_THREAD_POOL_ID() (iostats_context.thread_pool_id) +#define IOSTATS_THREAD_POOL_ID() (iostats_context->thread_pool_id) -#define IOSTATS(metric) (iostats_context.metric) +#define IOSTATS(metric) (iostats_context->metric) // Declare and set start time of the timer #define IOSTATS_TIMER_GUARD(metric) \ - PerfStepTimer iostats_step_timer_##metric(&(iostats_context.metric)); \ + PerfStepTimer iostats_step_timer_##metric(&(iostats_context->metric)); \ iostats_step_timer_##metric.Start(); // Declare and set start time of the timer #define IOSTATS_CPU_TIMER_GUARD(metric, env) \ PerfStepTimer iostats_step_timer_##metric( \ - &(iostats_context.metric), env, true, \ + &(iostats_context->metric), env, true, \ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ iostats_step_timer_##metric.Start(); diff --git a/monitoring/iostats_context_test.cc b/monitoring/iostats_context_test.cc index 74d3e43291dc..daf03ea083c6 100644 --- a/monitoring/iostats_context_test.cc +++ b/monitoring/iostats_context_test.cc @@ -24,6 +24,7 @@ TEST(IOStatsContextTest, ToString) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index 40b0b215c472..bdc1fb06c064 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -15,7 +15,7 @@ PerfContext perf_context; #if defined(OS_SOLARIS) __thread PerfContext perf_context_; #else -thread_local PerfContext perf_context; +photon::thread_local_ptr perf_context; #endif #endif @@ -26,7 +26,7 @@ PerfContext* get_perf_context() { #if defined(OS_SOLARIS) return &perf_context_; #else - return &perf_context; + return perf_context.operator->(); #endif #endif } diff --git a/monitoring/perf_context_imp.h b/monitoring/perf_context_imp.h index e0ff8afc58e1..a0d4b89c9481 100644 --- a/monitoring/perf_context_imp.h +++ b/monitoring/perf_context_imp.h @@ -16,7 +16,7 @@ extern PerfContext perf_context; extern __thread PerfContext perf_context_; #define perf_context (*get_perf_context()) #else -extern thread_local PerfContext perf_context; +extern photon::thread_local_ptr perf_context; #endif #endif @@ -38,24 +38,24 @@ extern thread_local PerfContext perf_context; // Declare and set start time of the timer #define PERF_TIMER_GUARD(metric) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric)); \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric)); \ perf_step_timer_##metric.Start(); // Declare and set start time of the timer #define PERF_TIMER_GUARD_WITH_ENV(metric, env) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), env); \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric), env); \ perf_step_timer_##metric.Start(); // Declare and set start time of the timer #define PERF_CPU_TIMER_GUARD(metric, env) \ PerfStepTimer perf_step_timer_##metric( \ - &(perf_context.metric), env, true, \ + &(perf_context->metric), env, true, \ PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \ perf_step_timer_##metric.Start(); #define PERF_CONDITIONAL_TIMER_FOR_MUTEX_GUARD(metric, condition, stats, \ ticker_type) \ - PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), nullptr, \ + PerfStepTimer perf_step_timer_##metric(&(perf_context->metric), nullptr, \ false, PerfLevel::kEnableTime, stats, \ ticker_type); \ if (condition) { \ @@ -68,23 +68,23 @@ extern thread_local PerfContext perf_context; // Increase metric value #define PERF_COUNTER_ADD(metric, value) \ - if (perf_level >= PerfLevel::kEnableCount) { \ - perf_context.metric += value; \ + if (*perf_level >= PerfLevel::kEnableCount) { \ + perf_context->metric += value; \ } // Increase metric value #define PERF_COUNTER_BY_LEVEL_ADD(metric, value, level) \ - if (perf_level >= PerfLevel::kEnableCount && \ - perf_context.per_level_perf_context_enabled && \ - perf_context.level_to_perf_context) { \ - if ((*(perf_context.level_to_perf_context)).find(level) != \ - (*(perf_context.level_to_perf_context)).end()) { \ - (*(perf_context.level_to_perf_context))[level].metric += value; \ + if (*perf_level >= PerfLevel::kEnableCount && \ + perf_context->per_level_perf_context_enabled && \ + perf_context->level_to_perf_context) { \ + if ((*(perf_context->level_to_perf_context)).find(level) != \ + (*(perf_context->level_to_perf_context)).end()) { \ + (*(perf_context->level_to_perf_context))[level].metric += value; \ } \ else { \ PerfContextByLevel empty_context; \ - (*(perf_context.level_to_perf_context))[level] = empty_context; \ - (*(perf_context.level_to_perf_context))[level].metric += value; \ + (*(perf_context->level_to_perf_context))[level] = empty_context; \ + (*(perf_context->level_to_perf_context))[level].metric += value; \ } \ } \ diff --git a/monitoring/perf_level.cc b/monitoring/perf_level.cc index 79c718cce764..527f6d434907 100644 --- a/monitoring/perf_level.cc +++ b/monitoring/perf_level.cc @@ -10,7 +10,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread PerfLevel perf_level = kEnableCount; +photon::thread_local_ptr perf_level(kEnableCount); #else PerfLevel perf_level = kEnableCount; #endif @@ -18,11 +18,11 @@ PerfLevel perf_level = kEnableCount; void SetPerfLevel(PerfLevel level) { assert(level > kUninitialized); assert(level < kOutOfBounds); - perf_level = level; + *perf_level = level; } PerfLevel GetPerfLevel() { - return perf_level; + return *perf_level; } } // namespace rocksdb diff --git a/monitoring/perf_level_imp.h b/monitoring/perf_level_imp.h index 2a3add19cee2..6e56c79e1cf2 100644 --- a/monitoring/perf_level_imp.h +++ b/monitoring/perf_level_imp.h @@ -10,7 +10,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -extern __thread PerfLevel perf_level; +extern photon::thread_local_ptr perf_level; #else extern PerfLevel perf_level; #endif diff --git a/monitoring/perf_step_timer.h b/monitoring/perf_step_timer.h index 6501bd54aba9..35a416e30ccc 100644 --- a/monitoring/perf_step_timer.h +++ b/monitoring/perf_step_timer.h @@ -16,7 +16,7 @@ class PerfStepTimer { uint64_t* metric, Env* env = nullptr, bool use_cpu_time = false, PerfLevel enable_level = PerfLevel::kEnableTimeExceptForMutex, Statistics* statistics = nullptr, uint32_t ticker_type = 0) - : perf_counter_enabled_(perf_level >= enable_level), + : perf_counter_enabled_((*perf_level) >= enable_level), use_cpu_time_(use_cpu_time), env_((perf_counter_enabled_ || statistics != nullptr) ? ((env != nullptr) ? env : Env::Default()) diff --git a/monitoring/thread_status_updater.cc b/monitoring/thread_status_updater.cc index cde44928b620..408d509a6d3a 100644 --- a/monitoring/thread_status_updater.cc +++ b/monitoring/thread_status_updater.cc @@ -13,7 +13,8 @@ namespace rocksdb { #ifdef ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; +photon::thread_local_ptr ThreadStatusUpdater::thread_status_data_ptr_(nullptr); +#define thread_status_data_ (*thread_status_data_ptr_) void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id) { diff --git a/monitoring/thread_status_updater.h b/monitoring/thread_status_updater.h index 6706d159dfbf..a9fec89521c9 100644 --- a/monitoring/thread_status_updater.h +++ b/monitoring/thread_status_updater.h @@ -196,7 +196,7 @@ class ThreadStatusUpdater { protected: #ifdef ROCKSDB_USING_THREAD_STATUS // The thread-local variable for storing thread status. - static __thread ThreadStatusData* thread_status_data_; + static photon::thread_local_ptr thread_status_data_ptr_; // Returns the pointer to the thread status data only when the // thread status data is non-null and has enable_tracking == true. @@ -205,7 +205,7 @@ class ThreadStatusUpdater { // Directly returns the pointer to thread_status_data_ without // checking whether enabling_tracking is true of not. ThreadStatusData* Get() { - return thread_status_data_; + return *thread_status_data_ptr_; } // The mutex that protects cf_info_map and db_key_map. diff --git a/monitoring/thread_status_util.cc b/monitoring/thread_status_util.cc index c2af0a574546..9782b15048a1 100644 --- a/monitoring/thread_status_util.cc +++ b/monitoring/thread_status_util.cc @@ -11,9 +11,10 @@ namespace rocksdb { #ifdef ROCKSDB_USING_THREAD_STATUS -__thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = - nullptr; -__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; +photon::thread_local_ptr ThreadStatusUtil::thread_updater_local_cache_ptr_(nullptr); +photon::thread_local_ptr ThreadStatusUtil::thread_updater_initialized_ptr_(false); +#define thread_updater_initialized_ (*thread_updater_initialized_ptr_) +#define thread_updater_local_cache_ (*thread_updater_local_cache_ptr_) void ThreadStatusUtil::RegisterThread(const Env* env, ThreadStatus::ThreadType thread_type) { diff --git a/monitoring/thread_status_util.h b/monitoring/thread_status_util.h index a403435c3d03..c455b0a1fb92 100644 --- a/monitoring/thread_status_util.h +++ b/monitoring/thread_status_util.h @@ -94,7 +94,7 @@ class ThreadStatusUtil { // When this variable is set to true, thread_updater_local_cache_ // will not be updated until this variable is again set to false // in UnregisterThread(). - static __thread bool thread_updater_initialized_; + static photon::thread_local_ptr thread_updater_initialized_ptr_; // The thread-local cached ThreadStatusUpdater that caches the // thread_status_updater_ of the first Env that uses any ThreadStatusUtil @@ -109,7 +109,8 @@ class ThreadStatusUtil { // When thread_updater_initialized_ is set to true, this variable // will not be updated until this thread_updater_initialized_ is // again set to false in UnregisterThread(). - static __thread ThreadStatusUpdater* thread_updater_local_cache_; + static photon::thread_local_ptr thread_updater_local_cache_ptr_; + #else static bool thread_updater_initialized_; static ThreadStatusUpdater* thread_updater_local_cache_; diff --git a/photon-auto-convert.sh b/photon-auto-convert.sh new file mode 100755 index 000000000000..08d4e5ac2c2a --- /dev/null +++ b/photon-auto-convert.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -e + +cc_files=$(find . -type f -name "*.cc" -not -path "./build/*" -not -path "./third-party/PhotonLibOS/*") +h_files=$(find . -type f -name "*.h" -not -path "./build/*" -not -path "./third-party/PhotonLibOS/*") +files="${cc_files} ${h_files}" + +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's|#include |#include "port/port.h"|g' $files +sed -i 's/std::mutex/photon_std::mutex/g' $files +sed -i 's/std::condition_variable/photon_std::condition_variable/g' $files +sed -i 's/std::lock_guard/photon_std::lock_guard/g' $files +sed -i 's/std::unique_lock/photon_std::unique_lock/g' $files +sed -i 's/std::thread/photon_std::thread/g' $files +sed -i 's/std::this_thread/photon_std::this_thread/g' $files diff --git a/photon-bench.md b/photon-bench.md new file mode 100644 index 000000000000..3cad580a4edd --- /dev/null +++ b/photon-bench.md @@ -0,0 +1,30 @@ +## Standalone db_bench tests + +```bash +cd build +cp ../tools/benchmark.sh . + +export DB_DIR=`pwd`/test-db +export WAL_DIR=$DB_DIR +export OUTPUT_DIR=$DB_DIR +export COMPRESSION_TYPE=none +export NUM_THREADS=64 # Concurrency +export KEY_SIZE=20 +export VALUE_SIZE=400 +export NUM_KEYS=100000000 # Require 40 GB disk space +export CACHE_SIZE=0 # Disable block cache. Need to remove --pin_l0_filter_and_index_blocks_in_cache=1 argument from benchmark.sh +export DURATION=60 # Only run 1 minutes + +# In env.h, Photon now has hardcoded to use 8 vCPUs. +# In order to make a fair comparison, you may edit benchmark.sh, and add `taskset -c 1-8` before the ./db_bench command. +# This would limit the CPU number for both thread and coroutine. +# But it's not necessary, because essentially these two concurrency models are quite different. + +# Clean page cache before every test +echo 3 > /proc/sys/vm/drop_caches + +./benchmark.sh bulkload # Generate data +./benchmark.sh readrandom # Read test +./benchmark.sh overwrite # Overwrite test (sync = 0) +./benchmark.sh updaterandom # Update test (read first, then write, sync = 1) +``` diff --git a/photon.md b/photon.md new file mode 100644 index 000000000000..71db5c4212bc --- /dev/null +++ b/photon.md @@ -0,0 +1,34 @@ +## Build + +```bash +# Install dependencies +dnf install gflags-devel snappy-devel zlib-devel bzip2-devel lz4-devel libzstd-devel +apt install libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev + +# Build performance test on RPC client/server (Photon RocksDB) +./photon-auto-convert.sh +cmake -B build -D INIT_PHOTON_IN_ENV=off -D WITH_LZ4=on -D WITH_SNAPPY=on -D CMAKE_BUILD_TYPE=Release +cmake --build build -t perf-client -t perf-server -j `nproc` + +# Build performance test on RPC client/server (Native RocksDB) +git checkout 6.1.2 +git checkout origin/photon-on-6.1.2 -- examples/ CMakeLists.txt +cmake -B build -D INIT_PHOTON_IN_ENV=off -D WITH_LZ4=on -D WITH_SNAPPY=on -D CMAKE_BUILD_TYPE=Release +cmake --build build -t perf-client -t perf-server -j `nproc` + +# Build db_bench +./photon-auto-convert.sh +cmake -B build -D INIT_PHOTON_IN_ENV=on -D WITH_LZ4=on -D WITH_SNAPPY=on -D CMAKE_BUILD_TYPE=Release +cmake --build build -t db_bench -j `nproc` + +# Build CI tests +./photon-auto-convert.sh +cmake -B build -D WITH_TESTS=on -D INIT_PHOTON_IN_ENV=on -D WITH_LZ4=on -D WITH_SNAPPY=on -D CMAKE_BUILD_TYPE=Debug +cmake --build build -j `nproc` +cd build && ctest . +``` + +```bash +# TODO +-D PHOTON_ENABLE_URING=on +``` \ No newline at end of file diff --git a/port/port_posix.cc b/port/port_posix.cc index 80081e480e0f..455d6eb1eb6f 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -42,38 +42,21 @@ extern const bool kDefaultToAdaptiveMutex = false; namespace port { -static int PthreadCall(const char* label, int result) { - if (result != 0 && result != ETIMEDOUT) { - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - abort(); - } - return result; -} +// static int PthreadCall(const char* label, int result) { +// if (result != 0 && result != ETIMEDOUT) { +// fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); +// abort(); +// } +// return result; +// } Mutex::Mutex(bool adaptive) { - (void) adaptive; -#ifdef ROCKSDB_PTHREAD_ADAPTIVE_MUTEX - if (!adaptive) { - PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); - } else { - pthread_mutexattr_t mutex_attr; - PthreadCall("init mutex attr", pthread_mutexattr_init(&mutex_attr)); - PthreadCall("set mutex attr", - pthread_mutexattr_settype(&mutex_attr, - PTHREAD_MUTEX_ADAPTIVE_NP)); - PthreadCall("init mutex", pthread_mutex_init(&mu_, &mutex_attr)); - PthreadCall("destroy mutex attr", - pthread_mutexattr_destroy(&mutex_attr)); - } -#else - PthreadCall("init mutex", pthread_mutex_init(&mu_, nullptr)); -#endif // ROCKSDB_PTHREAD_ADAPTIVE_MUTEX } -Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } +Mutex::~Mutex() { } void Mutex::Lock() { - PthreadCall("lock", pthread_mutex_lock(&mu_)); + mu_.lock(); #ifndef NDEBUG locked_ = true; #endif @@ -83,7 +66,7 @@ void Mutex::Unlock() { #ifndef NDEBUG locked_ = false; #endif - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + mu_.unlock(); } void Mutex::AssertHeld() { @@ -94,63 +77,57 @@ void Mutex::AssertHeld() { CondVar::CondVar(Mutex* mu) : mu_(mu) { - PthreadCall("init cv", pthread_cond_init(&cv_, nullptr)); } -CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } +CondVar::~CondVar() {} void CondVar::Wait() { #ifndef NDEBUG mu_->locked_ = false; #endif - PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); + cv_.wait(mu_->mu_); #ifndef NDEBUG mu_->locked_ = true; #endif } bool CondVar::TimedWait(uint64_t abs_time_us) { - struct timespec ts; - ts.tv_sec = static_cast(abs_time_us / 1000000); - ts.tv_nsec = static_cast((abs_time_us % 1000000) * 1000); - #ifndef NDEBUG mu_->locked_ = false; #endif - int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts); + auto abs_now_us = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count(); + uint64_t timeout = abs_time_us > uint64_t(abs_now_us) ? abs_time_us - abs_now_us : 0; + int ret = cv_.wait(mu_->mu_, timeout); #ifndef NDEBUG mu_->locked_ = true; #endif - if (err == ETIMEDOUT) { + if (ret != 0 || timeout == 0) { return true; } - if (err != 0) { - PthreadCall("timedwait", err); - } return false; } void CondVar::Signal() { - PthreadCall("signal", pthread_cond_signal(&cv_)); + cv_.notify_one(); } void CondVar::SignalAll() { - PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); + cv_.notify_all(); } RWMutex::RWMutex() { - PthreadCall("init mutex", pthread_rwlock_init(&mu_, nullptr)); } -RWMutex::~RWMutex() { PthreadCall("destroy mutex", pthread_rwlock_destroy(&mu_)); } +RWMutex::~RWMutex() { } -void RWMutex::ReadLock() { PthreadCall("read lock", pthread_rwlock_rdlock(&mu_)); } +void RWMutex::ReadLock() { mu_.lock(photon::RLOCK); } -void RWMutex::WriteLock() { PthreadCall("write lock", pthread_rwlock_wrlock(&mu_)); } +void RWMutex::WriteLock() { mu_.lock(photon::WLOCK); } -void RWMutex::ReadUnlock() { PthreadCall("read unlock", pthread_rwlock_unlock(&mu_)); } +void RWMutex::ReadUnlock() { mu_.unlock(); } -void RWMutex::WriteUnlock() { PthreadCall("write unlock", pthread_rwlock_unlock(&mu_)); } +void RWMutex::WriteUnlock() { mu_.unlock(); } int PhysicalCoreID() { #if defined(ROCKSDB_SCHED_GETCPU_PRESENT) && defined(__x86_64__) && \ @@ -175,10 +152,6 @@ int PhysicalCoreID() { #endif } -void InitOnce(OnceType* once, void (*initializer)()) { - PthreadCall("once", pthread_once(once, initializer)); -} - void Crash(const std::string& srcfile, int srcline) { fprintf(stdout, "Crashing at %s:%d\n", srcfile.c_str(), srcline); fflush(stdout); diff --git a/port/port_posix.h b/port/port_posix.h index 63d7239fe6dd..9bbf2053f191 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -11,7 +11,11 @@ #pragma once -#include +#include +#include +#include +#include +#include // size_t printf formatting named in the manner of C99 standard formatting // strings such as PRIu64 // in fact, we could use that one @@ -49,7 +53,6 @@ #else #include #endif -#include #include #include @@ -114,7 +117,7 @@ class Mutex { private: friend class CondVar; - pthread_mutex_t mu_; + photon::mutex mu_; #ifndef NDEBUG bool locked_; #endif @@ -136,7 +139,7 @@ class RWMutex { void AssertHeld() { } private: - pthread_rwlock_t mu_; // the underlying platform mutex + photon::rwlock mu_; // the underlying platform mutex // No copying allowed RWMutex(const RWMutex&); @@ -153,7 +156,7 @@ class CondVar { void Signal(); void SignalAll(); private: - pthread_cond_t cv_; + photon::condition_variable cv_; Mutex* mu_; }; @@ -173,10 +176,6 @@ static inline void AsmVolatilePause() { // Returns -1 if not available on this platform extern int PhysicalCoreID(); -typedef pthread_once_t OnceType; -#define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT -extern void InitOnce(OnceType* once, void (*initializer)()); - #ifndef CACHE_LINE_SIZE #if defined(__s390__) #define CACHE_LINE_SIZE 256U diff --git a/table/block_based_filter_block_test.cc b/table/block_based_filter_block_test.cc index 6b352b2f6b07..ae06d7e727af 100644 --- a/table/block_based_filter_block_test.cc +++ b/table/block_based_filter_block_test.cc @@ -243,6 +243,7 @@ TEST_F(BlockBasedFilterBlockTest, BlockBasedMultiChunk) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/table/full_filter_block_test.cc b/table/full_filter_block_test.cc index f01ae52bf7dd..e74b5931cd8b 100644 --- a/table/full_filter_block_test.cc +++ b/table/full_filter_block_test.cc @@ -217,6 +217,7 @@ TEST_F(FullFilterBlockTest, SingleChunk) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/table/merger_test.cc b/table/merger_test.cc index 1b04d065727c..da7b6ca3776b 100644 --- a/table/merger_test.cc +++ b/table/merger_test.cc @@ -175,6 +175,7 @@ TEST_F(MergerTest, SeekToLastTest) { } // namespace rocksdb int main(int argc, char** argv) { + rocksdb::PhotonEnv::Singleton(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/third-party/PhotonLibOS b/third-party/PhotonLibOS new file mode 160000 index 000000000000..01cde1ca6b86 --- /dev/null +++ b/third-party/PhotonLibOS @@ -0,0 +1 @@ +Subproject commit 01cde1ca6b860cc440b3f219681fb7d177a67fb4 diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 0cb4e0eb27ec..ddccadf2904f 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2947,7 +2947,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { } SetPerfLevel(static_cast (shared->perf_level)); - perf_context.EnablePerLevelPerfContext(); + perf_context->EnablePerLevelPerfContext(); thread->stats.Start(thread->tid); (arg->bm->*(arg->method))(thread); thread->stats.Stop(); diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc index cef77d7e75fd..3b53e9fc9e31 100644 --- a/util/concurrent_arena.cc +++ b/util/concurrent_arena.cc @@ -15,7 +15,7 @@ namespace rocksdb { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread size_t ConcurrentArena::tls_cpuid = 0; +photon::thread_local_ptr ConcurrentArena::tls_cpuid(0); #endif namespace { @@ -39,7 +39,7 @@ ConcurrentArena::Shard* ConcurrentArena::Repick() { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we // have repicked - tls_cpuid = shard_and_index.second | shards_.Size(); + *tls_cpuid = shard_and_index.second | shards_.Size(); #endif return shard_and_index.first; } diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h index a6191100fd08..6605f67f8395 100644 --- a/util/concurrent_arena.h +++ b/util/concurrent_arena.h @@ -95,7 +95,7 @@ class ConcurrentArena : public Allocator { }; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL - static __thread size_t tls_cpuid; + static photon::thread_local_ptr tls_cpuid; #else enum ZeroFirstEnum : size_t { tls_cpuid = 0 }; #endif @@ -135,7 +135,7 @@ class ConcurrentArena : public Allocator { // concurrency zero unless it might actually confer an advantage. std::unique_lock arena_lock(arena_mutex_, std::defer_lock); if (bytes > shard_block_size_ / 4 || force_arena || - ((cpu = tls_cpuid) == 0 && + ((cpu = *tls_cpuid) == 0 && !shards_.AccessAtCore(0)->allocated_and_unused_.load( std::memory_order_relaxed) && arena_lock.try_lock())) { diff --git a/util/mutexlock.h b/util/mutexlock.h index 640cef3daf7d..c4d4d6938328 100644 --- a/util/mutexlock.h +++ b/util/mutexlock.h @@ -97,9 +97,9 @@ class WriteLock { // SpinMutex has very low overhead for low-contention cases. Method names // are chosen so you can use std::unique_lock or std::lock_guard with it. // -class SpinMutex { +class SpinMutexObsolete { public: - SpinMutex() : locked_(false) {} + SpinMutexObsolete() : locked_(false) {} bool try_lock() { auto currently_locked = locked_.load(std::memory_order_relaxed); @@ -128,4 +128,6 @@ class SpinMutex { std::atomic locked_; }; +using SpinMutex = std::mutex; + } // namespace rocksdb diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h index 967cc49945e4..7bd66fb51d56 100644 --- a/util/repeatable_thread.h +++ b/util/repeatable_thread.h @@ -97,16 +97,6 @@ class RepeatableThread { } void thread() { -#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) -#if __GLIBC_PREREQ(2, 12) - // Set thread name. - auto thread_handle = thread_.native_handle(); - int ret __attribute__((__unused__)) = - pthread_setname_np(thread_handle, thread_name_.c_str()); - assert(ret == 0); -#endif -#endif - assert(delay_us_ > 0); if (!wait(initial_delay_us_)) { return; diff --git a/util/string_util.h b/util/string_util.h index 6e125ddfa8f0..36aad308567a 100644 --- a/util/string_util.h +++ b/util/string_util.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace rocksdb { diff --git a/util/thread_local.cc b/util/thread_local.cc index 7346eff11e8d..21325520bdd0 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -142,17 +142,17 @@ class ThreadLocalPtr::StaticMeta { port::Mutex mutex_; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL // Thread local storage - static __thread ThreadData* tls_; + static photon::thread_local_ptr tls_; #endif // Used to make thread exit trigger possible if !defined(OS_MACOSX). // Otherwise, used to retrieve thread data. - pthread_key_t pthread_key_; + photon::thread_key_t pthread_key_; }; #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL -__thread ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr; +photon::thread_local_ptr ThreadLocalPtr::StaticMeta::tls_(nullptr); #endif // Windows doesn't support a per-thread destructor with its @@ -285,7 +285,7 @@ void ThreadLocalPtr::StaticMeta::OnThreadExit(void* ptr) { // scope here in case this OnThreadExit is called after the main thread // dies. auto* inst = tls->inst; - pthread_setspecific(inst->pthread_key_, nullptr); + photon::thread_setspecific(inst->pthread_key_, nullptr); MutexLock l(inst->MemberMutex()); inst->RemoveThreadData(tls); @@ -309,35 +309,11 @@ ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0), head_(this), pthread_key_(0) { - if (pthread_key_create(&pthread_key_, &OnThreadExit) != 0) { + if (photon::thread_key_create(&pthread_key_, &OnThreadExit) != 0) { abort(); } - // OnThreadExit is not getting called on the main thread. - // Call through the static destructor mechanism to avoid memory leak. - // - // Caveats: ~A() will be invoked _after_ ~StaticMeta for the global - // singleton (destructors are invoked in reverse order of constructor - // _completion_); the latter must not mutate internal members. This - // cleanup mechanism inherently relies on use-after-release of the - // StaticMeta, and is brittle with respect to compiler-specific handling - // of memory backing destructed statically-scoped objects. Perhaps - // registering with atexit(3) would be more robust. - // -// This is not required on Windows. -#if !defined(OS_WIN) - static struct A { - ~A() { -#ifndef ROCKSDB_SUPPORT_THREAD_LOCAL - ThreadData* tls_ = - static_cast(pthread_getspecific(Instance()->pthread_key_)); -#endif - if (tls_) { - OnThreadExit(tls_); - } - } - } a; -#endif // !defined(OS_WIN) + // Photon's thread key has already supported destruction on main thread head_.next = &head_; head_.prev = &head_; @@ -373,27 +349,27 @@ ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() { static_cast(pthread_getspecific(Instance()->pthread_key_)); #endif - if (UNLIKELY(tls_ == nullptr)) { + if (UNLIKELY(*tls_ == nullptr)) { auto* inst = Instance(); - tls_ = new ThreadData(inst); + *tls_ = new ThreadData(inst); { // Register it in the global chain, needs to be done before thread exit // handler registration MutexLock l(Mutex()); - inst->AddThreadData(tls_); + inst->AddThreadData(*tls_); } // Even it is not OS_MACOSX, need to register value for pthread_key_ so that // its exit handler will be triggered. - if (pthread_setspecific(inst->pthread_key_, tls_) != 0) { + if (photon::thread_setspecific(inst->pthread_key_, *tls_) != 0) { { MutexLock l(Mutex()); - inst->RemoveThreadData(tls_); + inst->RemoveThreadData(*tls_); } - delete tls_; + delete *tls_; abort(); } } - return tls_; + return *tls_; } void* ThreadLocalPtr::StaticMeta::Get(uint32_t id) const { diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index acac0063bcdc..1d3d4740a0fd 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -231,31 +231,10 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { #ifdef OS_LINUX if (decrease_cpu_priority) { - setpriority( - PRIO_PROCESS, - // Current thread. - 0, - // Lowest priority possible. - 19); low_cpu_priority = true; } if (decrease_io_priority) { -#define IOPRIO_CLASS_SHIFT (13) -#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) - // Put schedule into IOPRIO_CLASS_IDLE class (lowest) - // These system calls only have an effect when used in conjunction - // with an I/O scheduler that supports I/O priorities. As at - // kernel 2.6.17 the only such scheduler is the Completely - // Fair Queuing (CFQ) I/O scheduler. - // To change scheduler: - // echo cfq > /sys/block//queue/schedule - // Tunables to consider: - // /sys/block//queue/slice_idle - // /sys/block//queue/slice_sync - syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS - 0, // current thread - IOPRIO_PRIO_VALUE(3, 0)); low_io_priority = true; } #else @@ -337,20 +316,6 @@ void ThreadPoolImpl::Impl::StartBGThreads() { port::Thread p_t(&BGThreadWrapper, new BGThreadMetadata(this, bgthreads_.size())); -// Set the thread name to aid debugging -#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) -#if __GLIBC_PREREQ(2, 12) - auto th_handle = p_t.native_handle(); - std::string thread_priority = Env::PriorityToString(GetThreadPriority()); - std::ostringstream thread_name_stream; - thread_name_stream << "rocksdb:"; - for (char c : thread_priority) { - thread_name_stream << static_cast(tolower(c)); - } - thread_name_stream << bgthreads_.size(); - pthread_setname_np(th_handle, thread_name_stream.str().c_str()); -#endif -#endif bgthreads_.push_back(std::move(p_t)); } } diff --git a/util/timer_queue_test.cc b/util/timer_queue_test.cc index 5f5f08f21bbe..a564daf793b1 100644 --- a/util/timer_queue_test.cc +++ b/util/timer_queue_test.cc @@ -25,6 +25,7 @@ // #include "util/timer_queue.h" +#include "rocksdb/env.h" #include namespace Timing { @@ -39,6 +40,7 @@ double now() { } // namespace Timing int main() { + rocksdb::PhotonEnv::Singleton(); TimerQueue q; double tnow = Timing::now(); diff --git a/utilities/persistent_cache/hash_table_test.cc b/utilities/persistent_cache/hash_table_test.cc index d6ff3e68e429..71f6e43e103f 100644 --- a/utilities/persistent_cache/hash_table_test.cc +++ b/utilities/persistent_cache/hash_table_test.cc @@ -156,5 +156,6 @@ TEST_F(EvictableHashTableTest, TestEvict) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); + rocksdb::PhotonEnv::Singleton(); return RUN_ALL_TESTS(); } diff --git a/utilities/transactions/transaction_db_mutex_impl.cc b/utilities/transactions/transaction_db_mutex_impl.cc index 244a950773cb..611c47e35be9 100644 --- a/utilities/transactions/transaction_db_mutex_impl.cc +++ b/utilities/transactions/transaction_db_mutex_impl.cc @@ -67,6 +67,7 @@ Status TransactionDBMutexImpl::Lock() { } Status TransactionDBMutexImpl::TryLockFor(int64_t timeout_time) { + // return mutex_.lock(timeout_time) == 0 ? Status::OK() : Status::TimedOut(Status::SubCode::kMutexTimeout); bool locked = true; if (timeout_time == 0) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index c0f5a10682ad..015185520a5d 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1374,11 +1374,13 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) { if (linked == 1) { // Wait until the others are linked too. while (linked < first_group_size) { + std::this_thread::yield(); } } else if (linked == 1 + first_group_size) { // Make the 2nd batch of the rest of writes plus any followup // commits from the first batch while (linked < txn_cnt + commit_writes) { + std::this_thread::yield(); } } // Then we will have one or more batches consisting of follow-up @@ -1411,13 +1413,16 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrentTest) { } // wait to be linked while (linked.load() <= bi) { + std::this_thread::yield(); } // after a queue of size first_group_size if (bi + 1 == first_group_size) { while (!batch_formed) { + std::this_thread::yield(); } // to make it more deterministic, wait until the commits are linked while (linked.load() <= bi + expected_commits) { + std::this_thread::yield(); } } }