Skip to content

Commit 74a92f5

Browse files
authored
disagg: Fix unexpected object storage usage caused by pre-lock residue (#10760)
close #10763 disagg: eliminate pre-lock key residue that lead to unexpected OSS usage Signed-off-by: JaySon-Huang <tshent@qq.com>
1 parent a886bd9 commit 74a92f5

13 files changed

+616
-56
lines changed

AGENTS.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ TiFlash follows a style based on Google, enforced by `clang-format` 17.0.0+.
8686
- **Smart Pointers:** Prefer `std::shared_ptr` and `std::unique_ptr`. Use `std::make_shared` and `std::make_unique`.
8787
- **Error Handling:**
8888
- Use `DB::Exception`.
89-
- Pattern: `throw Exception("Message", ErrorCodes::SOME_CODE);`
89+
- Prefer the fmt-style constructor with error code first: `throw Exception(ErrorCodes::SOME_CODE, "Message with {}", arg);`
90+
- For fixed strings without formatting, `throw Exception("Message", ErrorCodes::SOME_CODE);` is still acceptable.
9091
- Error codes are defined in `dbms/src/Common/ErrorCodes.cpp` and `errors.toml`.
92+
- In broad `catch (...)` paths, prefer `tryLogCurrentException(log, "context")` to avoid duplicated exception-formatting code.
9193
- **Logging:** Use macros like `LOG_INFO(log, "message {}", arg)`. `log` is usually a `DB::LoggerPtr`.
94+
- When only log level differs by runtime condition, prefer `LOG_IMPL(log, level, ...)` (with `Poco::Message::Priority`) instead of duplicated `if/else` log blocks.
9295

9396
### Modern C++ Practices
9497
- Prefer `auto` for complex iterators/templates.

dbms/src/Common/TiFlashMetrics.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,15 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
846846
F(type_to_finished, {"type", "to_finished"}), \
847847
F(type_to_error, {"type", "to_error"}), \
848848
F(type_to_cancelled, {"type", "to_cancelled"})) \
849+
M(tiflash_storage_s3_lock_mgr_status, "S3 Lock Manager", Gauge, F(type_prelock_keys, {{"type", "prelock_keys"}})) \
850+
M(tiflash_storage_s3_lock_mgr_counter, \
851+
"S3 Lock Manager Counter", \
852+
Counter, \
853+
F(type_create_lock_local, {{"type", "create_lock_local"}}), \
854+
F(type_create_lock_ingest, {{"type", "create_lock_ingest"}}), \
855+
F(type_clean_lock, {{"type", "clean_lock"}}), \
856+
F(type_clean_lock_erase_hit, {{"type", "clean_lock_erase_hit"}}), \
857+
F(type_clean_lock_erase_miss, {{"type", "clean_lock_erase_miss"}})) \
849858
M(tiflash_storage_s3_gc_status, \
850859
"S3 GC status", \
851860
Gauge, \

dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,9 +1283,9 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
12831283

12841284
if (!isSegmentValid(lock, segment))
12851285
{
1286-
LOG_DEBUG(
1286+
LOG_INFO(
12871287
log,
1288-
"MergeDelta - Give up segmentMergeDelta because segment not valid, segment={}",
1288+
"MergeDelta - Give up segmentMergeDelta because segment not valid, reason=concurrent_update segment={}",
12891289
segment->simpleInfo());
12901290
wbs.setRollback();
12911291
return {};

dbms/src/Storages/Page/V3/PageDirectory.cpp

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <shared_mutex>
3939
#include <type_traits>
4040
#include <utility>
41+
#include <vector>
4142

4243

4344
#ifdef FIU_ENABLE
@@ -1612,6 +1613,13 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
16121613
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
16131614
Writer w;
16141615
w.edit = &edit;
1616+
// Capture this writer's checkpoint data_file_ids before write-group merge.
1617+
// Followers' edit objects are cleared by the owner during merge.
1618+
for (const auto & r : edit.getRecords())
1619+
{
1620+
if (r.entry.checkpoint_info.has_value())
1621+
w.applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
1622+
}
16151623

16161624
Stopwatch watch;
16171625
std::unique_lock apply_lock(apply_mutex);
@@ -1639,9 +1647,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
16391647
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown exception");
16401648
}
16411649
}
1642-
// the `applied_data_files` will be returned by the write
1643-
// group owner, others just return an empty set.
1644-
return {};
1650+
// Return per-writer ids instead of merged-group ids, so upper-layer
1651+
// lock cleanup can always clean locks created by this writer.
1652+
return std::move(w.applied_data_files);
16451653
}
16461654

