Commit 57bc1c8

Add per-issuer lock and negative caching to prevent thundering herd
This PR addresses the thundering herd problem that occurs when multiple threads simultaneously try to validate tokens from the same issuer.

Per-issuer locking:
- Added a per-issuer mutex map with shared_ptr ownership
- Threads acquire a lock for an issuer before fetching keys from the web
- Other threads wait on the lock, then find the keys in the cache
- Lock ownership transfers through the async status for proper lifecycle
- Limited to 1000 cached mutexes to prevent resource exhaustion

Negative caching:
- On web fetch failure (e.g., 404), store empty keys in the cache
- Uses the same TTL as successful lookups (get_next_update_delta)
- Subsequent lookups hit the cache and fail fast without web requests
- Prevents repeated web requests for known-bad issuers

SQLite busy timeout:
- Added a 5-second busy timeout to handle concurrent DB access
- Applied to all database operations (init, read, write)

Stress tests:
- StressTestValidToken: 10 threads, 5 seconds, valid token
- StressTestInvalidIssuer: 10 threads, 5 seconds, 404 issuer
- ConcurrentNewIssuerLookup: verifies only ONE web fetch occurs

Verified behavior:
- Valid issuer: ONE key lookup for thousands of validations
- Invalid issuer: ONE web request (OIDC + OAuth fallback), then cached
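The per-issuer mutex map behind get_issuer_mutex() is part of this commit but is not reproduced in the excerpt below. A minimal sketch of what such a helper could look like, assuming a static map guarded by a global mutex and a simple clear-when-full policy for the 1000-entry cap (the actual eviction strategy is not shown here):

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Sketch only - not the implementation from the commit. Returns a mutex that
// is shared by every thread currently interested in the same issuer.
std::shared_ptr<std::mutex> get_issuer_mutex(const std::string &issuer) {
    static std::mutex map_mutex;
    static std::unordered_map<std::string, std::shared_ptr<std::mutex>> mutexes;

    std::lock_guard<std::mutex> guard(map_mutex);
    auto iter = mutexes.find(issuer);
    if (iter != mutexes.end()) {
        return iter->second; // known issuer: hand out the existing mutex
    }
    if (mutexes.size() >= 1000) {
        // Cap the map to bound memory; threads already holding a shared_ptr
        // keep their mutex alive even after it is dropped from the map.
        mutexes.clear();
    }
    auto issuer_mutex = std::make_shared<std::mutex>();
    mutexes.emplace(issuer, issuer_mutex);
    return issuer_mutex;
}

Because callers receive a shared_ptr, the "shared_ptr ownership" mentioned in the commit message keeps a mutex valid for as long as any thread still holds it, even if its map entry is later evicted.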
1 parent ae3fc37 commit 57bc1c8

4 files changed: +597, -47 lines


src/scitokens_cache.cpp
10 additions, 0 deletions

@@ -18,6 +18,10 @@
 
 namespace {
 
+// Timeout in milliseconds to wait when database is locked
+// This handles concurrent access from multiple threads/processes
+constexpr int SQLITE_BUSY_TIMEOUT_MS = 5000;
+
 void initialize_cachedb(const std::string &keycache_file) {
 
     sqlite3 *db;
@@ -27,6 +31,8 @@ void initialize_cachedb(const std::string &keycache_file) {
         sqlite3_close(db);
         return;
     }
+    // Set busy timeout to handle concurrent access
+    sqlite3_busy_timeout(db, SQLITE_BUSY_TIMEOUT_MS);
     char *err_msg = nullptr;
     rc = sqlite3_exec(db,
                       "CREATE TABLE IF NOT EXISTS keycache ("
@@ -161,6 +167,8 @@ bool scitokens::Validator::get_public_keys_from_db(const std::string issuer,
         sqlite3_close(db);
         return false;
     }
+    // Set busy timeout to handle concurrent access
+    sqlite3_busy_timeout(db, SQLITE_BUSY_TIMEOUT_MS);
 
     sqlite3_stmt *stmt;
     rc = sqlite3_prepare_v2(db, "SELECT keys from keycache where issuer = ?",
@@ -260,6 +268,8 @@ bool scitokens::Validator::store_public_keys(const std::string &issuer,
         sqlite3_close(db);
         return false;
     }
+    // Set busy timeout to handle concurrent access
+    sqlite3_busy_timeout(db, SQLITE_BUSY_TIMEOUT_MS);
 
     if ((rc = sqlite3_exec(db, "BEGIN", 0, 0, 0)) != SQLITE_OK) {
         sqlite3_close(db);
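For context on the busy timeout: sqlite3_busy_timeout() makes SQLite sleep and retry for up to the given number of milliseconds when another connection holds a conflicting lock, instead of failing immediately with SQLITE_BUSY. A self-contained sketch of the same pattern applied above after each sqlite3_open(); the database path and table are placeholders, not the library's cache schema:

#include <sqlite3.h>
#include <cstdio>

int main() {
    sqlite3 *db = nullptr;
    if (sqlite3_open("/tmp/example_cache.sqlite", &db) != SQLITE_OK) {
        sqlite3_close(db);
        return 1;
    }
    // Retry for up to 5 seconds instead of returning SQLITE_BUSY right away
    // when another thread or process is writing to the database.
    sqlite3_busy_timeout(db, 5000);

    char *err_msg = nullptr;
    int rc = sqlite3_exec(db,
                          "CREATE TABLE IF NOT EXISTS example (k TEXT, v TEXT)",
                          nullptr, nullptr, &err_msg);
    if (rc != SQLITE_OK) {
        std::fprintf(stderr, "sqlite error: %s\n", err_msg ? err_msg : "unknown");
        sqlite3_free(err_msg);
    }
    sqlite3_close(db);
    return rc == SQLITE_OK ? 0 : 1;
}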

src/scitokens_internal.cpp
53 additions, 11 deletions

@@ -904,20 +904,27 @@ Validator::get_public_key_pem(const std::string &issuer, const std::string &kid,
 
         // Use per-issuer lock to prevent thundering herd for new issuers
         auto issuer_mutex = get_issuer_mutex(issuer);
-        std::lock_guard<std::mutex> lock(*issuer_mutex);
+        std::unique_lock<std::mutex> issuer_lock(*issuer_mutex);
 
         // Check again if keys are now in DB (another thread may have fetched
-        // them)
+        // them while we were waiting for the lock)
         if (get_public_keys_from_db(issuer, now, result->m_keys,
                                     result->m_next_update)) {
             // Keys are now available, use them
             result->m_continue_fetch = false;
             result->m_do_store = false;
             result->m_done = true;
+            // Lock released here - no need to hold it
         } else {
             // Still no keys, fetch them from the web
             result = get_public_keys_from_web(
                 issuer, internal::SimpleCurlGet::default_timeout);
+
+            // Transfer ownership of the lock to the async status
+            // The lock will be held until keys are stored in
+            // get_public_key_pem_continue
+            result->m_issuer_mutex = issuer_mutex;
+            result->m_issuer_lock = std::move(issuer_lock);
         }
     }
     result->m_issuer = issuer;
@@ -934,21 +941,56 @@ Validator::get_public_key_pem_continue(std::unique_ptr<AsyncStatus> status,
                                        std::string &algorithm) {
 
     if (status->m_continue_fetch) {
-        status = get_public_keys_from_web_continue(std::move(status));
-        if (status->m_continue_fetch) {
-            return std::move(status);
+        // Save issuer and lock info before potentially moving status
+        std::string issuer = status->m_issuer;
+        auto issuer_mutex = status->m_issuer_mutex;
+        std::unique_lock<std::mutex> issuer_lock(std::move(status->m_issuer_lock));
+
+        try {
+            status = get_public_keys_from_web_continue(std::move(status));
+            if (status->m_continue_fetch) {
+                // Restore the lock to status before returning
+                status->m_issuer_mutex = issuer_mutex;
+                status->m_issuer_lock = std::move(issuer_lock);
+                return std::move(status);
+            }
+            // Success - restore the lock to status for later release
+            status->m_issuer_mutex = issuer_mutex;
+            status->m_issuer_lock = std::move(issuer_lock);
+        } catch (...) {
+            // Web fetch failed - store empty keys as negative cache entry
+            // This prevents thundering herd on repeated failed lookups
+            if (issuer_lock.owns_lock()) {
+                // Store empty keys with short TTL for negative caching
+                auto now = std::time(NULL);
+                int negative_cache_ttl =
+                    configurer::Configuration::get_next_update_delta();
+                picojson::value empty_keys;
+                picojson::object keys_obj;
+                keys_obj["keys"] = picojson::value(picojson::array());
+                empty_keys = picojson::value(keys_obj);
+                store_public_keys(issuer, empty_keys,
+                                  now + negative_cache_ttl,
+                                  now + negative_cache_ttl);
+                issuer_lock.unlock();
+            }
+            throw; // Re-throw the original exception
         }
     }
     if (status->m_do_store) {
         // Async web fetch completed successfully - record monitoring
-        if (status->m_is_refresh) {
-            auto &issuer_stats =
-                internal::MonitoringStats::instance().get_issuer_stats(
-                    status->m_issuer);
-            issuer_stats.inc_successful_key_lookup();
-        }
+        // This counts both initial fetches and refreshes
+        auto &issuer_stats =
+            internal::MonitoringStats::instance().get_issuer_stats(
+                status->m_issuer);
+        issuer_stats.inc_successful_key_lookup();
         store_public_keys(status->m_issuer, status->m_keys,
                           status->m_next_update, status->m_expires);
+        // Release the per-issuer lock now that keys are stored
+        // Other threads waiting on this issuer can now proceed
+        if (status->m_issuer_lock.owns_lock()) {
+            status->m_issuer_lock.unlock();
+        }
     }
     status->m_done = true;
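The pattern worth noting in get_public_key_pem_continue() is that the std::unique_lock is moved out of the status object, held across the continuation step, and then either moved back in (fetch still in progress or succeeded) or released after writing the negative cache entry (fetch failed). A stripped-down, self-contained illustration of that lock-transfer idea; the Status type and the start/continue functions are stand-ins, not the library's AsyncStatus API:

#include <memory>
#include <mutex>
#include <utility>

struct Status {
    bool done{false};
    std::shared_ptr<std::mutex> issuer_mutex; // keeps the mutex object alive
    std::unique_lock<std::mutex> issuer_lock; // ownership of the held lock
};

std::unique_ptr<Status> start_fetch(std::shared_ptr<std::mutex> m) {
    std::unique_lock<std::mutex> lock(*m); // acquire before the slow fetch
    std::unique_ptr<Status> status(new Status());
    status->issuer_mutex = std::move(m);
    status->issuer_lock = std::move(lock); // lock remains held across calls
    return status;
}

std::unique_ptr<Status> continue_fetch(std::unique_ptr<Status> status) {
    // ... one step of the asynchronous work would happen here ...
    if (status->issuer_lock.owns_lock()) {
        status->issuer_lock.unlock(); // release once the result is cached
    }
    status->done = true;
    return status;
}

int main() {
    auto m = std::make_shared<std::mutex>();
    auto status = start_fetch(m);
    while (!status->done) {
        status = continue_fetch(std::move(status));
    }
    return 0;
}

Storing the shared_ptr alongside the lock matters: it guarantees the mutex outlives the lock even if the mutex is evicted from the per-issuer map while the asynchronous fetch is still in flight.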
src/scitokens_internal.h
18 additions, 5 deletions

@@ -445,6 +445,10 @@ class AsyncStatus {
     bool m_is_refresh{false}; // True if this is a refresh of an existing key
     AsyncState m_state{DOWNLOAD_METADATA};
     std::unique_lock<std::mutex> m_refresh_lock;
+    // Per-issuer lock to prevent thundering herd on new issuers
+    // We store both the shared_ptr (to keep mutex alive) and the lock
+    std::shared_ptr<std::mutex> m_issuer_mutex;
+    std::unique_lock<std::mutex> m_issuer_lock;
 
     int64_t m_next_update{-1};
     int64_t m_expires{-1};
@@ -666,6 +670,8 @@ class Validator {
 
         try {
             auto result = verify_async(scitoken);
+            // Note: m_is_sync flag no longer needed since counting is only done
+            // in verify_async_continue
 
             // Extract issuer from the result's JWT string after decoding starts
             const jwt::decoded_jwt<jwt::traits::kazuho_picojson> *jwt_decoded =
@@ -724,7 +730,8 @@ class Validator {
                     std::chrono::duration_cast<std::chrono::nanoseconds>(
                         end_time - last_duration_update);
                 issuer_stats->add_sync_time(delta);
-                issuer_stats->inc_successful_validation();
+                // Note: inc_successful_validation() is called in
+                // verify_async_continue
             }
         } catch (const std::exception &e) {
             // Record failure (final duration update)
@@ -771,6 +778,8 @@ class Validator {
         }
 
         auto result = verify_async(jwt);
+        // Note: m_is_sync flag no longer needed since counting is only done
+        // in verify_async_continue
         while (!result->m_done) {
             result = verify_async_continue(std::move(result));
         }
@@ -782,7 +791,8 @@ class Validator {
                 std::chrono::duration_cast<std::chrono::nanoseconds>(
                     end_time - start_time);
             issuer_stats->add_sync_time(duration);
-            issuer_stats->inc_successful_validation();
+            // Note: inc_successful_validation() is called in
+            // verify_async_continue
         }
     } catch (const std::exception &e) {
         // Record failure if we have an issuer
@@ -886,6 +896,7 @@ class Validator {
         // Start monitoring timing and record async validation started
         status->m_start_time = std::chrono::steady_clock::now();
         status->m_monitoring_started = true;
+        status->m_issuer = jwt.get_issuer();
        auto &stats = internal::MonitoringStats::instance().get_issuer_stats(
            jwt.get_issuer());
        stats.inc_async_validation_started();
@@ -1063,9 +1074,8 @@ class Validator {
            }
        }
 
-       // Record successful validation (only for async API, sync handles its
-       // own)
-       if (status->m_monitoring_started && !status->m_is_sync) {
+       // Record successful validation
+       if (status->m_monitoring_started) {
            auto end_time = std::chrono::steady_clock::now();
            auto duration =
                std::chrono::duration_cast<std::chrono::nanoseconds>(
@@ -1077,8 +1087,11 @@ class Validator {
            stats.add_async_time(duration);
        }
 
+       // Create new result, preserving monitoring flags
        std::unique_ptr<AsyncStatus> result(new AsyncStatus());
        result->m_done = true;
+       result->m_is_sync = status->m_is_sync;
+       result->m_monitoring_started = status->m_monitoring_started;
        return result;
    }
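The stress tests listed in the commit message live in the fourth changed file, which is not reproduced in this excerpt. A simplified sketch of the idea behind ConcurrentNewIssuerLookup, using placeholder names rather than the actual test or scitokens-cpp APIs: ten threads race to fetch keys for a previously unseen issuer, and the per-issuer lock plus the post-lock cache re-check ensure exactly one "web" fetch happens.

#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Placeholders standing in for the key cache and an instrumented fetch counter.
std::atomic<int> g_web_fetches{0};
std::atomic<bool> g_keys_cached{false};
std::mutex g_issuer_mutex; // per-issuer mutex (a single issuer in this sketch)

void fetch_issuer_keys() {
    if (g_keys_cached.load()) {
        return; // fast path: keys already cached
    }
    std::unique_lock<std::mutex> lock(g_issuer_mutex);
    if (g_keys_cached.load()) {
        return; // another thread fetched the keys while we waited on the lock
    }
    g_web_fetches.fetch_add(1); // only the first thread reaches the "web"
    g_keys_cached.store(true);
}

int main() {
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(fetch_issuer_keys);
    }
    for (auto &t : threads) {
        t.join();
    }
    assert(g_web_fetches.load() == 1); // exactly ONE fetch despite 10 threads
    return 0;
}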