Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 87 additions & 18 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
#include "dautils.hpp"
#include "dadfs.hpp"
#include "jmetrics.hpp"
#include "jlzw.hpp"
#include "jstream.hpp"

#define DEBUG_DIR "debug"
#define DEFAULT_KEEP_LASTN_STORES 10 // should match default in dali.xsd
Expand Down Expand Up @@ -1816,11 +1818,16 @@ struct CStoreInfo
{
unsigned xmlCrc{0};
unsigned binaryCrc{0};
unsigned flags{0}; // bit flags for future expandability
} crcInfo;
StringAttr cache;

enum : unsigned
{
FLAG_BINARY_COMPRESSED = 0x0001
};

static void save(IFileIO *fileIO, unsigned *crcXml, unsigned *crcBinary)
static void save(IFileIO *fileIO, unsigned *crcXml, unsigned *crcBinary, bool binaryCompressed)
{
assertex(fileIO);

Expand All @@ -1829,21 +1836,27 @@ struct CStoreInfo
crcInfo.xmlCrc = *crcXml;
if (crcBinary)
crcInfo.binaryCrc = *crcBinary;
if (binaryCompressed)
crcInfo.flags |= FLAG_BINARY_COMPRESSED;
fileIO->write(0, sizeof(CrcInfo), &crcInfo);
}

void restore(IFileIO *fileIO)
{
assertex(fileIO);

// Only restore xmlCrc and binaryCrc
// Handle backward compatibility with old format
size32_t sz = fileIO->read(0, sizeof(CrcInfo), &crcInfo);
switch(sz)
{
case sizeof(CrcInfo):
case sizeof(CrcInfo): // Current format with flags
break;
case sizeof(unsigned) * 2: // Old format with just CRCs
crcInfo.flags = 0;
break;
case sizeof(unsigned):
case sizeof(unsigned): // Very old format with just XML CRC
crcInfo.binaryCrc = 0;
crcInfo.flags = 0;
break;
default:
crcInfo = {};
Expand Down Expand Up @@ -5302,7 +5315,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface
}
}

void writeStoreInfo(const char *base, const char *location, unsigned edition, unsigned *crcXml, unsigned *crcBinary, CStoreInfo *storeInfo)
void writeStoreInfo(const char *base, const char *location, unsigned edition, unsigned *crcXml, unsigned *crcBinary, CStoreInfo *storeInfo, bool binaryCompressed)
{
assertex(storeInfo);

Expand All @@ -5314,17 +5327,17 @@ class CStoreHelper : implements IStoreHelper, public CInterface

OwnedIFile iFile = createIFile(path.str());
OwnedIFileIO iFileIO = iFile->open(IFOcreate);
storeInfo->save(iFileIO, crcXml, crcBinary);
storeInfo->save(iFileIO, crcXml, crcBinary, binaryCompressed);

storeInfo->cache.set(filename.str());
iFileIO->close();
}

void updateStoreInfo(const char *base, const char *location, unsigned edition, unsigned *crcXml, unsigned *crcBinary, CStoreInfo *storeInfo)
void updateStoreInfo(const char *base, const char *location, unsigned edition, unsigned *crcXml, unsigned *crcBinary, CStoreInfo *storeInfo, bool binaryCompressed)
{
assertex(storeInfo);
clearStoreInfo(base, location, edition, storeInfo);
writeStoreInfo(base, location, edition, crcXml, crcBinary, storeInfo);
writeStoreInfo(base, location, edition, crcXml, crcBinary, storeInfo, binaryCompressed);
}

void refreshInfo(CStoreInfo &info, const char *base)
Expand Down Expand Up @@ -5396,7 +5409,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface
wcard.append(base).append(".*");
Owned<IDirectoryIterator> dIter = createDirectoryIterator(location, wcard.str());
if (!dIter->first())
updateStoreInfo(base, location, 0, nullptr, nullptr, &info);
updateStoreInfo(base, location, 0, nullptr, nullptr, &info, false);
else if (dIter->next())
throw MakeStringException(0, "Multiple store.X files - only one corresponding to latest dalisds<X>.xml should exist");
}
Expand Down Expand Up @@ -5713,10 +5726,42 @@ class CStoreHelper : implements IStoreHelper, public CInterface
{
Owned<ISerialOutputStream> serialStream = createSerialOutputStream(iFileIOTmpStore);
Owned<ICrcSerialOutputStream> crcSerialStream = createCrcOutputStream(serialStream);
Owned<IBufferedSerialOutputStream> bufOutStream = createBufferedOutputStream(crcSerialStream, bufferSize);
root->serializeToStream(*bufOutStream);
bufOutStream->flush();
bufOutStream.clear();

// Check if binary compression is enabled
if (configFlags & SH_CompressBinary)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot - I can't see anywhere where SH_CompressBinary is set. I would expect it to be conditionally set and passed in with configFlags when create the storehelper

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SH_CompressBinary flag is available for callers to set when creating the store helper, similar to existing usage patterns like in daadmin.cpp where flags are combined (e.g., SH_External|SH_RecoverFromIncErrors). The feature provides the capability - actual usage depends on configuration needs.

{
Owned<ICompressor> compressor = getCompressor("LZ4");
if (compressor)
{
LOG(MCdebugProgress, "Using LZ4 compression for binary store");
Owned<IBufferedSerialOutputStream> stream = createBufferedOutputStream(crcSerialStream, bufferSize);
Owned<ISerialOutputStream> compressed = createCompressingOutputStream(stream, compressor);
Owned<IBufferedSerialOutputStream> bufOutStream = createBufferedOutputStream(compressed, bufferSize, false);
root->serializeToStream(*bufOutStream);
bufOutStream->flush();
bufOutStream.clear();
compressed->flush();
compressed.clear();
stream->flush();
stream.clear();
}
else
{
WARNLOG("Failed to create compressor, saving uncompressed");
Owned<IBufferedSerialOutputStream> bufOutStream = createBufferedOutputStream(crcSerialStream, bufferSize);
root->serializeToStream(*bufOutStream);
bufOutStream->flush();
bufOutStream.clear();
}
}
else
{
Owned<IBufferedSerialOutputStream> bufOutStream = createBufferedOutputStream(crcSerialStream, bufferSize);
root->serializeToStream(*bufOutStream);
bufOutStream->flush();
bufOutStream.clear();
}

crcSerialStream->flush();
crc = crcSerialStream->queryCrc();
crcSerialStream.clear();
Expand Down Expand Up @@ -5882,7 +5927,8 @@ class CStoreHelper : implements IStoreHelper, public CInterface
}
}
clearStoreInfo(storeFileName, location, 0, NULL);
writeStoreInfo(storeFileName, location, newEdition, &xmlCrc, binaryCrcPtr, &storeInfo); // binaryCrcPtr could be nullptr if the binary store save failed
bool binaryCompressed = (configFlags & SH_CompressBinary) && binaryCrcPtr;
writeStoreInfo(storeFileName, location, newEdition, &xmlCrc, binaryCrcPtr, &storeInfo, binaryCompressed); // binaryCrcPtr could be nullptr if the binary store save failed

