From e8498cb24c184dff8d447c42836bd0ccc61d77a1 Mon Sep 17 00:00:00 2001
From: Ed Savage
Date: Mon, 26 Jan 2026 15:14:23 +1300
Subject: [PATCH 1/6] [ML] Better handling of invalid JSON state documents

Various changes to the handling of errors when parsing JSON state
documents to improve consistency and provide better visibility
---
 include/core/CJsonStateRestoreTraverser.h     |  3 +-
 lib/api/CFieldDataCategorizer.cc              | 24 +++++----
 lib/api/unittest/CFieldDataCategorizerTest.cc | 51 ++++++++++++++++++-
 lib/core/CJsonStateRestoreTraverser.cc        |  5 +-
 lib/core/CStateDecompressor.cc                | 15 +++++-
 5 files changed, 80 insertions(+), 18 deletions(-)

diff --git a/include/core/CJsonStateRestoreTraverser.h b/include/core/CJsonStateRestoreTraverser.h
index 96f5f45a11..330bbff032 100644
--- a/include/core/CJsonStateRestoreTraverser.h
+++ b/include/core/CJsonStateRestoreTraverser.h
@@ -304,8 +304,7 @@ class CORE_EXPORT CJsonStateRestoreTraverser : public CStateRestoreTraverser {
         bool s_HaveCompleteToken{false};
     };
 
-    //! JSON reader istream wrapper
-    // core::CBoostJsonUnbufferedIStreamWrapper m_ReadStream;
+    //! JSON stream to read from
     std::istream& m_ReadStream;
 
     //! JSON reader
