Skip to content

Commit 3a775da

Browse files
committed
Avoid Stripe Mutex lock contention for RWW
1 parent 85dabe1 commit 3a775da

5 files changed

Lines changed: 98 additions & 48 deletions

File tree

include/iocore/cache/CacheDefs.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ enum CacheEventType {
9595
CACHE_EVENT_SCAN_OPERATION_BLOCKED = CACHE_EVENT_EVENTS_START + 23,
9696
CACHE_EVENT_SCAN_OPERATION_FAILED = CACHE_EVENT_EVENTS_START + 24,
9797
CACHE_EVENT_SCAN_DONE = CACHE_EVENT_EVENTS_START + 25,
98+
CACHE_EVENT_OPEN_DIR_RETRY = CACHE_EVENT_EVENTS_START + 26,
9899
//////////////////////////
99100
// Internal error codes //
100101
//////////////////////////

src/iocore/cache/Cache.cc

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,28 @@ DbgCtl dbg_ctl_cache_init{"cache_init"};
109109
DbgCtl dbg_ctl_cache_hosting{"cache_hosting"};
110110
DbgCtl dbg_ctl_cache_update{"cache_update"};
111111

112+
CacheVC *
113+
new_CacheVC_for_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params,
114+
StripeSM *stripe)
115+
{
116+
CacheVC *cache_vc = new_CacheVC(cont);
117+
118+
cache_vc->first_key = *key;
119+
cache_vc->key = *key;
120+
cache_vc->earliest_key = *key;
121+
cache_vc->stripe = stripe;
122+
cache_vc->vio.op = VIO::READ;
123+
cache_vc->op_type = static_cast<int>(CacheOpType::Read);
124+
cache_vc->frag_type = CACHE_FRAG_TYPE_HTTP;
125+
cache_vc->params = params;
126+
cache_vc->request.copy_shallow(request);
127+
128+
ts::Metrics::Gauge::increment(cache_rsb.status[cache_vc->op_type].active);
129+
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[cache_vc->op_type].active);
130+
131+
return cache_vc;
132+
}
133+
112134
} // end anonymous namespace
113135

114136
// Global list of the volumes created
@@ -543,20 +565,24 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
543565
OpenDirEntry *od = nullptr;
544566
CacheVC *c = nullptr;
545567

568+
// Read-While-Writer
569+
// This OpenDirEntry lookup doesn't need stripe mutex lock because OpenDir has own reader-writer lock
570+
od = stripe->open_read(key);
571+
if (od != nullptr) {
572+
c = new_CacheVC_for_read(cont, key, request, params, stripe);
573+
c->od = od;
574+
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
575+
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
576+
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
577+
return ACTION_RESULT_DONE;
578+
}
579+
return &c->_action;
580+
}
581+
546582
{
547583
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
548-
if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) {
549-
c = new_CacheVC(cont);
550-
c->first_key = c->key = c->earliest_key = *key;
551-
c->stripe = stripe;
552-
c->vio.op = VIO::READ;
553-
c->op_type = static_cast<int>(CacheOpType::Read);
554-
ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
555-
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
556-
c->request.copy_shallow(request);
557-
c->frag_type = CACHE_FRAG_TYPE_HTTP;
558-
c->params = params;
559-
c->od = od;
584+
if (!lock.is_locked() || stripe->directory.probe(key, stripe, &result, &last_collision)) {
585+
c = new_CacheVC_for_read(cont, key, request, params, stripe);
560586
}
561587
if (!lock.is_locked()) {
562588
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -566,9 +592,7 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
566592
if (!c) {
567593
goto Lmiss;
568594
}
569-
if (c->od) {
570-
goto Lwriter;
571-
}
595+
572596
// hit
573597
c->dir = c->first_dir = result;
574598
c->last_collision = last_collision;
@@ -587,13 +611,6 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
587611
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Read)].failure);
588612
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC));
589613
return ACTION_RESULT_DONE;
590-
Lwriter:
591-
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
592-
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
593-
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
594-
return ACTION_RESULT_DONE;
595-
}
596-
return &c->_action;
597614
Lcallreturn:
598615
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
599616
return ACTION_RESULT_DONE;