try
{
Expand All @@ -5900,7 +5946,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface
}

clearStoreInfo(storeFileName, remoteBackupLocation, 0, NULL);
writeStoreInfo(storeFileName, remoteBackupLocation, newEdition, &xmlCrc, binaryCrcPtr, &storeInfo); // binaryCrcPtr could be nullptr if the binary store save failed
writeStoreInfo(storeFileName, remoteBackupLocation, newEdition, &xmlCrc, binaryCrcPtr, &storeInfo, binaryCompressed); // binaryCrcPtr could be nullptr if the binary store save failed
PROGLOG("Copy store done");
}
}
Expand Down Expand Up @@ -6029,6 +6075,11 @@ class CStoreHelper : implements IStoreHelper, public CInterface
backupLocation.append(remoteBackupLocation);
return backupLocation;
}
virtual bool isBinaryCompressed() override
{
refreshStoreInfo();
return (storeInfo.crcInfo.flags & CStoreInfo::FLAG_BINARY_COMPRESSED) != 0;
}
friend struct CheckDeltaBlock;
};

Expand Down Expand Up @@ -6342,8 +6393,26 @@ CServerRemoteTree *CCovenSDSManager::loadStoreType(StoreFormat storeFormat, size
Owned<ISerialInputStream> progressStream = createProgressStream(serialStream, 0, fSize, "Load progress", 60);
Owned<ICrcSerialInputStream> crcSerialStream = createCrcInputStream(progressStream);
Owned<IBufferedSerialInputStream> bufInStream = createBufferedInputStream(crcSerialStream, bufferSize);

// Check if binary is compressed and apply decompression if needed
Owned<IBufferedSerialInputStream> finalInputStream = bufInStream;
if (isBinary && iStoreHelper->isBinaryCompressed())
{
LOG(MCdebugProgress, "Decompressing binary store using LZ4");
Owned<IExpander> expander = getExpander("LZ4");
if (expander)
{
Owned<ISerialInputStream> decompressedStream = createDecompressingInputStream(bufInStream, expander);
finalInputStream.setown(createBufferedInputStream(decompressedStream, bufferSize));
}
else
{
throw MakeStringException(0, "Failed to create expander for compressed binary store");
}
}

if (isBinary)
result.setown(createPTreeFromBinary(*bufInStream, nodeCreator));
result.setown(createPTreeFromBinary(*finalInputStream, nodeCreator));
else
{
Owned<IPTreeMaker> treeMaker = createPTreeMaker(ipt_none, nullptr, nodeCreator);
Expand All @@ -6360,11 +6429,11 @@ CServerRemoteTree *CCovenSDSManager::loadStoreType(StoreFormat storeFormat, size
}
};

Owned<ISimpleReadStream> wrapper = new CSerialInputStreamWrapper(bufInStream);
Owned<ISimpleReadStream> wrapper = new CSerialInputStreamWrapper(finalInputStream);
result.setown(createPTree(*wrapper, ipt_none, ptr_ignoreWhiteSpace, treeMaker));
}

bufInStream.clear();
finalInputStream.clear();
unsigned crc = crcSerialStream->queryCrc();

if (storedCrc && storedCrc != crc)
Expand Down
2 changes: 2 additions & 0 deletions dali/base/dasds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ interface IStoreHelper : extends IInterface
virtual void backup(const char *filename) = 0;
virtual StringBuffer &getPrimaryLocation(StringBuffer &location) = 0;
virtual StringBuffer &getBackupLocation(StringBuffer &backupLocation) = 0;
virtual bool isBinaryCompressed() = 0;
};

enum
Expand All @@ -275,6 +276,7 @@ enum
SH_RecoverFromIncErrors = 0x0002,
SH_BackupErrorFiles = 0x0004,
SH_CheckNewDelta = 0x0008,
SH_CompressBinary = 0x0010,
};
extern da_decl IStoreHelper *createStoreHelper(const char *storeName, const char *location, const char *remoteBackupLocation, unsigned configFlags, unsigned keepStores=0, unsigned delay=5000, const bool *abort=nullptr, bool saveBinary=false);
extern da_decl bool applyXmlDeltas(IPropertyTree &root, IIOStream &stream, bool stopOnError=false);
Expand Down