Skip to content
29 changes: 28 additions & 1 deletion src/gxs/rsdataservice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
* #define RS_DATA_SERVICE_DEBUG_CACHE 1
****/

//#define GXSPROFILING

#include <fstream>
#include <util/rsdir.h>
#include <algorithm>
Expand Down Expand Up @@ -1174,11 +1176,21 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg,
int resultCount = 0;
#endif

#ifdef GXSPROFILING
// [TRACE] Start the database retrieval timer
RsDbg() << "GXSPROFILING [DataService]: START retrieveNxsMsgs for " << reqIds.size() << " groups";
auto start_all = std::chrono::steady_clock::now();
#endif

for(auto mit = reqIds.begin(); mit != reqIds.end(); ++mit)
{

const RsGxsGroupId& grpId = mit->first;

#ifdef GXSPROFILING
// [TRACE] Start timer for this specific group
auto start_group = std::chrono::steady_clock::now();
#endif

// if vector empty then request all messages
const std::set<RsGxsMessageId>& msgIdV = mit->second;
std::vector<RsNxsMsg*> msgSet;
Expand Down Expand Up @@ -1222,13 +1234,28 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg,

msg[grpId] = msgSet;

#ifdef GXSPROFILING
// [TRACE] Log time per group to monitor progress
auto end_group = std::chrono::steady_clock::now();
auto group_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_group - start_group).count();
RsDbg() << "GXSPROFILING [DataService]: Group " << grpId.toStdString()
<< " (Total " << msgSet.size() << " msgs) processed in " << group_ms << "ms";
#endif

msgSet.clear();
}

#ifdef RS_DATA_SERVICE_DEBUG_TIME
std::cerr << "RsDataService::retrieveNxsMsgs() " << mDbName << ", Requests: " << reqIds.size() << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
#endif

#ifdef GXSPROFILING
// [TRACE] Log total database time
auto end_all = std::chrono::steady_clock::now();
auto total_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_all - start_all).count();
RsDbg() << "GXSPROFILING [DataService]: END retrieveNxsMsgs total time: " << total_ms << "ms";
#endif

return 1;
}

Expand Down
53 changes: 43 additions & 10 deletions src/gxs/rsgenexchange.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
* #define GEN_EXCH_DEBUG 1
*/

//#define GXSPROFILING

#if defined(GEN_EXCH_DEBUG)
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_FORUMS;// use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
Expand Down Expand Up @@ -1569,22 +1571,34 @@ bool RsGenExchange::getGroupData(const uint32_t &token, std::vector<RsGxsGrpItem

bool RsGenExchange::getMsgData(uint32_t token, GxsMsgDataMap &msgItems)
{
#ifdef GXSPROFILING
// [TRACE] Start CPU/Deserialization timer
auto start_time = std::chrono::steady_clock::now();
#endif
RS_STACK_MUTEX(mGenMtx) ;
NxsMsgDataResult msgResult;
bool ok = mDataAccess->getMsgData(token, msgResult);

if(ok)
{
uint32_t count = 0;
NxsMsgDataResult::iterator mit = msgResult.begin();
for(; mit != msgResult.end(); ++mit)
{
const RsGxsGroupId& grpId = mit->first;
std::vector<RsGxsMsgItem*>& gxsMsgItems = msgItems[grpId];
std::vector<RsNxsMsg*>& nxsMsgsV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = nxsMsgsV.begin();
for(; vit != nxsMsgsV.end(); ++vit)

// Pre-allocate a temporary vector for results to avoid locking in the parallel loop
std::vector<RsGxsMsgItem*> tempItems(nxsMsgsV.size(), nullptr);

// THREAD-SAFETY NOTE: This OMP loop performs in-memory deserialization only.
// The SQLite/SQLCipher query has already completed above (getMsgData).
// The serialiser (mSerialiser) must remain stateless/re-entrant for this to be safe.
#pragma omp parallel for
for(size_t i = 0; i < nxsMsgsV.size(); ++i)
{
RsNxsMsg*& msg = *vit;
RsNxsMsg* msg = nxsMsgsV[i];
RsItem* item = NULL;

if(msg->msg.bin_len != 0)
Expand All @@ -1595,25 +1609,44 @@ bool RsGenExchange::getMsgData(uint32_t token, GxsMsgDataMap &msgItems)
RsGxsMsgItem* mItem = dynamic_cast<RsGxsMsgItem*>(item);
if (mItem)
{
mItem->meta = *((*vit)->metaData); // get meta info from nxs msg
gxsMsgItems.push_back(mItem);
mItem->meta = *(msg->metaData); // get meta info from nxs msg
tempItems[i] = mItem;
}
else
{
std::cerr << "RsGenExchange::getMsgData() deserialisation/dynamic_cast ERROR";
std::cerr << std::endl;
// Should almost never happen if serializer is correct
delete item;
}
}
else
{
std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR";
std::cerr << std::endl;
// Deserialization failed (corrupt data?)
// std::cerr << "RsGenExchange::getMsgData() deserialisation ERROR" << std::endl;
}
delete msg;
}

// Serial merge of successful items
for(size_t i = 0; i < tempItems.size(); ++i) {
if(tempItems[i]) {
gxsMsgItems.push_back(tempItems[i]);
count++;
}
delete msg;
}
}
// [TRACE] Log the number of items processed
#ifdef GXSPROFILING
RsDbg() << "GXSPROFILING [GenExch]: Deserialized " << count << " items";
#endif
}

#ifdef GXSPROFILING
// [TRACE] End timer and log total processing time
auto end_time = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
RsDbg() << "GXSPROFILING [GenExch]: getMsgData (Token: " << token << ") total time: " << elapsed << "ms";
#endif

return ok;
}