src/iocore/cache/CacheDir.cc

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
#include "PreservationTable.h"
2929
#include "Stripe.h"
3030

31+
#include "iocore/eventsystem/Event.h"
3132
#include "tscore/hugepages.h"
3233
#include "tscore/Random.h"
3334
#include "ts/ats_probe.h"
35+
#include "tsutil/Bravo.h"
36+
#include <mutex>
3437

3538
#ifdef LOOP_CHECK_MODE
3639
#define DIR_LOOP_THRESHOLD 1000
@@ -46,6 +49,7 @@ DbgCtl dbg_ctl_dir_clean{"dir_clean"};
4649
#ifdef DEBUG
4750

4851
DbgCtl dbg_ctl_cache_stats{"cache_stats"};
52+
DbgCtl dbg_ctl_cache_open_dir{"cache_open_dir"};
4953
DbgCtl dbg_ctl_dir_probe_hit{"dir_probe_hit"};
5054
DbgCtl dbg_ctl_dir_probe_tag{"dir_probe_tag"};
5155
DbgCtl dbg_ctl_dir_probe_miss{"dir_probe_miss"};
@@ -78,10 +82,11 @@ OpenDir::OpenDir()
7882
int
7983
OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
8084
{
81-
ink_assert(cont->stripe->mutex->thread_holding == this_ethread());
85+
std::lock_guard lock(_shared_mutex);
86+
8287
unsigned int h = cont->first_key.slice32(0);
8388
int b = h % OPEN_DIR_BUCKETS;
84-
for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) {
89+
for (OpenDirEntry *d = _bucket[b].head; d; d = d->link.next) {
8590
if (!(d->writers.head->first_key == cont->first_key)) {
8691
continue;
8792
}
@@ -107,17 +112,27 @@ OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
107112
dir_clear(&od->first_dir);
108113
cont->od = od;
109114
cont->write_vector = &od->vector;
110-
bucket[b].push(od);
115+
_bucket[b].push(od);
111116
return 1;
112117
}
113118

119+
/**
120+
This event handler is called in two cases:
121+
122+
1. Direct call from OpenDir::close_write - writer lock is already acquired
123+
2. Self retry through event system - need to acquire writer lock
124+
*/
114125
int
115-
OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
126+
OpenDir::signal_readers(int event, Event * /* ATS UNUSED*/)
116127
{
128+
if (event == CACHE_EVENT_OPEN_DIR_RETRY) {
129+
_shared_mutex.lock();
130+
}
131+
117132
Queue<CacheVC, Link_CacheVC_opendir_link> newly_delayed_readers;
118133
EThread *t = mutex->thread_holding;
119134
CacheVC *c = nullptr;
120-
while ((c = delayed_readers.dequeue())) {
135+
while ((c = _delayed_readers.dequeue())) {
121136
CACHE_TRY_LOCK(lock, c->mutex, t);
122137
if (lock.is_locked()) {
123138
c->f.open_read_timeout = 0;
@@ -127,28 +142,34 @@ OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
127142
newly_delayed_readers.push(c);
128143
}
129144
if (newly_delayed_readers.head) {
130-
delayed_readers = newly_delayed_readers;
131-
EThread *t1 = newly_delayed_readers.head->mutex->thread_holding;
145+
_delayed_readers = newly_delayed_readers;
146+
EThread *t1 = newly_delayed_readers.head->mutex->thread_holding;
132147
if (!t1) {
133148
t1 = mutex->thread_holding;
134149
}
135-
t1->schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
150+
t1->schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), CACHE_EVENT_OPEN_DIR_RETRY);
136151
}
137-
return 0;
152+
153+
if (event == CACHE_EVENT_OPEN_DIR_RETRY) {
154+
_shared_mutex.unlock();
155+
}
156+
157+
return EVENT_DONE;
138158
}
139159

140160
int
141161
OpenDir::close_write(CacheVC *cont)
142162
{
143-
ink_assert(cont->stripe->mutex->thread_holding == this_ethread());
163+
std::lock_guard lock(_shared_mutex);
164+
144165
cont->od->writers.remove(cont);
145166
cont->od->num_writers--;
146167
if (!cont->od->writers.head) {
147168
unsigned int h = cont->first_key.slice32(0);
148169
int b = h % OPEN_DIR_BUCKETS;
149-
bucket[b].remove(cont->od);
150-
delayed_readers.append(cont->od->readers);
151-
signal_readers(0, nullptr);
170+
_bucket[b].remove(cont->od);
171+
_delayed_readers.append(cont->od->readers);
172+
signal_readers(EVENT_CALL, nullptr);
152173
cont->od->vector.clear();
153174
THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding);
154175
}
@@ -159,9 +180,11 @@ OpenDir::close_write(CacheVC *cont)
159180
OpenDirEntry *
160181
OpenDir::open_read(const CryptoHash *key) const
161182
{
183+
ts::bravo::shared_lock lock(_shared_mutex);
184+
162185
unsigned int h = key->slice32(0);
163186
int b = h % OPEN_DIR_BUCKETS;
164-
for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) {
187+
for (OpenDirEntry *d = _bucket[b].head; d; d = d->link.next) {
165188
if (d->writers.head->first_key == *key) {
166189
return d;
167190
}

src/iocore/cache/P_CacheDir.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "iocore/eventsystem/Continuation.h"
2929
#include "iocore/aio/AIO.h"
3030
#include "tscore/Version.h"
31+
#include "tsutil/Bravo.h"
3132

3233
#include <cstdint>
3334
#include <ctime>
@@ -225,16 +226,23 @@ struct OpenDirEntry {
225226
}
226227
};
227228

228-
struct OpenDir : public Continuation {
229-
Queue<CacheVC, Link_CacheVC_opendir_link> delayed_readers;
230-
DLL<OpenDirEntry> bucket[OPEN_DIR_BUCKETS];
229+
class OpenDir : public Continuation
230+
{
231+
public:
232+
OpenDir();
231233

232234
int open_write(CacheVC *c, int allow_if_writers, int max_writers);
233235
int close_write(CacheVC *c);
234236
OpenDirEntry *open_read(const CryptoHash *key) const;
235-
int signal_readers(int event, Event *e);
236237

237-
OpenDir();
238+
// event handler
239+
int signal_readers(int event, Event *e);
240+
241+
private:
242+
mutable ts::bravo::shared_mutex _shared_mutex;
243+
244+
Queue<CacheVC, Link_CacheVC_opendir_link> _delayed_readers;
245+
DLL<OpenDirEntry> _bucket[OPEN_DIR_BUCKETS];
238246
};
239247

240248
struct CacheSync : public Continuation {

src/iocore/cache/StripeSM.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,15 @@ class StripeSM : public Continuation, public Stripe
101101

102102
int recover_data();
103103

104-
int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
105-
int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
106-
int close_write(CacheVC *cont);
107-
int begin_read(CacheVC *cont) const;
108-
// unused read-write interlock code
109-
// currently http handles a write-lock failure by retrying the read
104+
// OpenDir API
105+
int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
106+
int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
107+
int close_write(CacheVC *cont);
110108
OpenDirEntry *open_read(const CryptoHash *key) const;
111-
int close_read(CacheVC *cont) const;
109+
110+
// PreservationTable API
111+
int begin_read(CacheVC *cont) const;
112+
int close_read(CacheVC *cont) const;
112113

113114
int clear_dir_aio();
114115
int clear_dir();

0 commit comments

Comments
 (0)