diff --git a/lib/api/CFieldDataCategorizer.cc b/lib/api/CFieldDataCategorizer.cc
index 89b4c4ddfb..3473d9fa8a 100644
--- a/lib/api/CFieldDataCategorizer.cc
+++ b/lib/api/CFieldDataCategorizer.cc
@@ -283,6 +283,16 @@ bool CFieldDataCategorizer::restoreState(core::CDataSearcher& restoreSearcher,
 
     LOG_DEBUG(<< "Restore categorizer state");
 
+    auto handleCorruptRestore = [this](const std::string& message){
+        LOG_ERROR(<< message);
+        // This situation is fatal in terms of the categorizer we attempted to restore,
+        // but returning false here can throw the system into a repeated cycle
+        // of failure. It's better to reset the categorizer and re-categorize from
+        // scratch.
+        this->resetAfterCorruptRestore();
+        return true;
+    };
+
     try {
         // Restore from Elasticsearch compressed data.
         // (To restore from uncompressed data for testing, comment the next line
@@ -310,17 +320,13 @@
 
         core::CJsonStateRestoreTraverser traverser(*strm);
         if (this->acceptRestoreTraverser(traverser) == false) {
-            LOG_ERROR(<< "JSON restore failed");
-            return false;
+            // We used to return false here, putting it at odds with the exception
+            // handling case (below). We now follow the same logic for both failure branches.
+            return handleCorruptRestore("JSON restore failed");
         }
+        LOG_DEBUG(<< "JSON restore complete");
     } catch (std::exception& e) {
-        LOG_ERROR(<< "Failed to restore state! " << e.what());
-        // This is fatal in terms of the categorizer we attempted to restore,
-        // but returning false here can throw the system into a repeated cycle
-        // of failure. It's better to reset the categorizer and re-categorize from
-        // scratch.
-        this->resetAfterCorruptRestore();
-        return true;
+        return handleCorruptRestore("Failed to restore state! " + std::string(e.what()));
" + std::string(e.what())); } return true; diff --git a/lib/api/unittest/CFieldDataCategorizerTest.cc b/lib/api/unittest/CFieldDataCategorizerTest.cc index 1ea9e1aac1..13cc45ce73 100644 --- a/lib/api/unittest/CFieldDataCategorizerTest.cc +++ b/lib/api/unittest/CFieldDataCategorizerTest.cc @@ -28,6 +28,7 @@ #include #include #include +#include BOOST_AUTO_TEST_SUITE(CFieldDataCategorizerTest) @@ -210,6 +211,52 @@ std::string setupPerPartitionStopOnWarnTest(bool stopOnWarnAtInit, } } +BOOST_AUTO_TEST_CASE(testRestoreFromBadState) { + core::CLogger::instance().setLoggingLevel(core::CLogger::E_Trace); + model::CLimits limits; + CAnomalyJobConfig config; + BOOST_TEST_REQUIRE(config.initFromFile("testfiles/cat_job_config.json")); + CTestChainedProcessor testChainedProcessor; + + std::ostringstream outputStrm; + core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm}; + + CTestFieldDataCategorizer categorizer{"job", config.analysisConfig(), limits, + &testChainedProcessor, wrappedOutputStream}; + + std::vector badStates = { + // "Empty" base64 - [] + R"({"compressed": ["H4sIAAAAAAAA","/4uOBQApu0wNAgAAAA=="],"eos":true})", + // Not compressed base64 - "junk" + R"({"compressed": ["anVuawo="],"eos":true})" + // Empty compressed array + R"({"compressed": [],"eos":true})", + // Not a JSON array + R"({"compressed": Junk,"eos":true})", + // Decompresses to "junk" + R"({"compressed": ["H4sIADlIcGkAA8sqzcvmAgAHddRtBQAAAA=="],"eos":true})", + // Invalid JSON + R"({ "foo: "bar" )", + // Missing 'compressed' field + R"({"eos":true})", + // 'compressed' is not an array + R"({"compressed": "a string","eos":true})", + // 'compressed' array contains non-string + R"({"compressed": [123],"eos":true})", + // Invalid base64 content + R"({"compressed": ["not-base64"],"eos":true})", + // Null state document + R"({"compressed": \0,"eos":true})" + }; + + for (const auto& badState : badStates) { + LOG_DEBUG(<< "Restoring from \"" << badState << "\""); + CTestDataSearcher restorer{badState}; + core_t::TTime time{0}; + BOOST_REQUIRE_EQUAL(true, categorizer.restoreState(restorer, time)); + } +} + BOOST_AUTO_TEST_CASE(testWithoutPerPartitionCategorization) { model::CLimits limits; CAnomalyJobConfig config; @@ -576,7 +623,7 @@ BOOST_AUTO_TEST_CASE(testHandleControlMessages) { BOOST_REQUIRE_EQUAL(0, output.find("[{\"flush\":{\"id\":\"7\",\"last_finalized_bucket_end\":0,\"refresh_required\":true}}")); } -BOOST_AUTO_TEST_CASE(testRestoreStateFailsWithEmptyState) { +BOOST_AUTO_TEST_CASE(testRestoreStateRecoversWithEmptyState) { model::CLimits limits; CAnomalyJobConfig config; BOOST_TEST_REQUIRE(config.initFromFile("testfiles/new_persist_categorization.json")); @@ -588,7 +635,7 @@ BOOST_AUTO_TEST_CASE(testRestoreStateFailsWithEmptyState) { core_t::TTime completeToTime{0}; CEmptySearcher restoreSearcher; - BOOST_TEST_REQUIRE(categorizer.restoreState(restoreSearcher, completeToTime) == false); + BOOST_TEST_REQUIRE(categorizer.restoreState(restoreSearcher, completeToTime) == true); } BOOST_AUTO_TEST_CASE(testFlushWritesOnlyChangedCategories) { diff --git a/lib/core/CJsonStateRestoreTraverser.cc b/lib/core/CJsonStateRestoreTraverser.cc index e3b6d25843..1483d66bfd 100644 --- a/lib/core/CJsonStateRestoreTraverser.cc +++ b/lib/core/CJsonStateRestoreTraverser.cc @@ -35,8 +35,7 @@ CJsonStateRestoreTraverser::CJsonStateRestoreTraverser(std::istream& inputStream } bool CJsonStateRestoreTraverser::isEof() const { - // CBoostJsonUnbufferedIStreamWrapper returns \0 when it reaches EOF - return m_ReadStream.peek() == '\0'; + return 
+    return m_ReadStream.eof();
 }
 
 bool CJsonStateRestoreTraverser::next() {
@@ -402,7 +401,7 @@ bool CJsonStateRestoreTraverser::advance() {
 }
 
 void CJsonStateRestoreTraverser::logError() {
-    LOG_ERROR(<< "Error parsing JSON: " << m_Reader.last_error() << ", stream state - bad: "
+    LOG_ERROR(<< "Error parsing JSON: " << "\"" << m_Buffer << "\"" << "\"" << m_Reader.last_error() << ", stream state - bad: "
               << m_ReadStream.bad() << ", fail: " << m_ReadStream.fail() << ", eof: " << m_ReadStream.eof()
               << ", bytes remaining: " << m_BytesRemaining << ", buffer position: "
               << (m_BufferPtr ? (m_BufferPtr - m_Buffer) : -1)
diff --git a/lib/core/CStateDecompressor.cc b/lib/core/CStateDecompressor.cc
index a78d99c01a..24273f8224 100644
--- a/lib/core/CStateDecompressor.cc
+++ b/lib/core/CStateDecompressor.cc
@@ -108,9 +108,20 @@ bool CStateDecompressor::CDechunkFilter::parseNext() {
 
     do {
         char c = m_InputStreamWrapper->take();
         if (c == '\0') {
+            std::string message;
             if (m_ParsingStarted == false) {
+                message = "Encountered NULL character in stream before parsing has started.";
                 ret = false;
             }
+            if (m_Reader->handler().s_Type == SBoostJsonHandler::E_TokenObjectEnd) {
+                message = "Encountered NULL character in stream after object end.";
+                ret = false;
+            }
+            if (ret == false && message.empty() == false) {
+                std::string jsonStr(m_Reader->handler().s_CompressedChunk,
+                    m_Reader->handler().s_CompressedChunkLength);
+                LOG_WARN(<< "Error parsing JSON: \"" << jsonStr << "\". " << message);
+            }
             break;
         }
@@ -160,7 +171,7 @@ bool CStateDecompressor::CDechunkFilter::readHeader() {
     }
     // If we are here, we have got an empty document from downstream,
     // so the stream is finished
-    LOG_TRACE(<< "Failed to find 'compressed' data array!");
+    LOG_WARN(<< "Failed to find 'compressed' data array!");
     m_Initialised = false;
     m_IStream.reset();
     ++m_CurrentDocNum;
@@ -243,7 +254,7 @@ void CStateDecompressor::CDechunkFilter::handleRead(char* s, std::streamsize
 std::streamsize CStateDecompressor::CDechunkFilter::endOfStream(char* s,
                                                                 std::streamsize n,
                                                                 std::streamsize bytesDone) {
-    // return [ ] if not m_Initialised
+    // return [ ] if not m_Initialised - i.e. if no valid json could be found
     m_EndOfStream = true;
     if (!m_SentData && bytesDone == 0) {
         std::streamsize toCopy = std::min(std::streamsize(EMPTY_DATA.size()), n);

From bff89ce94720d2effaca596688d23901f9babdf6 Mon Sep 17 00:00:00 2001
From: Ed Savage
Date: Mon, 26 Jan 2026 16:44:36 +1300
Subject: [PATCH 2/6] Formatting

---
 lib/api/CFieldDataCategorizer.cc              | 2 +-
 lib/api/unittest/CFieldDataCategorizerTest.cc | 3 +--
 lib/core/CJsonStateRestoreTraverser.cc        | 4 +++-
 lib/core/CStateDecompressor.cc                | 2 +-
 4 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/api/CFieldDataCategorizer.cc b/lib/api/CFieldDataCategorizer.cc
index 3473d9fa8a..6ac4446710 100644
--- a/lib/api/CFieldDataCategorizer.cc
+++ b/lib/api/CFieldDataCategorizer.cc
@@ -283,7 +283,7 @@ bool CFieldDataCategorizer::restoreState(core::CDataSearcher& restoreSearcher,
 
     LOG_DEBUG(<< "Restore categorizer state");
 
-    auto handleCorruptRestore = [this](const std::string& message){
+    auto handleCorruptRestore = [this](const std::string& message) {
         LOG_ERROR(<< message);
         // This situation is fatal in terms of the categorizer we attempted to restore,
         // but returning false here can throw the system into a repeated cycle
diff --git a/lib/api/unittest/CFieldDataCategorizerTest.cc b/lib/api/unittest/CFieldDataCategorizerTest.cc
index 13cc45ce73..491e5a1b1a 100644
--- a/lib/api/unittest/CFieldDataCategorizerTest.cc
+++ b/lib/api/unittest/CFieldDataCategorizerTest.cc
@@ -246,8 +246,7 @@ BOOST_AUTO_TEST_CASE(testRestoreFromBadState) {
         // Invalid base64 content
         R"({"compressed": ["not-base64"],"eos":true})",
         // Null state document
-        R"({"compressed": \0,"eos":true})"
-    };
+        R"({"compressed": \0,"eos":true})"};
 
     for (const auto& badState : badStates) {
         LOG_DEBUG(<< "Restoring from \"" << badState << "\"");
diff --git a/lib/core/CJsonStateRestoreTraverser.cc b/lib/core/CJsonStateRestoreTraverser.cc
index 1483d66bfd..80813c5d71 100644
--- a/lib/core/CJsonStateRestoreTraverser.cc
+++ b/lib/core/CJsonStateRestoreTraverser.cc
@@ -401,7 +401,9 @@ bool CJsonStateRestoreTraverser::advance() {
 }
 
 void CJsonStateRestoreTraverser::logError() {
-    LOG_ERROR(<< "Error parsing JSON: " << "\"" << m_Buffer << "\"" << "\"" << m_Reader.last_error() << ", stream state - bad: "
+    LOG_ERROR(<< "Error parsing JSON: "
+              << "\"" << m_Buffer << "\""
+              << "\"" << m_Reader.last_error() << ", stream state - bad: "
               << m_ReadStream.bad() << ", fail: " << m_ReadStream.fail() << ", eof: " << m_ReadStream.eof()
               << ", bytes remaining: " << m_BytesRemaining << ", buffer position: "
               << (m_BufferPtr ? (m_BufferPtr - m_Buffer) : -1)
diff --git a/lib/core/CStateDecompressor.cc b/lib/core/CStateDecompressor.cc
index 24273f8224..5b896f5c53 100644
--- a/lib/core/CStateDecompressor.cc
+++ b/lib/core/CStateDecompressor.cc
@@ -119,7 +119,7 @@ bool CStateDecompressor::CDechunkFilter::parseNext() {
             }
             if (ret == false && message.empty() == false) {
                 std::string jsonStr(m_Reader->handler().s_CompressedChunk,
-                    m_Reader->handler().s_CompressedChunkLength);
+                                    m_Reader->handler().s_CompressedChunkLength);
" << message); } break; From 75c6624a3a28a59ba075141b27a01b191e6140c2 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Mon, 26 Jan 2026 16:51:54 +1300 Subject: [PATCH 3/6] Update changelog --- docs/CHANGELOG.asciidoc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 20e500660c..ec102853b8 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -27,6 +27,12 @@ //=== Bug Fixes //=== Regressions +== {es} version 9.4.0 + +=== Enhancements + +* Better handling of invalid JSON state documents (See {ml-pull}[]#2895].) + == {es} version 9.3.0 === Enhancements From 5a5964b7ddcea52d72799d6a781fe74dd4ef57b0 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 27 Jan 2026 14:26:59 +1300 Subject: [PATCH 4/6] Address code review comments --- lib/api/unittest/CFieldDataCategorizerTest.cc | 7 +++++-- lib/core/CStateDecompressor.cc | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/api/unittest/CFieldDataCategorizerTest.cc b/lib/api/unittest/CFieldDataCategorizerTest.cc index 491e5a1b1a..bb9f8de2c5 100644 --- a/lib/api/unittest/CFieldDataCategorizerTest.cc +++ b/lib/api/unittest/CFieldDataCategorizerTest.cc @@ -228,7 +228,7 @@ BOOST_AUTO_TEST_CASE(testRestoreFromBadState) { // "Empty" base64 - [] R"({"compressed": ["H4sIAAAAAAAA","/4uOBQApu0wNAgAAAA=="],"eos":true})", // Not compressed base64 - "junk" - R"({"compressed": ["anVuawo="],"eos":true})" + R"({"compressed": ["anVuawo="],"eos":true})", // Empty compressed array R"({"compressed": [],"eos":true})", // Not a JSON array @@ -246,7 +246,10 @@ BOOST_AUTO_TEST_CASE(testRestoreFromBadState) { // Invalid base64 content R"({"compressed": ["not-base64"],"eos":true})", // Null state document - R"({"compressed": \0,"eos":true})"}; + R"({"compressed": \0,"eos":true})", + // NULL character after object end + R"({"index":{"_id":"logs_services_count_logs_categories_categorizer_state#1"}})" + }; for (const auto& badState : badStates) { LOG_DEBUG(<< "Restoring from \"" << badState << "\""); diff --git a/lib/core/CStateDecompressor.cc b/lib/core/CStateDecompressor.cc index 5b896f5c53..2d1f926551 100644 --- a/lib/core/CStateDecompressor.cc +++ b/lib/core/CStateDecompressor.cc @@ -114,7 +114,10 @@ bool CStateDecompressor::CDechunkFilter::parseNext() { ret = false; } if (m_Reader->handler().s_Type == SBoostJsonHandler::E_TokenObjectEnd) { - message = "Encountered NULL character in stream after object end."; + if (message.size() > 0) { + message += "\n"; + } + message += "Encountered NULL character in stream after object end."; ret = false; } if (ret == false && message.empty() == false) { From ee61c6f9e847fe7add67d6f6c22f509f5ca31cd0 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Tue, 27 Jan 2026 15:50:23 +1300 Subject: [PATCH 5/6] Formatting --- lib/api/unittest/CFieldDataCategorizerTest.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/api/unittest/CFieldDataCategorizerTest.cc b/lib/api/unittest/CFieldDataCategorizerTest.cc index bb9f8de2c5..ae253c0ef3 100644 --- a/lib/api/unittest/CFieldDataCategorizerTest.cc +++ b/lib/api/unittest/CFieldDataCategorizerTest.cc @@ -248,8 +248,7 @@ BOOST_AUTO_TEST_CASE(testRestoreFromBadState) { // Null state document R"({"compressed": \0,"eos":true})", // NULL character after object end - R"({"index":{"_id":"logs_services_count_logs_categories_categorizer_state#1"}})" - }; + R"({"index":{"_id":"logs_services_count_logs_categories_categorizer_state#1"}})"}; for (const auto& badState : badStates) { LOG_DEBUG(<< 
"Restoring from \"" << badState << "\""); From 587ec703ad0bcf03084d697a6d434f0d895f9aa9 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Wed, 28 Jan 2026 11:12:39 +1300 Subject: [PATCH 6/6] add missing file --- .../unittest/testfiles/cat_job_config.json | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 lib/api/unittest/testfiles/cat_job_config.json diff --git a/lib/api/unittest/testfiles/cat_job_config.json b/lib/api/unittest/testfiles/cat_job_config.json new file mode 100644 index 0000000000..fa49a178f9 --- /dev/null +++ b/lib/api/unittest/testfiles/cat_job_config.json @@ -0,0 +1,26 @@ +{ + "job_id": "logs_services_count_logs_categories", + "analysis_config": { + "categorization_field_name": "message", + "per_partition_categorization": { + "enabled": true, + "stop_on_warn": false + }, + "detectors": [ + { + "detector_description": "count by mlcategory partitionfield=\"service.name\"", + "function": "count", + "by_field_name": "mlcategory", + "partition_field_name": "service.name" + } + ], + "influencers": [ + "mlcategory", + "service.name" + ] + }, + "data_description": { + "time_field": "@timestamp", + "time_format": "epoch_ms" + } +}