Expand Down
18 changes: 9 additions & 9 deletions src/gxs/rsgenexchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs
*/
virtual void service_tick() = 0;

/*!
*
* @return handle to token service handle for making
* request to this gxs service
*/
RsTokenService* getTokenService();

void threadTick() override; /// @see RsTickingThread
/*!
*
* @return handle to token service handle for making
* request to this gxs service
*/
RsTokenService* getTokenService();
void threadTick() override; /// @see RsTickingThread

/*!
* Policy bit pattern portion
Expand Down Expand Up @@ -948,7 +948,7 @@ class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxs
RsGxsDataAccess* mDataAccess;
RsGeneralDataService* mDataStore;
RsNetworkExchangeService *mNetService;
RsSerialType *mSerialiser;
RsSerialType *mSerialiser; // WARNING: used concurrently via OpenMP in getMsgData() — must remain stateless/re-entrant
/// service type
uint16_t mServType;
RsGixs* mGixs;
Expand Down
15 changes: 15 additions & 0 deletions src/gxs/rsgxsnetservice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@

//#define NXS_FRAG

//#define GXSPROFILING

// The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues:
//
// Channels/forums will update at a rate of SYNC_PERIOD*MAX_REQLIST_SIZE/60 messages per minute.
Expand Down Expand Up @@ -3515,6 +3517,11 @@ void RsGxsNetService::runVetting()

void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
{
#ifdef GXSPROFILING
// [TRACE] Start global timer for the network transaction
auto start_net = std::chrono::steady_clock::now();
#endif

#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << "locked_genSendMsgsTransaction() Generating Msg data send fron TransN: " << tr->mTransaction->transactionNumber << std::endl;
#endif
Expand Down Expand Up @@ -3702,8 +3709,16 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
delete newTr;
}

#ifdef GXSPROFILING
// [TRACE] End global timer and log with the exact same format as V3
auto end_net = std::chrono::steady_clock::now();
auto net_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_net - start_net).count();
RsDbg() << "GXSPROFILING [NetService]: TOTAL locked_genSendMsgsTransaction for " << tr->mItems.size() << " items took " << net_ms << "ms";
#endif

return;
}

uint32_t RsGxsNetService::locked_getTransactionId()
{
return ++mTransactionN;
Expand Down
4 changes: 4 additions & 0 deletions src/libretroshare.pro
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ DESTDIR = lib

QMAKE_CXXFLAGS += -fPIC

# OpenMP support for parallel deserialization in GXS (rsgenexchange.cc)
QMAKE_CXXFLAGS += -fopenmp
LIBS += -fopenmp

## Uncomment to enable Unfinished Services.
#CONFIG += wikipoos
#CONFIG += gxsthewire
Expand Down
4 changes: 3 additions & 1 deletion src/use_libretroshare.pri
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ rs_jsonapi {

linux-* {
mLibs += dl
# OpenMP runtime needed for parallel deserialization in rsgenexchange.cc
LIBS += -fopenmp
}

rs_deep_channels_index | rs_deep_files_index | rs_deep_forums_index {
rs_deep_channels_index | rs_deep_files_index {
mLibs += xapian
win32-g++|win32-clang-g++:mLibs += rpcrt4
}
Expand Down
19 changes: 18 additions & 1 deletion src/util/retrodb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

//#define RETRODB_DEBUG

//#define GXSPROFILING

const int RetroDb::OPEN_READONLY = SQLITE_OPEN_READONLY;
const int RetroDb::OPEN_READWRITE = SQLITE_OPEN_READWRITE;
const int RetroDb::OPEN_READWRITE_CREATE = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
Expand Down Expand Up @@ -241,6 +243,11 @@ bool RetroDb::execSQL(const std::string &query){
RetroCursor* RetroDb::sqlQuery(const std::string& tableName, const std::list<std::string>& columns,
const std::string& selection, const std::string& orderBy){

#ifdef GXSPROFILING
// [TRACE] Start individual query timer
auto start_sql = std::chrono::steady_clock::now();
#endif

if(tableName.empty() || columns.empty()){
std::cerr << "RetroDb::sqlQuery(): No table or columns given" << std::endl;
return NULL;
Expand Down Expand Up @@ -279,7 +286,17 @@ RetroCursor* RetroDb::sqlQuery(const std::string& tableName, const std::list<std
#endif

sqlite3_prepare_v2(mDb, sqlQuery.c_str(), sqlQuery.length(), &stmt, NULL);
return (new RetroCursor(stmt));
RetroCursor* cursor = new RetroCursor(stmt);

#ifdef GXSPROFILING
// [TRACE] End timer and log using the same "Batch SQL" tag for direct comparison
auto end_sql = std::chrono::steady_clock::now();
auto sql_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_sql - start_sql).count();

RsDbg() << "GXSPROFILING [RetroDb]: Batch SQL for group individual_query took " << sql_ms << "ms";
#endif

return cursor;
}

bool RetroDb::isOpen() const {
Expand Down
Loading