16471655
/// This thread now is the write group owner, build the group. It will merge the
@@ -1703,7 +1711,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
17031711
});
17041712

17051713
SYNC_FOR("before_PageDirectory::apply_to_memory");
1706-
std::unordered_set<String> applied_data_files;
17071714
{
17081715
std::unique_lock table_lock(table_rw_mutex);
17091716

@@ -1775,12 +1782,6 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
17751782
"should not handle edit with invalid type, type={}",
17761783
magic_enum::enum_name(r.type));
17771784
}
1778-
1779-
// collect the applied remote data_file_ids
1780-
if (r.entry.checkpoint_info.has_value())
1781-
{
1782-
applied_data_files.emplace(*r.entry.checkpoint_info.data_location.data_file_id);
1783-
}
17841785
}
17851786
catch (DB::Exception & e)
17861787
{
@@ -1800,7 +1801,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
18001801
}
18011802

18021803
success = true;
1803-
return applied_data_files;
1804+
// Even for write-group owner, return only this writer's pre-captured ids.
1805+
// Other writers return their own ids in the `w.done` branch above.
1806+
return std::move(w.applied_data_files);
18041807
}
18051808

18061809
template <typename Trait>

dbms/src/Storages/Page/V3/PageDirectory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,9 @@ class PageDirectory
628628
struct Writer
629629
{
630630
PageEntriesEdit * edit;
631+
// Keep per-writer checkpoint lock keys before write-group merge so
632+
// followers can still return their own applied ids for lock cleanup.
633+
std::unordered_set<String> applied_data_files;
631634
bool done = false; // The work has been performed by other thread
632635
bool success = false; // The work complete successfully
633636
std::unique_ptr<DB::Exception> exception;

dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.cpp

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
#include <Common/Exception.h>
1616
#include <Common/Stopwatch.h>
17+
#include <Common/TiFlashMetrics.h>
1718
#include <Flash/Disaggregated/S3LockClient.h>
19+
#include <Poco/Message.h>
1820
#include <Storages/Page/V3/CheckpointFile/CPManifestFileReader.h>
1921
#include <Storages/Page/V3/CheckpointFile/Proto/manifest_file.pb.h>
2022
#include <Storages/Page/V3/Universal/S3LockLocalManager.h>
@@ -23,6 +25,7 @@
2325
#include <Storages/S3/S3Common.h>
2426
#include <Storages/S3/S3Filename.h>
2527
#include <Storages/S3/S3RandomAccessFile.h>
28+
#include <common/logger_useful.h>
2629

2730
#include <magic_enum.hpp>
2831

@@ -40,7 +43,7 @@ S3LockLocalManager::S3LockLocalManager()
4043
{}
4144

4245
// `store_id` is inited later because they may not
43-
// accessable when S3LockLocalManager is created.
46+
// accessible when S3LockLocalManager is created.
4447
std::optional<CheckpointProto::ManifestFilePrefix> S3LockLocalManager::initStoreInfo(
4548
StoreID actual_store_id,
4649
DB::S3::S3LockClientPtr s3lock_client_,
@@ -137,7 +140,7 @@ S3LockLocalManager::ExtraLockInfo S3LockLocalManager::allocateNewUploadLocksInfo
137140
};
138141
}
139142

140-
void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_batch)
143+
std::unordered_set<String> S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_batch)
141144
{
142145
waitUntilInited();
143146

@@ -162,6 +165,10 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
162165
}
163166
}
164167

