Skip to content

Commit adfbce6

Browse files
julianbrostyhabteab
authored andcommitted
IcingaDB: use key extractor for worker queue
This commit restructures the queue items so that each one now has a method `GetQueueLookupKey()` that is used to derive which elements of the queue are considered to be equal. For this, there is a key extractor for the `multi_index_container` that takes the `variant` from the queue item, calls that method on it, and puts the result in a second variant type. The types in that variant type are automatically deduced from the return types of the individual methods.
1 parent ac31789 commit adfbce6

2 files changed

Lines changed: 164 additions & 137 deletions

File tree

lib/icingadb/icingadb-worker.cpp

Lines changed: 73 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
8585
{
8686
using namespace std::chrono_literals;
8787
namespace ch = std::chrono;
88+
namespace queue = icingadb::task_queue;
8889

8990
bool madeProgress = false; // Did we make any progress in this iteration?
9091
ch::duration<double> retryAfter{0}; // If we can't process anything right now, how long to wait before retrying?
@@ -93,22 +94,27 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
9394
auto& seqView = m_PendingItems.get<1>();
9495
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
9596
if (it != seqView.begin()) {
96-
if (dynamic_cast<const icingadb::task_queue::RelationsDeletionItem*>(it->get())) {
97+
if (std::holds_alternative<queue::RelationsDeletionItem>(it->Item)) {
9798
// We don't know whether the previous items are related to this deletion item or not,
9899
// thus we can't just process this right now when there are older items in the queue.
99100
// Otherwise, we might delete something that is going to be updated/created.
100101
break;
101102
}
102103
}
103104

104-
if (auto age = now - (*it)->EnqueueTime; 1000ms > age) {
105+
if (auto age = now - it->EnqueueTime; 1000ms > age) {
105106
if (it == seqView.begin()) {
106107
retryAfter = 1000ms - age;
107108
}
108109
break;
109110
}
110111

111-
ConfigObject::Ptr cobj = (*it)->GetObjectToLock();
112+
ConfigObject::Ptr cobj;
113+
if (auto confPtr = std::get_if<queue::PendingConfigItem>(&it->Item); confPtr) {
114+
cobj = confPtr->Object;
115+
} else if (auto edgePtr = std::get_if<queue::PendingDependencyEdgeItem>(&it->Item)) {
116+
cobj = edgePtr->Child;
117+
}
112118
ObjectLock olock(cobj, std::defer_lock);
113119
if (cobj && !olock.TryLock()) {
114120
continue; // Can't lock the object right now, try the next one.
@@ -119,14 +125,15 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
119125
madeProgress = true;
120126

121127
lock.unlock();
122-
try {
123-
itemToProcess->Execute(*this);
124-
} catch (const std::exception& ex) {
125-
icingadb::task_queue::PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects).
126-
Log(LogCritical, "IcingaDB")
127-
<< "Exception while processing pending item of type '" << typeid(itemRef).name() << "': "
128-
<< DiagnosticInformation(ex, GetActive());
129-
}
128+
std::visit([this](auto &item) {
129+
try {
130+
ProcessQueueItem(item);
131+
} catch (const std::exception& ex) {
132+
Log(LogCritical, "IcingaDB")
133+
<< "Exception while processing pending item of type '" << typeid(decltype(item)).name() << "': "
134+
<< DiagnosticInformation(ex, GetActive());
135+
}
136+
}, itemToProcess.Item);
130137
lock.lock();
131138
break;
132139
}
@@ -138,27 +145,25 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
138145
return retryAfter;
139146
}
140147

141-
ConfigObject::Ptr icingadb::task_queue::PendingConfigItem::GetObjectToLock() const
142-
{
143-
return Object;
144-
}
145-
146148
/**
147149
* Execute the pending configuration item.
148150
*
149151
* This function processes the pending configuration item by performing the necessary Redis operations based
150152
* on the dirty bits set for the associated configuration object. It handles configuration deletions, updates,
151153
* and state updates for checkable objects.
152154
*
153-
* @param icingadb The IcingaDB instance to use for executing Redis queries.
155+
* @param item The queue item to process.
154156
*/
155-
void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const {
156-
if (DirtyBits & ConfigDelete) {
157-
auto redisKeyPair = icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType());
158-
icingadb.m_RconWorker->FireAndForgetQueries(
157+
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingConfigItem& item)
158+
{
159+
namespace queue = icingadb::task_queue;
160+
161+
if (item.DirtyBits & queue::ConfigDelete) {
162+
auto redisKeyPair = GetSyncableTypeRedisKeys(item.Object->GetReflectionType());
163+
m_RconWorker->FireAndForgetQueries(
159164
{
160-
{"HDEL", redisKeyPair.ObjectKey, icingadb.GetObjectIdentifier(Object)},
161-
{"HDEL", redisKeyPair.ChecksumKey, icingadb.GetObjectIdentifier(Object)},
165+
{"HDEL", redisKeyPair.ObjectKey, GetObjectIdentifier(item.Object)},
166+
{"HDEL", redisKeyPair.ChecksumKey, GetObjectIdentifier(item.Object)},
162167
{
163168
"XADD",
164169
"icinga:runtime",
@@ -169,27 +174,27 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const
169174
"redis_key",
170175
redisKeyPair.ObjectKey,
171176
"id",
172-
icingadb.GetObjectIdentifier(Object),
177+
GetObjectIdentifier(item.Object),
173178
"runtime_type",
174179
"delete"
175180
}
176181
}
177182
);
178183
}
179184

180-
if (DirtyBits & ConfigUpdate) {
185+
if (item.DirtyBits & queue::ConfigUpdate) {
181186
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
182187
std::vector<Dictionary::Ptr> runtimeUpdates;
183-
icingadb.CreateConfigUpdate(Object, icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType()), hMSets, runtimeUpdates, true);
184-
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
188+
CreateConfigUpdate(item.Object, GetSyncableTypeRedisKeys(item.Object->GetReflectionType()), hMSets, runtimeUpdates, true);
189+
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
185190
}
186191

187-
if (auto checkable = dynamic_pointer_cast<Checkable>(Object); checkable) {
188-
if (DirtyBits & FullState) {
189-
icingadb.UpdateState(checkable, DirtyBits);
192+
if (auto checkable = dynamic_pointer_cast<Checkable>(item.Object); checkable) {
193+
if (item.DirtyBits & queue::FullState) {
194+
UpdateState(checkable, item.DirtyBits);
190195
}
191-
if (DirtyBits & NextUpdate) {
192-
icingadb.SendNextUpdate(checkable);
196+
if (item.DirtyBits & queue::NextUpdate) {
197+
SendNextUpdate(checkable);
193198
}
194199
}
195200
}
@@ -201,15 +206,15 @@ void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const
201206
* dependency group in Redis. It selects any child checkable from the dependency group to initiate
202207
* the state update, as all children share the same dependency group state.
203208
*
204-
* @param icingadb The IcingaDB instance to use for executing Redis queries.
209+
* @param item The queue item to process.
205210
*/
206-
void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
211+
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyGroupStateItem& item) const
207212
{
208213
// For dependency group state updates, we don't actually care which child triggered the update,
209214
// since all children share the same dependency group state. Thus, we can just pick any child to
210215
// start the update from.
211-
if (auto child = DepGroup->GetAnyChild(); child) {
212-
icingadb.UpdateDependenciesState(child, DepGroup);
216+
if (auto child = item.DepGroup->GetAnyChild(); child) {
217+
UpdateDependenciesState(child, item.DepGroup);
213218
}
214219
}
215220

@@ -219,14 +224,14 @@ void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& ic
219224
* This function processes the pending dependency edge item and ensures that the necessary Redis
220225
* operations are performed to register the child checkable as part of the dependency group.
221226
*
222-
* @param icingadb The IcingaDB instance to use for executing Redis queries.
227+
* @param item The queue item to process.
223228
*/
224-
void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
229+
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::PendingDependencyEdgeItem& item)
225230
{
226231
std::vector<Dictionary::Ptr> runtimeUpdates;
227232
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
228-
icingadb.InsertCheckableDependencies(Child, hMSets, &runtimeUpdates, DepGroup);
229-
icingadb.ExecuteRedisTransaction(icingadb.m_RconWorker, hMSets, runtimeUpdates);
233+
InsertCheckableDependencies(item.Child, hMSets, &runtimeUpdates, item.DepGroup);
234+
ExecuteRedisTransaction(m_RconWorker, hMSets, runtimeUpdates);
230235
}
231236

232237
/**
@@ -236,15 +241,15 @@ void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb
236241
* from Redis. It iterates over the map of Redis keys and deletes the relations associated with
237242
* the given ID.
238243
*
239-
* @param icingadb The IcingaDB instance to use for executing Redis queries.
244+
* @param item The queue item to process.
240245
*/
241-
void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) const
246+
void IcingaDB::ProcessQueueItem(const icingadb::task_queue::RelationsDeletionItem& item)
242247
{
243-
for (const auto& [configKey, checksumKey] : Relations) {
244-
if (icingadb.IsStateKey(configKey)) {
245-
icingadb.DeleteState(ID, configKey, checksumKey);
248+
for (const auto& [configKey, checksumKey] : item.Relations) {
249+
if (IsStateKey(configKey)) {
250+
DeleteState(item.ID, configKey, checksumKey);
246251
} else {
247-
icingadb.DeleteRelationship(ID, configKey, checksumKey);
252+
DeleteRelationship(item.ID, configKey, checksumKey);
248253
}
249254
}
250255
}
@@ -257,22 +262,23 @@ void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) co
257262
*/
258263
void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bits)
259264
{
265+
namespace queue = icingadb::task_queue;
266+
260267
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
261268
return; // No need to enqueue anything if we're not connected.
262269
}
263-
namespace queue = icingadb::task_queue;
264270

265271
{
266272
std::lock_guard lock(m_PendingItemsMutex);
267-
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<queue::PendingConfigItem>(object, bits)); !inserted) {
268-
m_PendingItems.modify(it, [bits](const std::shared_ptr<queue::PendingQueueItem>& item) {
269-
auto configItem = dynamic_cast<queue::PendingConfigItem*>(item.get());
273+
if (auto [it, inserted] = m_PendingItems.emplace(queue::PendingConfigItem{object, bits}); !inserted) {
274+
m_PendingItems.modify(it, [bits](queue::PendingQueueItem& item) {
275+
auto& configItem = std::get<queue::PendingConfigItem>(item.Item);
270276
if (bits & queue::ConfigDelete) {
271-
configItem->DirtyBits &= ~(queue::ConfigUpdate | queue::FullState);
277+
configItem.DirtyBits &= ~(queue::ConfigUpdate | queue::FullState);
272278
} else if (bits & queue::ConfigUpdate) {
273-
configItem->DirtyBits &= ~queue::ConfigDelete;
279+
configItem.DirtyBits &= ~queue::ConfigDelete;
274280
}
275-
configItem->DirtyBits |= bits & queue::DirtyBitsAll;
281+
configItem.DirtyBits |= bits & queue::DirtyBitsAll;
276282
});
277283
}
278284
}
@@ -284,7 +290,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep
284290
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
285291
{
286292
std::lock_guard lock(m_PendingItemsMutex);
287-
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyGroupStateItem>(depGroup));
293+
m_PendingItems.emplace(icingadb::task_queue::PendingDependencyGroupStateItem{depGroup});
288294
}
289295
m_PendingItemsCV.notify_one();
290296
}
@@ -304,7 +310,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG
304310
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
305311
{
306312
std::lock_guard lock(m_PendingItemsMutex);
307-
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyEdgeItem>(depGroup, child));
313+
m_PendingItems.emplace(icingadb::task_queue::PendingDependencyEdgeItem{depGroup, child});
308314
}
309315
m_PendingItemsCV.notify_one();
310316
}
@@ -330,6 +336,8 @@ void IcingaDB::EnqueueDependencyChildRemoved(
330336
bool removeGroup
331337
)
332338
{
339+
namespace queue = icingadb::task_queue;
340+
333341
if (dependencies.empty() || !GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
334342
return; // No need to enqueue anything if we're not connected or there are no dependencies.
335343
}
@@ -339,13 +347,12 @@ void IcingaDB::EnqueueDependencyChildRemoved(
339347

340348
{
341349
std::lock_guard lock(m_PendingItemsMutex);
342-
if (auto it(m_PendingItems.find(std::make_pair(child, depGroup))); it != m_PendingItems.end()) {
350+
if (m_PendingItems.erase(std::make_pair(depGroup.get(), child.get())) > 0) {
343351
cancelledRegistration = true;
344-
m_PendingItems.erase(it);
345352
if (removeGroup) {
346353
// If we're removing the entire group registration, we can also drop any pending dependency group
347354
// state update triggered previously as it should no longer have any children left.
348-
m_PendingItems.erase(std::make_pair(nullptr, depGroup));
355+
m_PendingItems.erase(depGroup.get());
349356
}
350357
}
351358
}
@@ -410,7 +417,7 @@ void IcingaDB::EnqueueDependencyChildRemoved(
410417
// Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's
411418
// not worth traversing the whole tree way up and sending config updates for each one of them, as the next
412419
// Redis config dump is going to fix it anyway.
413-
EnqueueConfigObject(parent, icingadb::task_queue::ConfigUpdate);
420+
EnqueueConfigObject(parent, queue::ConfigUpdate);
414421

415422
if (!parent->HasAnyDependencies()) {
416423
// If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry.
@@ -447,18 +454,20 @@ void IcingaDB::EnqueueDependencyChildRemoved(
447454
* @param id The ID of the relation to be deleted.
448455
* @param relations A map of Redis keys from which to delete the relation.
449456
*/
450-
void IcingaDB::EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations)
457+
void IcingaDB::EnqueueRelationsDeletion(const String& id, icingadb::task_queue::RelationsDeletionItem::RelationsKeySet relations)
451458
{
459+
namespace queue = icingadb::task_queue;
460+
452461
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
453462
return; // No need to enqueue anything if we're not connected.
454463
}
455464

456465
{
457466
std::lock_guard lock(m_PendingItemsMutex);
458-
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<icingadb::task_queue::RelationsDeletionItem>(id, relations)); !inserted) {
459-
m_PendingItems.modify(it, [&relations](std::shared_ptr<icingadb::task_queue::PendingQueueItem>& val) {
460-
auto item = dynamic_cast<icingadb::task_queue::RelationsDeletionItem*>(val.get());
461-
item->Relations.insert(relations.begin(), relations.end());
467+
if (auto [it, inserted] = m_PendingItems.emplace(queue::RelationsDeletionItem{id, relations}); !inserted) {
468+
m_PendingItems.modify(it, [&relations](queue::PendingQueueItem& val) {
469+
auto& item = std::get<queue::RelationsDeletionItem>(val.Item);
470+
item.Relations.merge(std::move(relations));
462471
});
463472
}
464473
}

0 commit comments

Comments
 (0)