168+
// If there are multiple data files need to create locks but only partially created, the
169+
// created "locks" will be cleaned up by S3GCManager because `pre_lock_keys` does not contain
170+
// the keys that are only partially created.
171+
std::vector<String> lock_keys_to_append;
165172
for (auto & [input_key, lock_key] : s3_datafiles_to_lock)
166173
{
167174
auto view = S3::S3FilenameView::fromKey(input_key);
@@ -170,13 +177,35 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
170177
"invalid data_file_id, input_key={} type={}",
171178
input_key,
172179
magic_enum::enum_name(view.type));
180+
// Already a lock file, which means the data file has been locked. This can happen when
181+
// FAP apply a write batch with pages reference a file that is already uploaded. Just
182+
// reuse the existing lock file
173183
if (view.isLockFile())
174184
{
175185
lock_key = std::make_shared<String>(input_key);
176186
continue;
177187
}
188+
// Only a data file, we need to create a lock file for it.
178189
auto lock_result = createS3Lock(input_key, view, store_id);
179190
lock_key = std::make_shared<String>(lock_result);
191+
lock_keys_to_append.push_back(lock_result);
192+
}
193+
194+
{
195+
// The related S3 data files in write batch is not applied into PageDirectory,
196+
// but we need to ensure they exist in the next manifest file so that these
197+
// S3 data files will not be deleted by the S3GCManager.
198+
// Add the lock file key to `pre_locks_files` for manifest uploading.
199+
std::unique_lock wlatch_keys(mtx_lock_keys);
200+
for (const auto & lock_key : lock_keys_to_append)
201+
{
202+
const auto [_, inserted] = pre_lock_keys.emplace(lock_key);
203+
if (!inserted)
204+
{
205+
LOG_WARNING(log, "Duplicate pre-lock key detected, lockkey={} lock_store_id={}", lock_key, store_id);
206+
}
207+
}
208+
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(pre_lock_keys.size());
180209
}
181210

182211
for (auto & w : write_batch.getMutWrites())
@@ -200,8 +229,13 @@ void S3LockLocalManager::createS3LockForWriteBatch(UniversalWriteBatch & write_b
200229
break;
201230
}
202231
}
232+
233+
// Return only the lock keys newly appended into `pre_lock_keys`.
234+
// Existing lock-file inputs are intentionally excluded.
235+
return std::unordered_set<String>(lock_keys_to_append.begin(), lock_keys_to_append.end());
203236
}
204237

238+
// If any "lock" failed to be created, this function will throw exception.
205239
String S3LockLocalManager::createS3Lock(
206240
const String & datafile_key,
207241
const S3::S3FilenameView & s3_file,
@@ -224,7 +258,8 @@ String S3LockLocalManager::createS3Lock(
224258
// TODO: handle s3 network error and retry?
225259
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
226260
S3::uploadEmptyFile(*s3_client, lockkey);
227-
LOG_DEBUG(log, "S3 lock created for local datafile, lockkey={}", lockkey);
261+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_local).Increment();
262+
LOG_DEBUG(log, "S3 lock created for local datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
228263
}
229264
else
230265
{
@@ -237,29 +272,72 @@ String S3LockLocalManager::createS3Lock(
237272
{
238273
throw Exception(ErrorCodes::S3_LOCK_CONFLICT, err_msg);
239274
}
240-
LOG_DEBUG(log, "S3 lock created for ingest datafile, lockkey={}", lockkey);
275+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_create_lock_ingest).Increment();
276+
LOG_DEBUG(log, "S3 lock created for ingest datafile, datafile_key={} lockkey={}", datafile_key, lockkey);
241277
}
242278

243-
// The related S3 data files in write batch is not applied into PageDirectory,
244-
// but we need to ensure they exist in the next manifest file so that these
245-
// S3 data files will not be deleted by the S3GCManager.
246-
// Add the lock file key to `pre_locks_files` for manifest uploading.
279+
return lockkey;
280+
}
281+
282+
std::tuple<std::size_t, std::size_t, std::size_t> S3LockLocalManager::cleanPreLockKeysImpl(
283+
const std::unordered_set<String> & lock_keys_to_clean)
284+
{
285+
size_t erase_hit = 0;
286+
size_t erase_miss = 0;
287+
size_t remaining_pre_lock_keys = 0;
247288
{
289+
// After the entries applied into PageDirectory, manifest can get the S3 lock key
290+
// from `VersionedPageEntries`, cleanup the pre lock files.
248291
std::unique_lock wlatch_keys(mtx_lock_keys);
249-
pre_lock_keys.emplace(lockkey);
292+
for (const auto & file : lock_keys_to_clean)
293+
{
294+
if (pre_lock_keys.erase(file) > 0)
295+
{
296+
++erase_hit;
297+
}
298+
else
299+
{
300+
++erase_miss;
301+
}
302+
}
303+
remaining_pre_lock_keys = pre_lock_keys.size();
304+
GET_METRIC(tiflash_storage_s3_lock_mgr_status, type_prelock_keys).Set(remaining_pre_lock_keys);
250305
}
251-
return lockkey;
306+
return {erase_hit, erase_miss, remaining_pre_lock_keys};
252307
}
253308

254309
void S3LockLocalManager::cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files)
255310
{
256-
// After the entries applied into PageDirectory, manifest can get the S3 lock key
257-
// from `VersionedPageEntries`, cleanup the pre lock files.
258-
std::unique_lock wlatch_keys(mtx_lock_keys);
259-
for (const auto & file : applied_s3files)
260-
{
261-
pre_lock_keys.erase(file);
262-
}
311+
auto [erase_hit, erase_miss, remaining_pre_lock_keys] = cleanPreLockKeysImpl(applied_s3files);
312+
const auto log_lvl = erase_miss > 0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
313+
LOG_IMPL(
314+
log,
315+
log_lvl,
316+
"Clean applied S3 external files, applied_count={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
317+
applied_s3files.size(),
318+
erase_hit,
319+
erase_miss,
320+
remaining_pre_lock_keys);
321+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
322+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
323+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
324+
}
325+
326+
void S3LockLocalManager::cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure)
327+
{
328+
auto [erase_hit, erase_miss, remaining_pre_lock_keys] = cleanPreLockKeysImpl(pre_lock_keys_on_failure);
329+
const auto log_lvl = erase_miss > 0 ? Poco::Message::PRIO_WARNING : Poco::Message::PRIO_DEBUG;
330+
LOG_IMPL(
331+
log,
332+
log_lvl,
333+
"Clean pre-lock keys on write failure, requested={} erase_hit={} erase_miss={} remaining_pre_lock_keys={}",
334+
pre_lock_keys_on_failure.size(),
335+
erase_hit,
336+
erase_miss,
337+
remaining_pre_lock_keys);
338+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock).Increment();
339+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_hit).Increment(erase_hit);
340+
GET_METRIC(tiflash_storage_s3_lock_mgr_counter, type_clean_lock_erase_miss).Increment(erase_miss);
263341
}
264342

265343
} // namespace DB::PS::V3

dbms/src/Storages/Page/V3/Universal/S3LockLocalManager.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <Storages/S3/S3Filename.h>
2424
#include <aws/s3/S3Client.h>
2525

26+
#include <tuple>
27+
2628

2729
namespace DB::S3
2830
{
@@ -59,15 +61,21 @@ class S3LockLocalManager
5961
};
6062
ExtraLockInfo allocateNewUploadLocksInfo();
6163

62-
void createS3LockForWriteBatch(UniversalWriteBatch & write_batch);
64+
std::unordered_set<String> createS3LockForWriteBatch(UniversalWriteBatch & write_batch);
6365

6466
// after write batch applied, we can clean the applied locks from `pre_locks_files`
6567
void cleanAppliedS3ExternalFiles(std::unordered_set<String> && applied_s3files);
6668

69+
// If write fails after creating pre-locks, clean these pre-lock keys to avoid residual entries.
70+
void cleanPreLockKeysOnWriteFailure(std::unordered_set<String> && pre_lock_keys_on_failure);
71+
6772
DISALLOW_COPY_AND_MOVE(S3LockLocalManager);
6873

6974

7075
private:
76+
std::tuple<std::size_t, std::size_t, std::size_t> //
77+
cleanPreLockKeysImpl(const std::unordered_set<String> & lock_keys_to_clean);
78+
7179
// return the s3 lock_key
7280
String createS3Lock(const String & datafile_key, const S3::S3FilenameView & s3_file, UInt64 lock_store_id);
7381

0 commit comments

Comments
 (0)