From 114f6cdab8922e543f489a12531b69b0f5a5c836 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Sat, 20 Dec 2025 23:46:56 +0100 Subject: [PATCH 01/12] Add progressive AI field enrichments --- .../DocumentationEndpoints.cs | 6 + .../Search/DocumentationDocument.cs | 37 +++ .../ElasticsearchIngestChannel.Mapping.cs | 26 ++ .../ElasticsearchMarkdownExporter.Export.cs | 8 + .../ElasticsearchMarkdownExporter.cs | 20 ++ .../Enrichment/DocumentEnrichmentService.cs | 225 +++++++++++++++++ .../ElasticsearchEnrichmentCache.cs | 239 ++++++++++++++++++ .../Enrichment/ElasticsearchLlmClient.cs | 201 +++++++++++++++ .../Enrichment/EnrichmentOptions.cs | 37 +++ .../Enrichment/IEnrichmentCache.cs | 43 ++++ .../Elasticsearch/Enrichment/ILlmClient.cs | 21 ++ .../Indexing/AssemblerIndexService.cs | 4 + .../IsolatedIndexService.cs | 4 + .../Assembler/AssemblerIndexCommand.cs | 6 +- .../docs-builder/Commands/IndexCommand.cs | 6 +- 15 files changed, 879 insertions(+), 4 deletions(-) create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs diff --git a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs index 51a9797f0..af34da7ec 100644 --- a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs +++ b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs @@ -45,4 +45,10 @@ public class ElasticsearchEndpoint public int? BootstrapTimeout { get; set; } public bool NoSemantic { get; set; } public bool ForceReindex { get; set; } + + /// + /// Enable AI enrichment of documents using LLM-generated metadata. + /// When enabled, documents are enriched with summaries, search queries, and questions. + /// + public bool EnableAiEnrichment { get; set; } } diff --git a/src/Elastic.Documentation/Search/DocumentationDocument.cs b/src/Elastic.Documentation/Search/DocumentationDocument.cs index c5d4d274e..632d19596 100644 --- a/src/Elastic.Documentation/Search/DocumentationDocument.cs +++ b/src/Elastic.Documentation/Search/DocumentationDocument.cs @@ -83,4 +83,41 @@ public record DocumentationDocument [JsonPropertyName("hidden")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public bool Hidden { get; set; } + + // AI Enrichment fields - populated by DocumentEnrichmentService + + /// + /// 3-5 sentences dense with technical entities, API names, and core functionality for vector matching. + /// + [JsonPropertyName("ai_rag_optimized_summary")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? AiRagOptimizedSummary { get; set; } + + /// + /// Exactly 5-10 words for a UI tooltip. + /// + [JsonPropertyName("ai_short_summary")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? AiShortSummary { get; set; } + + /// + /// A 3-8 word keyword string representing a high-intent user search for this doc. + /// + [JsonPropertyName("ai_search_query")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? 
AiSearchQuery { get; set; } + + /// + /// Array of 3-5 specific questions answered by this document. + /// + [JsonPropertyName("ai_questions")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? AiQuestions { get; set; } + + /// + /// Array of 2-4 specific use cases this doc helps with. + /// + [JsonPropertyName("ai_use_cases")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? AiUseCases { get; set; } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs index ced9c8fcd..033a358e7 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs @@ -201,6 +201,32 @@ protected static string CreateMapping(string? inferenceId) => "fields" : { {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} } + }, + "ai_rag_optimized_summary": { + "type": "text", + "analyzer": "synonyms_fixed_analyzer", + "search_analyzer": "synonyms_analyzer", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } + }, + "ai_short_summary": { + "type": "text" + }, + "ai_search_query": { + "type": "keyword" + }, + "ai_questions": { + "type": "text", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } + }, + "ai_use_cases": { + "type": "text", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } } } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index b49efc4fc..cf5a7cfb7 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -131,6 +131,10 @@ public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, }; CommonEnrichments(doc, currentNavigation); + + // AI enrichment - respects per-run limit, uses cache + _ = await _enrichmentService.TryEnrichAsync(doc, ctx); + AssignDocumentMetadata(doc); if (_indexStrategy == IngestStrategy.Multiplex) @@ -166,6 +170,10 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc doc.Abstract = @abstract; doc.Headings = headings; CommonEnrichments(doc, null); + + // AI enrichment - respects per-run limit, uses cache + _ = await _enrichmentService.TryEnrichAsync(doc, ctx); + AssignDocumentMetadata(doc); // Write to channels following the multiplex or reindex strategy diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index 7076cb229..e572085e9 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -12,6 +12,7 @@ using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; +using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Microsoft.Extensions.Logging; using NetEscapades.EnumGenerators; 
@@ -41,6 +42,9 @@ public partial class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposa private readonly IReadOnlyCollection _rules; private readonly VersionsConfiguration _versionsConfiguration; private readonly string _fixedSynonymsHash; + private readonly Enrichment.DocumentEnrichmentService _enrichmentService; + private readonly IEnrichmentCache _enrichmentCache; + private readonly ILlmClient _llmClient; public ElasticsearchMarkdownExporter( ILoggerFactory logFactory, @@ -97,6 +101,16 @@ IDocumentationConfigurationContext context _lexicalChannel = new ElasticsearchLexicalIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); _semanticChannel = new ElasticsearchSemanticIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); + + // Create enrichment services + var enrichmentOptions = new EnrichmentOptions { Enabled = es.EnableAiEnrichment }; + _enrichmentCache = new ElasticsearchEnrichmentCache(_transport, logFactory.CreateLogger()); + _llmClient = new ElasticsearchLlmClient(_transport, logFactory.CreateLogger()); + _enrichmentService = new Enrichment.DocumentEnrichmentService( + _enrichmentCache, + _llmClient, + enrichmentOptions, + logFactory.CreateLogger()); } /// @@ -105,6 +119,7 @@ public async ValueTask StartAsync(Cancel ctx = default) _currentLexicalHash = await _lexicalChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty; _currentSemanticHash = await _semanticChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty; + await _enrichmentService.InitializeAsync(ctx); await PublishSynonymsAsync(ctx); await PublishQueryRulesAsync(ctx); _ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); @@ -230,6 +245,9 @@ private async ValueTask CountAsync(string index, string body, Cancel ctx = /// public async ValueTask StopAsync(Cancel ctx = default) { + // Log AI enrichment progress + _enrichmentService.LogProgress(); + var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest"); @@ -436,6 +454,8 @@ public void Dispose() { _lexicalChannel.Dispose(); _semanticChannel.Dispose(); + _enrichmentService.Dispose(); + _llmClient.Dispose(); GC.SuppressFinalize(this); } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs new file mode 100644 index 000000000..bc4621d20 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs @@ -0,0 +1,225 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Security.Cryptography; +using System.Text; +using System.Text.Json.Serialization; +using System.Text.RegularExpressions; +using Elastic.Documentation.Search; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Orchestrates document enrichment using an LLM client and cache. 
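+/// Cache keys are content-addressable: a SHA-256 hash of the normalized document title and body,
+/// so unchanged documents hit the cache on every run and only edited content triggers a new LLM call.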
+/// +public sealed partial class DocumentEnrichmentService( + IEnrichmentCache cache, + ILlmClient llm, + EnrichmentOptions options, + ILogger logger) : IDisposable +{ + private readonly IEnrichmentCache _cache = cache; + private readonly ILlmClient _llm = llm; + private readonly EnrichmentOptions _options = options; + private readonly ILogger _logger = logger; + + private int _cacheHitCount; + private int _staleRefreshCount; + private int _newEnrichmentCount; + private int _skippedCount; + + public Task InitializeAsync(CancellationToken ct) => + _options.Enabled ? _cache.InitializeAsync(ct) : Task.CompletedTask; + + public async Task TryEnrichAsync(DocumentationDocument doc, CancellationToken ct) + { + if (!_options.Enabled) + return false; + + if (string.IsNullOrWhiteSpace(doc.StrippedBody)) + return false; + + var cacheKey = GenerateCacheKey(doc.Title, doc.StrippedBody); + + if (TryApplyCachedEnrichment(doc, cacheKey)) + { + await TryRefreshStaleCacheAsync(doc, cacheKey, ct); + return true; + } + + return await TryEnrichNewDocumentAsync(doc, cacheKey, ct); + } + + public void LogProgress() + { + if (!_options.Enabled) + { + _logger.LogInformation("AI enrichment is disabled (use --enable-ai-enrichment to enable)"); + return; + } + + _logger.LogInformation( + "Enrichment summary: {CacheHits} cache hits ({StaleRefreshed} stale refreshed), {NewEnrichments} new, {Skipped} skipped (limit: {Limit})", + _cacheHitCount, _staleRefreshCount, _newEnrichmentCount, _skippedCount, _options.MaxNewEnrichmentsPerRun); + + if (_skippedCount > 0) + { + _logger.LogInformation( + "Enrichment progress: {Skipped} documents pending, will complete over subsequent runs", + _skippedCount); + } + } + + public void Dispose() => (_llm as IDisposable)?.Dispose(); + + private bool TryApplyCachedEnrichment(DocumentationDocument doc, string cacheKey) + { + var cached = _cache.TryGet(cacheKey); + if (cached is null) + return false; + + // Defensive check: if cached data is invalid, treat as miss and let it re-enrich + if (!cached.Data.HasData) + { + _logger.LogDebug("Cached entry for {Url} has no valid data, will re-enrich", doc.Url); + return false; + } + + ApplyEnrichment(doc, cached.Data); + _ = Interlocked.Increment(ref _cacheHitCount); + return true; + } + + private async Task TryRefreshStaleCacheAsync(DocumentationDocument doc, string cacheKey, CancellationToken ct) + { + var cached = _cache.TryGet(cacheKey); + // If cache is current version or newer, no refresh needed + if (cached is not null && cached.PromptVersion >= _options.PromptVersion) + return; + + if (!TryClaimEnrichmentSlot()) + return; + + _ = Interlocked.Increment(ref _staleRefreshCount); + + try + { + var fresh = await _llm.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ct); + if (fresh is not null) + { + await _cache.StoreAsync(cacheKey, fresh, _options.PromptVersion, ct); + ApplyEnrichment(doc, fresh); + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogDebug(ex, "Failed to refresh stale cache for {Url}", doc.Url); + } + } + + private async Task TryEnrichNewDocumentAsync(DocumentationDocument doc, string cacheKey, CancellationToken ct) + { + if (!TryClaimEnrichmentSlot()) + { + _logger.LogDebug("Skipping enrichment for {Url} - limit reached", doc.Url); + _ = Interlocked.Increment(ref _skippedCount); + return false; + } + + try + { + var enrichment = await _llm.EnrichAsync(doc.Title, doc.StrippedBody ?? 
string.Empty, ct); + if (enrichment is not null) + { + await _cache.StoreAsync(cacheKey, enrichment, _options.PromptVersion, ct); + ApplyEnrichment(doc, enrichment); + return true; + } + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogWarning(ex, "Failed to enrich document {Url}", doc.Url); + } + + return false; + } + + /// + /// Tries to get permission to make a new LLM call. + /// + /// Why: We have ~12k documents but only allow 100 new LLM calls per deployment. + /// This keeps each deployment fast. Documents not enriched this run will be + /// enriched in the next deployment. Cache hits are free and don't count. + /// + /// How: Add 1 to counter. If counter is too high, subtract 1 and return false. + /// This is safe when multiple documents run at the same time. + /// + private bool TryClaimEnrichmentSlot() + { + var current = Interlocked.Increment(ref _newEnrichmentCount); + if (current <= _options.MaxNewEnrichmentsPerRun) + return true; + + _ = Interlocked.Decrement(ref _newEnrichmentCount); + return false; + } + + private static void ApplyEnrichment(DocumentationDocument doc, EnrichmentData data) + { + doc.AiRagOptimizedSummary = data.RagOptimizedSummary; + doc.AiShortSummary = data.ShortSummary; + doc.AiSearchQuery = data.SearchQuery; + doc.AiQuestions = data.Questions; + doc.AiUseCases = data.UseCases; + } + + private static string GenerateCacheKey(string title, string body) + { + var normalized = NormalizeContent(title + body); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); + return Convert.ToHexString(hash).ToLowerInvariant(); + } + + private static string NormalizeContent(string input) => + NormalizeRegex().Replace(input, "").ToLowerInvariant(); + + [GeneratedRegex("[^a-zA-Z0-9]")] + private static partial Regex NormalizeRegex(); +} + +/// +/// LLM-generated enrichment data for documentation documents. +/// +public sealed record EnrichmentData +{ + [JsonPropertyName("ai_rag_optimized_summary")] + public string? RagOptimizedSummary { get; init; } + + [JsonPropertyName("ai_short_summary")] + public string? ShortSummary { get; init; } + + [JsonPropertyName("ai_search_query")] + public string? SearchQuery { get; init; } + + [JsonPropertyName("ai_questions")] + public string[]? Questions { get; init; } + + [JsonPropertyName("ai_use_cases")] + public string[]? UseCases { get; init; } + + [JsonIgnore] + public bool HasData => + !string.IsNullOrEmpty(RagOptimizedSummary) || + !string.IsNullOrEmpty(ShortSummary) || + !string.IsNullOrEmpty(SearchQuery) || + Questions is { Length: > 0 } || + UseCases is { Length: > 0 }; +} + +[JsonSerializable(typeof(EnrichmentData))] +[JsonSerializable(typeof(CachedEnrichment))] +[JsonSerializable(typeof(CompletionResponse))] +[JsonSerializable(typeof(InferenceRequest))] +internal sealed partial class EnrichmentSerializerContext : JsonSerializerContext; diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs new file mode 100644 index 000000000..29d9796f5 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -0,0 +1,239 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. 
+// See the LICENSE file in the project root for more information + +using System.Text.Json; +using System.Text.Json.Serialization; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Elasticsearch-backed implementation of . +/// Pre-loads all entries into memory at startup for fast lookups. +/// +public sealed class ElasticsearchEnrichmentCache( + DistributedTransport transport, + ILogger logger, + string indexName = "docs-ai-enriched-fields-cache") : IEnrichmentCache +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + private readonly string _indexName = indexName; + private readonly Dictionary _cache = []; + + private const int ScrollBatchSize = 1000; + private const string ScrollTimeout = "1m"; + + // language=json + private const string IndexMapping = """ + { + "settings": { + "number_of_shards": 1, + "number_of_replicas": 1 + }, + "mappings": { + "properties": { + "response_json": { "type": "text", "index": false }, + "created_at": { "type": "date" }, + "prompt_version": { "type": "integer" } + } + } + } + """; + + public int Count => _cache.Count; + + public async Task InitializeAsync(CancellationToken ct) + { + await EnsureIndexExistsAsync(ct); + await PreloadCacheAsync(ct); + } + + public CachedEnrichmentEntry? TryGet(string key) => + _cache.TryGetValue(key, out var entry) ? entry : null; + + public async Task StoreAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct) + { + _cache[key] = new CachedEnrichmentEntry(data, promptVersion); + await PersistToElasticsearchAsync(key, data, promptVersion, ct); + } + + private async Task PersistToElasticsearchAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct) + { + try + { + var responseJson = JsonSerializer.Serialize(data, EnrichmentSerializerContext.Default.EnrichmentData); + var cacheItem = new CachedEnrichment + { + ResponseJson = responseJson, + CreatedAt = DateTimeOffset.UtcNow, + PromptVersion = promptVersion + }; + + var body = JsonSerializer.Serialize(cacheItem, EnrichmentSerializerContext.Default.CachedEnrichment); + var response = await _transport.PutAsync( + $"{_indexName}/_doc/{key}", + PostData.String(body), + ct); + + if (!response.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogWarning("Failed to persist cache entry: {StatusCode}", response.ApiCallDetails.HttpStatusCode); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to persist cache entry for key {Key}", key); + } + } + + private async Task EnsureIndexExistsAsync(CancellationToken ct) + { + var existsResponse = await _transport.HeadAsync(_indexName, ct); + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode) + return; + + var createResponse = await _transport.PutAsync( + _indexName, + PostData.String(IndexMapping), + ct); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created cache index {IndexName}", _indexName); + else if (createResponse.ApiCallDetails.HttpStatusCode != 400) // 400 = already exists + _logger.LogWarning("Failed to create cache index: {StatusCode}", createResponse.ApiCallDetails.HttpStatusCode); + } + + private async Task PreloadCacheAsync(CancellationToken ct) + { + var sw = System.Diagnostics.Stopwatch.StartNew(); + + if (!await HasDocumentsAsync(ct)) + { + _logger.LogInformation("Cache index is empty, skipping preload ({ElapsedMs}ms)", 
sw.ElapsedMilliseconds); + return; + } + + await ScrollAllDocumentsAsync(ct); + + sw.Stop(); + _logger.LogInformation("Pre-loaded {Count} cache entries in {ElapsedMs}ms", _cache.Count, sw.ElapsedMilliseconds); + } + + private async Task HasDocumentsAsync(CancellationToken ct) + { + var countResponse = await _transport.GetAsync($"{_indexName}/_count", ct); + if (!countResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogWarning("Cache count failed: {StatusCode}", countResponse.ApiCallDetails.HttpStatusCode); + return true; // Assume there might be documents + } + + var docCount = countResponse.Body.Get("count") ?? 0; + _logger.LogDebug("Cache index has {Count} documents", docCount); + return docCount > 0; + } + + private async Task ScrollAllDocumentsAsync(CancellationToken ct) + { + var scrollQuery = "{\"size\": " + ScrollBatchSize + ", \"query\": {\"match_all\": {}}}"; + + var searchResponse = await _transport.PostAsync( + $"{_indexName}/_search?scroll={ScrollTimeout}", + PostData.String(scrollQuery), + ct); + + if (!searchResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogWarning("Failed to start cache scroll: {StatusCode}", searchResponse.ApiCallDetails.HttpStatusCode); + return; + } + + _ = ProcessHits(searchResponse); + var scrollId = searchResponse.Body.Get("_scroll_id"); + + while (scrollId is not null) + { + var scrollBody = $$"""{"scroll": "{{ScrollTimeout}}", "scroll_id": "{{scrollId}}"}"""; + var scrollResponse = await _transport.PostAsync( + "_search/scroll", + PostData.String(scrollBody), + ct); + + if (!scrollResponse.ApiCallDetails.HasSuccessfulStatusCode) + break; + + var hitsCount = ProcessHits(scrollResponse); + if (hitsCount == 0) + break; + + scrollId = scrollResponse.Body.Get("_scroll_id"); + } + } + + private int ProcessHits(DynamicResponse response) + { + if (response.ApiCallDetails.ResponseBodyInBytes is not { } responseBytes) + return 0; + + using var doc = JsonDocument.Parse(responseBytes); + if (!doc.RootElement.TryGetProperty("hits", out var hitsObj) || + !hitsObj.TryGetProperty("hits", out var hitsArray)) + return 0; + + var count = 0; + foreach (var hit in hitsArray.EnumerateArray()) + { + if (TryParseHit(hit, out var id, out var entry)) + { + _cache[id] = entry; + count++; + } + } + return count; + } + + private static bool TryParseHit(JsonElement hit, out string id, out CachedEnrichmentEntry entry) + { + id = string.Empty; + entry = default!; + + if (!hit.TryGetProperty("_id", out var idProp) || idProp.GetString() is not { } docId) + return false; + + if (!hit.TryGetProperty("_source", out var source)) + return false; + + if (!source.TryGetProperty("response_json", out var jsonProp) || + jsonProp.GetString() is not { Length: > 0 } responseJson) + return false; + + var promptVersion = source.TryGetProperty("prompt_version", out var versionProp) + ? versionProp.GetInt32() + : 0; + + var data = JsonSerializer.Deserialize(responseJson, EnrichmentSerializerContext.Default.EnrichmentData); + if (data is not { HasData: true }) + return false; + + id = docId; + entry = new CachedEnrichmentEntry(data, promptVersion); + return true; + } +} + +/// +/// Wrapper for storing enrichment data in the cache index. 
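+/// The enrichment payload is persisted as an opaque, non-indexed JSON string (response_json),
+/// so the cache index mapping stays stable even when the enrichment schema evolves.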
+/// +public sealed record CachedEnrichment +{ + [JsonPropertyName("response_json")] + public required string ResponseJson { get; init; } + + [JsonPropertyName("created_at")] + public required DateTimeOffset CreatedAt { get; init; } + + [JsonPropertyName("prompt_version")] + public required int PromptVersion { get; init; } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs new file mode 100644 index 000000000..1eaf4b19d --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs @@ -0,0 +1,201 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Text.Json; +using System.Text.Json.Serialization; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Elasticsearch inference API-backed implementation of . +/// Uses a semaphore to throttle concurrent LLM calls with exponential backoff on 429. +/// +public sealed class ElasticsearchLlmClient( + DistributedTransport transport, + ILogger logger, + int maxConcurrency = 10, + int maxRetries = 5, + string inferenceEndpointId = ".gp-llm-v2-completion") : ILlmClient +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + private readonly SemaphoreSlim _throttle = new(maxConcurrency); + private readonly string _inferenceEndpointId = inferenceEndpointId; + private readonly int _maxRetries = maxRetries; + + public async Task EnrichAsync(string title, string body, CancellationToken ct) + { + await _throttle.WaitAsync(ct); + try + { + return await CallInferenceApiWithRetryAsync(title, body, ct); + } + finally + { + _ = _throttle.Release(); + } + } + + public void Dispose() => _throttle.Dispose(); + + private async Task CallInferenceApiWithRetryAsync(string title, string body, CancellationToken ct) + { + var prompt = BuildPrompt(title, body); + var request = new InferenceRequest { Input = prompt }; + var requestBody = JsonSerializer.Serialize(request, EnrichmentSerializerContext.Default.InferenceRequest); + + for (var attempt = 0; attempt <= _maxRetries; attempt++) + { + var response = await _transport.PostAsync( + $"_inference/completion/{_inferenceEndpointId}", + PostData.String(requestBody), + ct); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + return ParseResponse(response); + + if (response.ApiCallDetails.HttpStatusCode == 429 && attempt < _maxRetries) + { + var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 1s, 2s, 4s, 8s, 16s + _logger.LogDebug("Rate limited (429), retrying in {Delay}s (attempt {Attempt}/{MaxRetries})", + delay.TotalSeconds, attempt + 1, _maxRetries); + await Task.Delay(delay, ct); + continue; + } + + _logger.LogWarning("LLM inference failed: {StatusCode}", response.ApiCallDetails.HttpStatusCode); + return null; + } + + return null; + } + + private EnrichmentData? ParseResponse(DynamicResponse response) + { + if (response.ApiCallDetails.ResponseBodyInBytes is not { } responseBytes) + { + _logger.LogWarning("No response body from LLM"); + return null; + } + + string? 
responseText = null; + try + { + var completionResponse = JsonSerializer.Deserialize(responseBytes, EnrichmentSerializerContext.Default.CompletionResponse); + responseText = completionResponse?.Completion?.FirstOrDefault()?.Result; + + if (string.IsNullOrEmpty(responseText)) + { + _logger.LogWarning("Empty LLM response"); + return null; + } + + responseText = CleanLlmResponse(responseText); + var result = JsonSerializer.Deserialize(responseText, EnrichmentSerializerContext.Default.EnrichmentData); + + if (result is null || !result.HasData) + { + _logger.LogWarning("LLM response parsed but has no data: {Response}", + responseText.Length > 500 ? responseText[..500] + "..." : responseText); + return null; + } + + return result; + } + catch (JsonException ex) + { + _logger.LogWarning("Failed to parse LLM response. Error: {Error}. Response: {Response}", + ex.Message, responseText); + return null; + } + } + + private static string CleanLlmResponse(string response) + { + var cleaned = response.Replace("```json", "").Replace("```", "").Trim(); + + // Fix common LLM issue: extra closing brace + if (cleaned.EndsWith("}}") && !cleaned.Contains("{{")) + cleaned = cleaned[..^1]; + + return cleaned; + } + + private static string BuildPrompt(string title, string body) => + $$""" + ROLE: Expert technical writer creating search metadata for Elastic documentation (Elasticsearch, Kibana, Beats, Logstash). Audience: developers, DevOps, data engineers. + + TASK: Return a single valid JSON object. No markdown, no extra text, no trailing characters. + + JSON SCHEMA: + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["ai_rag_optimized_summary", "ai_short_summary", "ai_search_query", "ai_questions", "ai_use_cases"], + "additionalProperties": false, + "properties": { + "ai_rag_optimized_summary": { + "type": "string", + "description": "3-5 sentences densely packed with technical entities for semantic vector matching. Include: API endpoint names, method names, parameter names, configuration options, data types, and core functionality. Write for RAG retrieval - someone asking 'how do I configure X' should match this text." + }, + "ai_short_summary": { + "type": "string", + "description": "Exactly 5-10 words for UI tooltip or search snippet. Action-oriented, starts with a verb. Example: 'Configure index lifecycle policies for data retention'" + }, + "ai_search_query": { + "type": "string", + "description": "3-8 keywords representing a realistic search query a developer would type. Include product name and key technical terms. Example: 'elasticsearch bulk api batch indexing'" + }, + "ai_questions": { + "type": "array", + "items": { "type": "string" }, + "minItems": 3, + "maxItems": 5, + "description": "Natural questions a dev would ask (6-15 words). Not too short, not too verbose. Examples: 'How do I bulk index documents?', 'What format does the bulk API use?', 'Why is my bulk request failing?'" + }, + "ai_use_cases": { + "type": "array", + "items": { "type": "string" }, + "minItems": 2, + "maxItems": 4, + "description": "Simple 2-4 word tasks a dev wants to do. Examples: 'index documents', 'check cluster health', 'enable TLS', 'fix slow queries', 'backup data'" + } + } + } + + RULES: + - Extract ONLY from provided content. Never hallucinate APIs or features not mentioned. + - Be specific: 'configure index lifecycle policy' not 'manage data'. + - Avoid generic phrases: no 'comprehensive guide', 'powerful feature', 'easy to use'. 
+ - Output exactly ONE opening brace and ONE closing brace. + + EXAMPLE: + {"ai_rag_optimized_summary":"The Bulk API executes multiple index, create, delete, and update operations in a single NDJSON request. Endpoint: POST _bulk or POST /{index}/_bulk. Each action requires metadata line (index, create, update, delete) followed by optional document source. Supports parameters: routing, pipeline, refresh, require_alias. Returns per-operation results with _id, _version, result status, and error details for partial failures.","ai_short_summary":"Execute batch document operations in single request","ai_search_query":"elasticsearch bulk api batch index update delete","ai_questions":["How do I bulk index documents?","What format does the bulk API use?","How do I handle bulk operation errors?"],"ai_use_cases":["bulk index documents","batch update data","delete many docs"]} + + DOCUMENT: + Title: {{title}} + Content: {{body}} + """; +} + +public sealed record InferenceRequest +{ + [JsonPropertyName("input")] + public required string Input { get; init; } +} + +public sealed record CompletionResponse +{ + [JsonPropertyName("completion")] + public CompletionResult[]? Completion { get; init; } +} + +public sealed record CompletionResult +{ + [JsonPropertyName("result")] + public string? Result { get; init; } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs new file mode 100644 index 000000000..286d5819d --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs @@ -0,0 +1,37 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Configuration options for document enrichment. +/// +public sealed record EnrichmentOptions +{ + /// + /// Whether enrichment is enabled. + /// + public bool Enabled { get; init; } + + /// + /// Maximum new enrichments per run. Limits LLM calls to prevent long deployments. + /// + public int MaxNewEnrichmentsPerRun { get; init; } = 100; + + /// + /// Maximum concurrent LLM calls. + /// + public int MaxConcurrentLlmCalls { get; init; } = 4; + + /// + /// Version number for cache entries. Bump to trigger gradual re-enrichment. + /// Using int allows future range queries (e.g., re-enrich all entries below version 5). + /// + public int PromptVersion { get; init; } = 1; + + /// + /// Creates options with enrichment disabled. + /// + public static EnrichmentOptions Disabled => new() { Enabled = false }; +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs new file mode 100644 index 000000000..9ed1643ca --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs @@ -0,0 +1,43 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Represents cached enrichment data with its version metadata. 
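+/// Entries whose PromptVersion is below the configured EnrichmentOptions.PromptVersion are
+/// treated as stale and refreshed opportunistically, within the per-run limit on new LLM calls.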
+/// +public sealed record CachedEnrichmentEntry(EnrichmentData Data, int PromptVersion); + +/// +/// Abstraction for enrichment cache operations. +/// Enables swapping implementations (Elasticsearch, Redis, in-memory) and testing. +/// +public interface IEnrichmentCache +{ + /// + /// Initializes the cache, including any index bootstrapping and preloading. + /// + Task InitializeAsync(CancellationToken ct); + + /// + /// Attempts to retrieve enrichment data from the cache. + /// + /// The content-addressable cache key. + /// The cached entry if found, null otherwise. + CachedEnrichmentEntry? TryGet(string key); + + /// + /// Stores enrichment data in the cache. + /// + /// The content-addressable cache key. + /// The enrichment data to store. + /// The prompt version used to generate this data. + /// Cancellation token. + Task StoreAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct); + + /// + /// Gets the number of entries currently in the cache. + /// + int Count { get; } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs new file mode 100644 index 000000000..6a577d83a --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs @@ -0,0 +1,21 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Abstraction for LLM inference operations. +/// Enables swapping implementations and testing. +/// +public interface ILlmClient : IDisposable +{ + /// + /// Generates enrichment data for the given document content. + /// + /// The document title. + /// The document body content. + /// Cancellation token. + /// The enrichment data if successful, null otherwise. + Task EnrichAsync(string title, string body, CancellationToken ct); +} diff --git a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs index 93779d259..355130099 100644 --- a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs +++ b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs @@ -34,6 +34,7 @@ ICoreService githubActionsService /// Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME /// Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD /// Index without semantic fields + /// Enable AI enrichment of documents using LLM-generated metadata /// The number of search threads the inference endpoint should use. Defaults: 8 /// The number of index threads the inference endpoint should use. Defaults: 8 /// Do not use the Elastic Inference Service, bootstrap inference endpoint @@ -61,6 +62,7 @@ public async Task Index(IDiagnosticsCollector collector, string? password = null, // inference options bool? noSemantic = null, + bool? enableAiEnrichment = null, int? searchNumThreads = null, int? indexNumThreads = null, bool? 
noEis = null, @@ -139,6 +141,8 @@ public async Task Index(IDiagnosticsCollector collector, if (noSemantic.HasValue) cfg.NoSemantic = noSemantic.Value; + if (enableAiEnrichment.HasValue) + cfg.EnableAiEnrichment = enableAiEnrichment.Value; if (forceReindex.HasValue) cfg.ForceReindex = forceReindex.Value; diff --git a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs index 3a60df236..b524513f8 100644 --- a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs +++ b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs @@ -31,6 +31,7 @@ ICoreService githubActionsService /// Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME /// Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD /// Index without semantic fields + /// Enable AI enrichment of documents using LLM-generated metadata /// The number of search threads the inference endpoint should use. Defaults: 8 /// The number of index threads the inference endpoint should use. Defaults: 8 /// Do not use the Elastic Inference Service, bootstrap inference endpoint @@ -58,6 +59,7 @@ public async Task Index(IDiagnosticsCollector collector, string? password = null, // inference options bool? noSemantic = null, + bool? enableAiEnrichment = null, int? searchNumThreads = null, int? indexNumThreads = null, bool? noEis = null, @@ -136,6 +138,8 @@ public async Task Index(IDiagnosticsCollector collector, if (noSemantic.HasValue) cfg.NoSemantic = noSemantic.Value; + if (enableAiEnrichment.HasValue) + cfg.EnableAiEnrichment = enableAiEnrichment.Value; if (forceReindex.HasValue) cfg.ForceReindex = forceReindex.Value; diff --git a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs index 22bd5fb4b..115dda5b9 100644 --- a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs +++ b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs @@ -31,6 +31,7 @@ ICoreService githubActionsService /// Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME /// Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD /// Index without semantic fields + /// Enable AI enrichment of documents using LLM-generated metadata /// The number of search threads the inference endpoint should use. Defaults: 8 /// The number of index threads the inference endpoint should use. Defaults: 8 /// Do not use the Elastic Inference Service, bootstrap inference endpoint @@ -59,6 +60,7 @@ public async Task Index( // inference options bool? noSemantic = null, + bool? enableAiEnrichment = null, int? searchNumThreads = null, int? indexNumThreads = null, bool? 
noEis = null,
@@ -95,7 +97,7 @@ public async Task Index(
 // endpoint options
 endpoint, environment, apiKey, username, password,
 // inference options
- noSemantic, indexNumThreads, searchNumThreads, noEis, bootstrapTimeout,
+ noSemantic, enableAiEnrichment, indexNumThreads, searchNumThreads, noEis, bootstrapTimeout,
 // channel and connection options
 indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
 // proxy options
@@ -108,7 +110,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs,
 // endpoint options
 state.endpoint, state.environment, state.apiKey, state.username, state.password,
 // inference options
- state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
+ state.noSemantic, state.enableAiEnrichment, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
 // channel and connection options
 state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
 // proxy options
diff --git a/src/tooling/docs-builder/Commands/IndexCommand.cs b/src/tooling/docs-builder/Commands/IndexCommand.cs
index c6a38dcee..efc1af596 100644
--- a/src/tooling/docs-builder/Commands/IndexCommand.cs
+++ b/src/tooling/docs-builder/Commands/IndexCommand.cs
@@ -29,6 +29,7 @@ ICoreService githubActionsService
 /// <param name="username">Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME</param>
 /// <param name="password">Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD</param>
 /// <param name="noSemantic">Index without semantic fields</param>
+/// <param name="enableAiEnrichment">Enable AI enrichment of documents using LLM-generated metadata</param>
 /// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
 /// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
 /// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
@@ -57,6 +58,7 @@ public async Task Index(
 // inference options
 bool? noSemantic = null,
+ bool? enableAiEnrichment = null,
 int? searchNumThreads = null,
 int? indexNumThreads = null,
 bool?
noEis = null, @@ -93,7 +95,7 @@ public async Task Index( // endpoint options endpoint, apiKey, username, password, // inference options - noSemantic, indexNumThreads, noEis, searchNumThreads, bootstrapTimeout, + noSemantic, enableAiEnrichment, indexNumThreads, noEis, searchNumThreads, bootstrapTimeout, // channel and connection options indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode, // proxy options @@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st // endpoint options state.endpoint, state.apiKey, state.username, state.password, // inference options - state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout, + state.noSemantic, state.enableAiEnrichment, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout, // channel and connection options state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode, // proxy options From f6f118b7f0744959dcd509d23ef2cc333942988f Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 22 Dec 2025 10:00:10 +0100 Subject: [PATCH 02/12] Format --- .../Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index e572085e9..5cdfa4e7b 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -10,9 +10,9 @@ using Elastic.Documentation.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; -using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Microsoft.Extensions.Logging; using NetEscapades.EnumGenerators; From 0034a020ac6e5fa0cfef1495f01a8c5d97bc3de9 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 22 Dec 2025 10:11:27 +0100 Subject: [PATCH 03/12] Potential fix for pull request finding 'Generic catch clause' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- .../Enrichment/ElasticsearchEnrichmentCache.cs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index 29d9796f5..ad87eb610 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -82,9 +82,21 @@ private async Task PersistToElasticsearchAsync(string key, EnrichmentData data, if (!response.ApiCallDetails.HasSuccessfulStatusCode) _logger.LogWarning("Failed to persist cache entry: {StatusCode}", response.ApiCallDetails.HttpStatusCode); } + catch (OperationCanceledException) + { + // Respect cancellation requests and allow callers to observe them. + throw; + } + catch (TransportException tex) + { + // Transport-related failures are treated as best-effort cache persistence issues. 
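+ // The entry was already written to the in-memory cache before persistence was attempted,
+ // so this run still uses the enrichment result; only cross-run persistence is lost.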
+ _logger.LogWarning(tex, "Failed to persist cache entry for key {Key}", key); + } catch (Exception ex) { - _logger.LogWarning(ex, "Failed to persist cache entry for key {Key}", key); + // Unexpected exceptions are logged and rethrown to avoid silently swallowing serious issues. + _logger.LogError(ex, "Unexpected error while persisting cache entry for key {Key}", key); + throw; } } From eca922011ddc17ef22b337e6c0d27755fc47daf9 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 22 Dec 2025 22:48:09 +0100 Subject: [PATCH 04/12] Use enrich policy and ingest pipeline for cache hits --- .../Search/DocumentationDocument.cs | 8 + .../ElasticsearchIngestChannel.Mapping.cs | 5 +- .../ElasticsearchIngestChannel.cs | 10 +- .../ElasticsearchMarkdownExporter.Export.cs | 60 ++++- .../ElasticsearchMarkdownExporter.cs | 159 ++++++++---- .../ElasticsearchTransportFactory.cs | 42 +++ .../Enrichment/ContentHashGenerator.cs | 25 ++ .../Enrichment/DocumentEnrichmentService.cs | 225 ----------------- .../ElasticsearchEnrichmentCache.cs | 239 +++++++----------- .../Enrichment/ElasticsearchLlmClient.cs | 18 +- .../Enrichment/EnrichPolicyManager.cs | 159 ++++++++++++ .../Enrichment/IEnrichmentCache.cs | 28 +- .../Elasticsearch/Enrichment/ILlmClient.cs | 36 +++ .../Exporters/ExporterExtensions.cs | 3 +- 14 files changed, 577 insertions(+), 440 deletions(-) create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs delete mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs create mode 100644 src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs diff --git a/src/Elastic.Documentation/Search/DocumentationDocument.cs b/src/Elastic.Documentation/Search/DocumentationDocument.cs index 632d19596..901a90ea3 100644 --- a/src/Elastic.Documentation/Search/DocumentationDocument.cs +++ b/src/Elastic.Documentation/Search/DocumentationDocument.cs @@ -86,6 +86,14 @@ public record DocumentationDocument // AI Enrichment fields - populated by DocumentEnrichmentService + /// + /// Content-addressable hash of title + body for AI enrichment cache lookup. + /// Used by the enrich processor to join AI-generated fields at index time. + /// + [JsonPropertyName("content_hash")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? ContentHash { get; set; } + /// /// 3-5 sentences dense with technical entities, API names, and core functionality for vector matching. /// diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs index 033a358e7..d59770e37 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs @@ -11,13 +11,15 @@ public abstract partial class ElasticsearchIngestChannel where TChannel : CatalogIndexChannel { - protected static string CreateMappingSetting(string synonymSetName, string[] synonyms) + protected static string CreateMappingSetting(string synonymSetName, string[] synonyms, string? defaultPipeline = null) { var indexTimeSynonyms = $"[{string.Join(",", synonyms.Select(r => $"\"{r}\""))}]"; + var pipelineSetting = defaultPipeline is not null ? 
$"\"default_pipeline\": \"{defaultPipeline}\"," : ""; // language=json return $$$""" { + {{{pipelineSetting}}} "analysis": { "normalizer": { "keyword_normalizer": { @@ -156,6 +158,7 @@ protected static string CreateMapping(string? inferenceId) => } }, "hash" : { "type" : "keyword" }, + "content_hash" : { "type" : "keyword" }, "search_title": { "type": "text", "analyzer": "synonyms_fixed_analyzer", diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs index 96fca4562..6ff857956 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs @@ -21,7 +21,8 @@ public class ElasticsearchLexicalIngestChannel( ElasticsearchEndpoint endpoint, string indexNamespace, DistributedTransport transport, - string[] indexTimeSynonyms + string[] indexTimeSynonyms, + string? defaultPipeline = null ) : ElasticsearchIngestChannel, CatalogIndexChannel> (logFactory, collector, endpoint, transport, o => new(o), t => new(t) @@ -34,7 +35,7 @@ string[] indexTimeSynonyms { "batch_index_date", d.BatchIndexDate.ToString("o") } }), GetMapping = () => CreateMapping(null), - GetMappingSettings = () => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms), + GetMappingSettings = () => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms, defaultPipeline), IndexFormat = $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", ActiveSearchAlias = $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}" @@ -46,14 +47,15 @@ public class ElasticsearchSemanticIngestChannel( ElasticsearchEndpoint endpoint, string indexNamespace, DistributedTransport transport, - string[] indexTimeSynonyms + string[] indexTimeSynonyms, + string? 
defaultPipeline = null ) : ElasticsearchIngestChannel, SemanticIndexChannel> (logFactory, collector, endpoint, transport, o => new(o), t => new(t) { BulkOperationIdLookup = d => d.Url, GetMapping = (inferenceId, _) => CreateMapping(inferenceId), - GetMappingSettings = (_, _) => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms), + GetMappingSettings = (_, _) => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms, defaultPipeline), IndexFormat = $"{endpoint.IndexNamePrefix.ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", ActiveSearchAlias = $"{endpoint.IndexNamePrefix}-{indexNamespace.ToLowerInvariant()}", IndexNumThreads = endpoint.IndexNumThreads, diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index cf5a7cfb7..49cbb0748 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -8,6 +8,7 @@ using Elastic.Documentation.Navigation; using Elastic.Documentation.Search; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Elastic.Markdown.Helpers; using Markdig.Syntax; using Microsoft.Extensions.Logging; @@ -132,8 +133,11 @@ public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, CommonEnrichments(doc, currentNavigation); - // AI enrichment - respects per-run limit, uses cache - _ = await _enrichmentService.TryEnrichAsync(doc, ctx); + // AI Enrichment - hybrid approach: + // - Cache hits: enrich processor applies fields at index time + // - Cache misses: apply fields inline before indexing + doc.ContentHash = ContentHashGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); + await TryEnrichDocumentAsync(doc, ctx); AssignDocumentMetadata(doc); @@ -171,8 +175,9 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc doc.Headings = headings; CommonEnrichments(doc, null); - // AI enrichment - respects per-run limit, uses cache - _ = await _enrichmentService.TryEnrichAsync(doc, ctx); + // AI Enrichment - hybrid approach + doc.ContentHash = ContentHashGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); + await TryEnrichDocumentAsync(doc, ctx); AssignDocumentMetadata(doc); @@ -199,4 +204,51 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc return true; } + /// + /// Hybrid AI enrichment: cache hits rely on enrich processor, cache misses apply fields inline. + /// + private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel ctx) + { + if (_enrichmentCache is null || _llmClient is null || string.IsNullOrWhiteSpace(doc.ContentHash)) + return; + + // Check if enrichment exists in cache + if (_enrichmentCache.Exists(doc.ContentHash)) + { + // Cache hit - enrich processor will apply fields at index time + _ = Interlocked.Increment(ref _cacheHitCount); + return; + } + + // Check if we've hit the limit for new enrichments + var current = Interlocked.Increment(ref _newEnrichmentCount); + if (current > _enrichmentOptions.MaxNewEnrichmentsPerRun) + { + _ = Interlocked.Decrement(ref _newEnrichmentCount); + return; + } + + // Cache miss - generate enrichment inline and apply directly + try + { + var enrichment = await _llmClient.EnrichAsync(doc.Title, doc.StrippedBody ?? 
string.Empty, ctx); + if (enrichment is not { HasData: true }) + return; + + // Store in cache for future runs + await _enrichmentCache.StoreAsync(doc.ContentHash, enrichment, _enrichmentOptions.PromptVersion, ctx); + + // Apply fields directly (enrich processor won't have this entry yet) + doc.AiRagOptimizedSummary = enrichment.RagOptimizedSummary; + doc.AiShortSummary = enrichment.ShortSummary; + doc.AiSearchQuery = enrichment.SearchQuery; + doc.AiQuestions = enrichment.Questions; + doc.AiUseCases = enrichment.UseCases; + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogWarning(ex, "Failed to enrich document {Url}", doc.Url); + _ = Interlocked.Decrement(ref _newEnrichmentCount); + } + } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index 5cdfa4e7b..f974b016b 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -42,9 +42,14 @@ public partial class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposa private readonly IReadOnlyCollection _rules; private readonly VersionsConfiguration _versionsConfiguration; private readonly string _fixedSynonymsHash; - private readonly Enrichment.DocumentEnrichmentService _enrichmentService; - private readonly IEnrichmentCache _enrichmentCache; - private readonly ILlmClient _llmClient; + + // AI Enrichment - hybrid approach: cache hits use enrich processor, misses are applied inline + private readonly ElasticsearchEnrichmentCache? _enrichmentCache; + private readonly ElasticsearchLlmClient? _llmClient; + private readonly EnrichPolicyManager? _enrichPolicyManager; + private readonly EnrichmentOptions _enrichmentOptions = new(); + private int _newEnrichmentCount; + private int _cacheHitCount; public ElasticsearchMarkdownExporter( ILoggerFactory logFactory, @@ -65,30 +70,7 @@ IDocumentationConfigurationContext context _rules = context.SearchConfiguration.Rules; var es = endpoints.Elasticsearch; - var configuration = new ElasticsearchConfiguration(es.Uri) - { - Authentication = es.ApiKey is { } apiKey - ? new ApiKey(apiKey) - : es is { Username: { } username, Password: { } password } - ? new BasicAuthentication(username, password) - : null, - EnableHttpCompression = true, - //DebugMode = _endpoint.DebugMode, - DebugMode = true, - CertificateFingerprint = _endpoint.CertificateFingerprint, - ProxyAddress = _endpoint.ProxyAddress, - ProxyPassword = _endpoint.ProxyPassword, - ProxyUsername = _endpoint.ProxyUsername, - ServerCertificateValidationCallback = _endpoint.DisableSslVerification - ? CertificateValidations.AllowAll - : _endpoint.Certificate is { } cert - ? _endpoint.CertificateIsNotRoot - ? 
CertificateValidations.AuthorityPartOfChain(cert) - : CertificateValidations.AuthorityIsRoot(cert) - : null - }; - - _transport = new DistributedTransport(configuration); + _transport = ElasticsearchTransportFactory.Create(es); string[] fixedSynonyms = ["esql", "data-stream", "data-streams", "machine-learning"]; var indexTimeSynonyms = _synonyms.Aggregate(new List(), (acc, synonym) => @@ -99,27 +81,44 @@ IDocumentationConfigurationContext context }).Where(r => fixedSynonyms.Contains(r.Id)).Select(r => r.Synonyms).ToArray(); _fixedSynonymsHash = HashedBulkUpdate.CreateHash(string.Join(",", indexTimeSynonyms)); - _lexicalChannel = new ElasticsearchLexicalIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); - _semanticChannel = new ElasticsearchSemanticIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); - - // Create enrichment services - var enrichmentOptions = new EnrichmentOptions { Enabled = es.EnableAiEnrichment }; - _enrichmentCache = new ElasticsearchEnrichmentCache(_transport, logFactory.CreateLogger()); - _llmClient = new ElasticsearchLlmClient(_transport, logFactory.CreateLogger()); - _enrichmentService = new Enrichment.DocumentEnrichmentService( - _enrichmentCache, - _llmClient, - enrichmentOptions, - logFactory.CreateLogger()); + // Use AI enrichment pipeline if enabled - hybrid approach: + // - Cache hits: enrich processor applies fields at index time + // - Cache misses: apply fields inline before indexing + var aiPipeline = es.EnableAiEnrichment ? EnrichPolicyManager.PipelineName : null; + _lexicalChannel = new ElasticsearchLexicalIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms, aiPipeline); + _semanticChannel = new ElasticsearchSemanticIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms, aiPipeline); + + // Initialize AI enrichment services if enabled + if (es.EnableAiEnrichment) + { + _enrichmentCache = new ElasticsearchEnrichmentCache(_transport, logFactory.CreateLogger()); + _llmClient = new ElasticsearchLlmClient(_transport, logFactory.CreateLogger()); + _enrichPolicyManager = new EnrichPolicyManager(_transport, logFactory.CreateLogger(), _enrichmentCache.IndexName); + } } /// public async ValueTask StartAsync(Cancel ctx = default) { + // Initialize AI enrichment cache (pre-loads existing hashes into memory) + if (_enrichmentCache is not null && _enrichPolicyManager is not null) + { + _logger.LogInformation("Initializing AI enrichment cache..."); + await _enrichmentCache.InitializeAsync(ctx); + _logger.LogInformation("AI enrichment cache ready with {Count} existing entries", _enrichmentCache.Count); + + // The enrich pipeline must exist before indexing (used as default_pipeline). + // The pipeline's enrich processor requires the .enrich-* index to exist, + // which is created by executing the policy. We execute even with an empty + // cache index - it just creates an empty enrich index that returns no matches. + _logger.LogInformation("Setting up enrich policy and pipeline..."); + await _enrichPolicyManager.ExecutePolicyAsync(ctx); + await _enrichPolicyManager.EnsurePipelineExistsAsync(ctx); + } + _currentLexicalHash = await _lexicalChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty; _currentSemanticHash = await _semanticChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? 
string.Empty; - await _enrichmentService.InitializeAsync(ctx); await PublishSynonymsAsync(ctx); await PublishQueryRulesAsync(ctx); _ = await _lexicalChannel.Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, null, ctx); @@ -245,9 +244,6 @@ private async ValueTask CountAsync(string index, string body, Cancel ctx = /// public async ValueTask StopAsync(Cancel ctx = default) { - // Log AI enrichment progress - _enrichmentService.LogProgress(); - var semanticWriteAlias = string.Format(_semanticChannel.Channel.Options.IndexFormat, "latest"); var lexicalWriteAlias = string.Format(_lexicalChannel.Channel.Options.IndexFormat, "latest"); @@ -347,6 +343,80 @@ public async ValueTask StopAsync(Cancel ctx = default) _logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true)); await QueryDocumentCounts(ctx); + + // Execute enrich policy so new cache entries are available for next run + await ExecuteEnrichPolicyIfNeededAsync(ctx); + } + + private async ValueTask ExecuteEnrichPolicyIfNeededAsync(Cancel ctx) + { + if (_enrichmentCache is null || _enrichPolicyManager is null) + return; + + _logger.LogInformation( + "AI enrichment complete: {CacheHits} cache hits, {NewEnrichments} new enrichments (limit: {Limit})", + _cacheHitCount, _newEnrichmentCount, _enrichmentOptions.MaxNewEnrichmentsPerRun); + + if (_enrichmentCache.Count > 0) + { + _logger.LogInformation("Executing enrich policy to update internal index with {Count} total entries...", _enrichmentCache.Count); + await _enrichPolicyManager.ExecutePolicyAsync(ctx); + + // Backfill: Apply AI fields to documents that were skipped by hash-based upsert + await BackfillMissingAiFieldsAsync(ctx); + } + } + + private async ValueTask BackfillMissingAiFieldsAsync(Cancel ctx) + { + // Why backfill is needed: + // The exporter uses hash-based upsert - unchanged documents are skipped during indexing. + // These skipped documents never pass through the ingest pipeline, so they miss AI fields. + // This backfill runs _update_by_query with the AI pipeline to enrich those documents. + // + // Only backfill the semantic index - it's what the search API uses. + // The lexical index is just an intermediate step for reindexing. 
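+		//
+		// Illustrative shape of the request this method and RunBackfillQuery end up
+		// composing (the alias value here is an invented example, not the real one):
+		//
+		//   POST /docs-semantic/_update_by_query?pipeline=ai-enrichment-pipeline&timeout=10m
+		//   { "query": { "bool": {
+		//       "must":     { "exists": { "field": "content_hash" } },
+		//       "must_not": { "exists": { "field": "ai_questions" } } } } }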
+ if (_endpoint.NoSemantic || _enrichmentCache is null) + return; + + var semanticAlias = _semanticChannel.Channel.Options.ActiveSearchAlias; + + _logger.LogInformation( + "Starting AI backfill for documents missing AI fields (cache has {CacheCount} entries)", + _enrichmentCache.Count); + + // Find documents with content_hash but missing AI fields - these need the pipeline applied + var query = /*lang=json,strict*/ """ + { + "query": { + "bool": { + "must": { "exists": { "field": "content_hash" } }, + "must_not": { "exists": { "field": "ai_questions" } } + } + } + } + """; + + await RunBackfillQuery(semanticAlias, query, ctx); + } + + private async ValueTask RunBackfillQuery(string indexAlias, string query, Cancel ctx) + { + var pipeline = EnrichPolicyManager.PipelineName; + var url = $"/{indexAlias}/_update_by_query?pipeline={pipeline}&timeout=10m"; + + var response = await _transport.PostAsync(url, PostData.String(query), ctx); + + if (!response.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogWarning("AI backfill failed for {Index}: {Response}", indexAlias, response); + return; + } + + var updated = response.Body.Get("updated"); + _logger.LogInformation( + "AI backfill complete for {Index}: {Updated} documents processed (only those with matching cache entries get AI fields)", + indexAlias, updated); } private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx) @@ -454,8 +524,7 @@ public void Dispose() { _lexicalChannel.Dispose(); _semanticChannel.Dispose(); - _enrichmentService.Dispose(); - _llmClient.Dispose(); + _llmClient?.Dispose(); GC.SuppressFinalize(this); } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs new file mode 100644 index 000000000..78652bd1d --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Documentation.Configuration; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; + +namespace Elastic.Markdown.Exporters.Elasticsearch; + +/// +/// Factory for creating Elasticsearch transport from endpoint configuration. +/// +public static class ElasticsearchTransportFactory +{ + public static DistributedTransport Create(ElasticsearchEndpoint endpoint) + { + var configuration = new ElasticsearchConfiguration(endpoint.Uri) + { + Authentication = endpoint.ApiKey is { } apiKey + ? new ApiKey(apiKey) + : endpoint is { Username: { } username, Password: { } password } + ? new BasicAuthentication(username, password) + : null, + EnableHttpCompression = true, + DebugMode = endpoint.DebugMode, + CertificateFingerprint = endpoint.CertificateFingerprint, + ProxyAddress = endpoint.ProxyAddress, + ProxyPassword = endpoint.ProxyPassword, + ProxyUsername = endpoint.ProxyUsername, + ServerCertificateValidationCallback = endpoint.DisableSslVerification + ? CertificateValidations.AllowAll + : endpoint.Certificate is { } cert + ? endpoint.CertificateIsNotRoot + ? 
CertificateValidations.AuthorityPartOfChain(cert) + : CertificateValidations.AuthorityIsRoot(cert) + : null + }; + + return new DistributedTransport(configuration); + } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs new file mode 100644 index 000000000..9c4cab928 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs @@ -0,0 +1,25 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Security.Cryptography; +using System.Text; +using System.Text.RegularExpressions; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Generates content-addressable hashes for AI enrichment cache lookups. +/// +public static partial class ContentHashGenerator +{ + public static string Generate(string title, string body) + { + var normalized = NormalizeRegex().Replace(title + body, "").ToLowerInvariant(); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); + return Convert.ToHexString(hash).ToLowerInvariant(); + } + + [GeneratedRegex("[^a-zA-Z0-9]")] + private static partial Regex NormalizeRegex(); +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs deleted file mode 100644 index bc4621d20..000000000 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/DocumentEnrichmentService.cs +++ /dev/null @@ -1,225 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System.Security.Cryptography; -using System.Text; -using System.Text.Json.Serialization; -using System.Text.RegularExpressions; -using Elastic.Documentation.Search; -using Microsoft.Extensions.Logging; - -namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; - -/// -/// Orchestrates document enrichment using an LLM client and cache. -/// -public sealed partial class DocumentEnrichmentService( - IEnrichmentCache cache, - ILlmClient llm, - EnrichmentOptions options, - ILogger logger) : IDisposable -{ - private readonly IEnrichmentCache _cache = cache; - private readonly ILlmClient _llm = llm; - private readonly EnrichmentOptions _options = options; - private readonly ILogger _logger = logger; - - private int _cacheHitCount; - private int _staleRefreshCount; - private int _newEnrichmentCount; - private int _skippedCount; - - public Task InitializeAsync(CancellationToken ct) => - _options.Enabled ? 
_cache.InitializeAsync(ct) : Task.CompletedTask; - - public async Task TryEnrichAsync(DocumentationDocument doc, CancellationToken ct) - { - if (!_options.Enabled) - return false; - - if (string.IsNullOrWhiteSpace(doc.StrippedBody)) - return false; - - var cacheKey = GenerateCacheKey(doc.Title, doc.StrippedBody); - - if (TryApplyCachedEnrichment(doc, cacheKey)) - { - await TryRefreshStaleCacheAsync(doc, cacheKey, ct); - return true; - } - - return await TryEnrichNewDocumentAsync(doc, cacheKey, ct); - } - - public void LogProgress() - { - if (!_options.Enabled) - { - _logger.LogInformation("AI enrichment is disabled (use --enable-ai-enrichment to enable)"); - return; - } - - _logger.LogInformation( - "Enrichment summary: {CacheHits} cache hits ({StaleRefreshed} stale refreshed), {NewEnrichments} new, {Skipped} skipped (limit: {Limit})", - _cacheHitCount, _staleRefreshCount, _newEnrichmentCount, _skippedCount, _options.MaxNewEnrichmentsPerRun); - - if (_skippedCount > 0) - { - _logger.LogInformation( - "Enrichment progress: {Skipped} documents pending, will complete over subsequent runs", - _skippedCount); - } - } - - public void Dispose() => (_llm as IDisposable)?.Dispose(); - - private bool TryApplyCachedEnrichment(DocumentationDocument doc, string cacheKey) - { - var cached = _cache.TryGet(cacheKey); - if (cached is null) - return false; - - // Defensive check: if cached data is invalid, treat as miss and let it re-enrich - if (!cached.Data.HasData) - { - _logger.LogDebug("Cached entry for {Url} has no valid data, will re-enrich", doc.Url); - return false; - } - - ApplyEnrichment(doc, cached.Data); - _ = Interlocked.Increment(ref _cacheHitCount); - return true; - } - - private async Task TryRefreshStaleCacheAsync(DocumentationDocument doc, string cacheKey, CancellationToken ct) - { - var cached = _cache.TryGet(cacheKey); - // If cache is current version or newer, no refresh needed - if (cached is not null && cached.PromptVersion >= _options.PromptVersion) - return; - - if (!TryClaimEnrichmentSlot()) - return; - - _ = Interlocked.Increment(ref _staleRefreshCount); - - try - { - var fresh = await _llm.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ct); - if (fresh is not null) - { - await _cache.StoreAsync(cacheKey, fresh, _options.PromptVersion, ct); - ApplyEnrichment(doc, fresh); - } - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - _logger.LogDebug(ex, "Failed to refresh stale cache for {Url}", doc.Url); - } - } - - private async Task TryEnrichNewDocumentAsync(DocumentationDocument doc, string cacheKey, CancellationToken ct) - { - if (!TryClaimEnrichmentSlot()) - { - _logger.LogDebug("Skipping enrichment for {Url} - limit reached", doc.Url); - _ = Interlocked.Increment(ref _skippedCount); - return false; - } - - try - { - var enrichment = await _llm.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ct); - if (enrichment is not null) - { - await _cache.StoreAsync(cacheKey, enrichment, _options.PromptVersion, ct); - ApplyEnrichment(doc, enrichment); - return true; - } - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - _logger.LogWarning(ex, "Failed to enrich document {Url}", doc.Url); - } - - return false; - } - - /// - /// Tries to get permission to make a new LLM call. - /// - /// Why: We have ~12k documents but only allow 100 new LLM calls per deployment. - /// This keeps each deployment fast. Documents not enriched this run will be - /// enriched in the next deployment. Cache hits are free and don't count. 
- /// - /// How: Add 1 to counter. If counter is too high, subtract 1 and return false. - /// This is safe when multiple documents run at the same time. - /// - private bool TryClaimEnrichmentSlot() - { - var current = Interlocked.Increment(ref _newEnrichmentCount); - if (current <= _options.MaxNewEnrichmentsPerRun) - return true; - - _ = Interlocked.Decrement(ref _newEnrichmentCount); - return false; - } - - private static void ApplyEnrichment(DocumentationDocument doc, EnrichmentData data) - { - doc.AiRagOptimizedSummary = data.RagOptimizedSummary; - doc.AiShortSummary = data.ShortSummary; - doc.AiSearchQuery = data.SearchQuery; - doc.AiQuestions = data.Questions; - doc.AiUseCases = data.UseCases; - } - - private static string GenerateCacheKey(string title, string body) - { - var normalized = NormalizeContent(title + body); - var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); - return Convert.ToHexString(hash).ToLowerInvariant(); - } - - private static string NormalizeContent(string input) => - NormalizeRegex().Replace(input, "").ToLowerInvariant(); - - [GeneratedRegex("[^a-zA-Z0-9]")] - private static partial Regex NormalizeRegex(); -} - -/// -/// LLM-generated enrichment data for documentation documents. -/// -public sealed record EnrichmentData -{ - [JsonPropertyName("ai_rag_optimized_summary")] - public string? RagOptimizedSummary { get; init; } - - [JsonPropertyName("ai_short_summary")] - public string? ShortSummary { get; init; } - - [JsonPropertyName("ai_search_query")] - public string? SearchQuery { get; init; } - - [JsonPropertyName("ai_questions")] - public string[]? Questions { get; init; } - - [JsonPropertyName("ai_use_cases")] - public string[]? UseCases { get; init; } - - [JsonIgnore] - public bool HasData => - !string.IsNullOrEmpty(RagOptimizedSummary) || - !string.IsNullOrEmpty(ShortSummary) || - !string.IsNullOrEmpty(SearchQuery) || - Questions is { Length: > 0 } || - UseCases is { Length: > 0 }; -} - -[JsonSerializable(typeof(EnrichmentData))] -[JsonSerializable(typeof(CachedEnrichment))] -[JsonSerializable(typeof(CompletionResponse))] -[JsonSerializable(typeof(InferenceRequest))] -internal sealed partial class EnrichmentSerializerContext : JsonSerializerContext; diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index ad87eb610..b9cfc2832 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Collections.Concurrent; using System.Text.Json; using System.Text.Json.Serialization; using Elastic.Transport; @@ -11,8 +12,8 @@ namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; /// -/// Elasticsearch-backed implementation of . -/// Pre-loads all entries into memory at startup for fast lookups. +/// Elasticsearch-backed enrichment cache for use with the enrich processor. +/// Stores AI-generated enrichment fields directly (not as JSON string) for efficient lookups. 
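+/// Known keys are pre-loaded into an in-memory set at startup, so per-document existence checks never hit Elasticsearch.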
/// public sealed class ElasticsearchEnrichmentCache( DistributedTransport transport, @@ -21,22 +22,22 @@ public sealed class ElasticsearchEnrichmentCache( { private readonly DistributedTransport _transport = transport; private readonly ILogger _logger = logger; - private readonly string _indexName = indexName; - private readonly Dictionary _cache = []; + private readonly ConcurrentDictionary _existingHashes = new(); - private const int ScrollBatchSize = 1000; - private const string ScrollTimeout = "1m"; + public string IndexName { get; } = indexName; // language=json + // Note: No settings block - Serverless doesn't allow number_of_shards/replicas private const string IndexMapping = """ { - "settings": { - "number_of_shards": 1, - "number_of_replicas": 1 - }, "mappings": { "properties": { - "response_json": { "type": "text", "index": false }, + "content_hash": { "type": "keyword" }, + "ai_rag_optimized_summary": { "type": "text" }, + "ai_short_summary": { "type": "text" }, + "ai_search_query": { "type": "text" }, + "ai_questions": { "type": "text" }, + "ai_use_cases": { "type": "text" }, "created_at": { "type": "date" }, "prompt_version": { "type": "integer" } } @@ -44,131 +45,94 @@ public sealed class ElasticsearchEnrichmentCache( } """; - public int Count => _cache.Count; + public int Count => _existingHashes.Count; public async Task InitializeAsync(CancellationToken ct) { await EnsureIndexExistsAsync(ct); - await PreloadCacheAsync(ct); + await LoadExistingHashesAsync(ct); } - public CachedEnrichmentEntry? TryGet(string key) => - _cache.TryGetValue(key, out var entry) ? entry : null; + public bool Exists(string contentHash) => _existingHashes.ContainsKey(contentHash); - public async Task StoreAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct) - { - _cache[key] = new CachedEnrichmentEntry(data, promptVersion); - await PersistToElasticsearchAsync(key, data, promptVersion, ct); - } + public Task GetAsync(string contentHash, CancellationToken ct) => + Task.FromResult(null); // Not used - enrich processor handles cache hits - private async Task PersistToElasticsearchAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct) + public async Task StoreAsync(string contentHash, EnrichmentData data, int promptVersion, CancellationToken ct) { - try + var cacheEntry = new CacheIndexEntry { - var responseJson = JsonSerializer.Serialize(data, EnrichmentSerializerContext.Default.EnrichmentData); - var cacheItem = new CachedEnrichment - { - ResponseJson = responseJson, - CreatedAt = DateTimeOffset.UtcNow, - PromptVersion = promptVersion - }; - - var body = JsonSerializer.Serialize(cacheItem, EnrichmentSerializerContext.Default.CachedEnrichment); - var response = await _transport.PutAsync( - $"{_indexName}/_doc/{key}", - PostData.String(body), - ct); + ContentHash = contentHash, + AiRagOptimizedSummary = data.RagOptimizedSummary, + AiShortSummary = data.ShortSummary, + AiSearchQuery = data.SearchQuery, + AiQuestions = data.Questions, + AiUseCases = data.UseCases, + CreatedAt = DateTimeOffset.UtcNow, + PromptVersion = promptVersion + }; + + var body = JsonSerializer.Serialize(cacheEntry, EnrichmentSerializerContext.Default.CacheIndexEntry); + var response = await _transport.PutAsync( + $"{IndexName}/_doc/{contentHash}", + PostData.String(body), + ct); - if (!response.ApiCallDetails.HasSuccessfulStatusCode) - _logger.LogWarning("Failed to persist cache entry: {StatusCode}", response.ApiCallDetails.HttpStatusCode); - } - catch (OperationCanceledException) - 
{ - // Respect cancellation requests and allow callers to observe them. - throw; - } - catch (TransportException tex) - { - // Transport-related failures are treated as best-effort cache persistence issues. - _logger.LogWarning(tex, "Failed to persist cache entry for key {Key}", key); - } - catch (Exception ex) - { - // Unexpected exceptions are logged and rethrown to avoid silently swallowing serious issues. - _logger.LogError(ex, "Unexpected error while persisting cache entry for key {Key}", key); - throw; - } + if (response.ApiCallDetails.HasSuccessfulStatusCode) + _ = _existingHashes.TryAdd(contentHash, 0); + else + _logger.LogWarning("Failed to store enrichment: {StatusCode}", response.ApiCallDetails.HttpStatusCode); } private async Task EnsureIndexExistsAsync(CancellationToken ct) { - var existsResponse = await _transport.HeadAsync(_indexName, ct); + var existsResponse = await _transport.HeadAsync(IndexName, ct); if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogDebug("Enrichment cache index {IndexName} already exists", IndexName); return; + } + _logger.LogInformation("Creating enrichment cache index {IndexName}...", IndexName); var createResponse = await _transport.PutAsync( - _indexName, + IndexName, PostData.String(IndexMapping), ct); if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) - _logger.LogInformation("Created cache index {IndexName}", _indexName); - else if (createResponse.ApiCallDetails.HttpStatusCode != 400) // 400 = already exists - _logger.LogWarning("Failed to create cache index: {StatusCode}", createResponse.ApiCallDetails.HttpStatusCode); + _logger.LogInformation("Created enrichment cache index {IndexName}", IndexName); + else if (createResponse.ApiCallDetails.HttpStatusCode == 400 && + createResponse.Body?.Contains("resource_already_exists_exception") == true) + _logger.LogDebug("Enrichment cache index {IndexName} already exists (race condition)", IndexName); + else + _logger.LogError("Failed to create cache index: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); } - private async Task PreloadCacheAsync(CancellationToken ct) + private async Task LoadExistingHashesAsync(CancellationToken ct) { var sw = System.Diagnostics.Stopwatch.StartNew(); - if (!await HasDocumentsAsync(ct)) - { - _logger.LogInformation("Cache index is empty, skipping preload ({ElapsedMs}ms)", sw.ElapsedMilliseconds); - return; - } - - await ScrollAllDocumentsAsync(ct); + // Only fetch _id to minimize memory - we use _id as the hash + var scrollQuery = /*lang=json,strict*/ """{"size": 1000, "_source": false, "query": {"match_all": {}}}"""; - sw.Stop(); - _logger.LogInformation("Pre-loaded {Count} cache entries in {ElapsedMs}ms", _cache.Count, sw.ElapsedMilliseconds); - } - - private async Task HasDocumentsAsync(CancellationToken ct) - { - var countResponse = await _transport.GetAsync($"{_indexName}/_count", ct); - if (!countResponse.ApiCallDetails.HasSuccessfulStatusCode) - { - _logger.LogWarning("Cache count failed: {StatusCode}", countResponse.ApiCallDetails.HttpStatusCode); - return true; // Assume there might be documents - } - - var docCount = countResponse.Body.Get("count") ?? 
0; - _logger.LogDebug("Cache index has {Count} documents", docCount); - return docCount > 0; - } - - private async Task ScrollAllDocumentsAsync(CancellationToken ct) - { - var scrollQuery = "{\"size\": " + ScrollBatchSize + ", \"query\": {\"match_all\": {}}}"; - - var searchResponse = await _transport.PostAsync( - $"{_indexName}/_search?scroll={ScrollTimeout}", + var searchResponse = await _transport.PostAsync( + $"{IndexName}/_search?scroll=1m", PostData.String(scrollQuery), ct); if (!searchResponse.ApiCallDetails.HasSuccessfulStatusCode) { - _logger.LogWarning("Failed to start cache scroll: {StatusCode}", searchResponse.ApiCallDetails.HttpStatusCode); + _logger.LogWarning("Failed to load existing hashes: {StatusCode}", searchResponse.ApiCallDetails.HttpStatusCode); return; } - _ = ProcessHits(searchResponse); - var scrollId = searchResponse.Body.Get("_scroll_id"); + var (count, scrollId) = ProcessHashHits(searchResponse.Body); - while (scrollId is not null) + while (scrollId is not null && count > 0) { - var scrollBody = $$"""{"scroll": "{{ScrollTimeout}}", "scroll_id": "{{scrollId}}"}"""; - var scrollResponse = await _transport.PostAsync( + var scrollBody = $$"""{"scroll": "1m", "scroll_id": "{{scrollId}}"}"""; + var scrollResponse = await _transport.PostAsync( "_search/scroll", PostData.String(scrollBody), ct); @@ -176,72 +140,65 @@ private async Task ScrollAllDocumentsAsync(CancellationToken ct) if (!scrollResponse.ApiCallDetails.HasSuccessfulStatusCode) break; - var hitsCount = ProcessHits(scrollResponse); - if (hitsCount == 0) - break; - - scrollId = scrollResponse.Body.Get("_scroll_id"); + (count, scrollId) = ProcessHashHits(scrollResponse.Body); } + + _logger.LogInformation("Loaded {Count} existing enrichment hashes in {ElapsedMs}ms", + _existingHashes.Count, sw.ElapsedMilliseconds); } - private int ProcessHits(DynamicResponse response) + private (int count, string? scrollId) ProcessHashHits(string? responseBody) { - if (response.ApiCallDetails.ResponseBodyInBytes is not { } responseBytes) - return 0; + if (string.IsNullOrEmpty(responseBody)) + return (0, null); + + using var doc = JsonDocument.Parse(responseBody); + + var scrollId = doc.RootElement.TryGetProperty("_scroll_id", out var scrollIdProp) + ? scrollIdProp.GetString() + : null; - using var doc = JsonDocument.Parse(responseBytes); if (!doc.RootElement.TryGetProperty("hits", out var hitsObj) || !hitsObj.TryGetProperty("hits", out var hitsArray)) - return 0; + return (0, scrollId); var count = 0; foreach (var hit in hitsArray.EnumerateArray()) { - if (TryParseHit(hit, out var id, out var entry)) + // Use _id as the hash (we store content_hash as both _id and field) + if (hit.TryGetProperty("_id", out var idProp) && idProp.GetString() is { } id) { - _cache[id] = entry; + _ = _existingHashes.TryAdd(id, 0); count++; } } - return count; + return (count, scrollId); } +} - private static bool TryParseHit(JsonElement hit, out string id, out CachedEnrichmentEntry entry) - { - id = string.Empty; - entry = default!; - - if (!hit.TryGetProperty("_id", out var idProp) || idProp.GetString() is not { } docId) - return false; +/// +/// Document structure for the enrichment cache index. +/// Fields are stored directly for use with the enrich processor. +/// +public sealed record CacheIndexEntry +{ + [JsonPropertyName("content_hash")] + public required string ContentHash { get; init; } - if (!hit.TryGetProperty("_source", out var source)) - return false; + [JsonPropertyName("ai_rag_optimized_summary")] + public string? 
AiRagOptimizedSummary { get; init; } - if (!source.TryGetProperty("response_json", out var jsonProp) || - jsonProp.GetString() is not { Length: > 0 } responseJson) - return false; + [JsonPropertyName("ai_short_summary")] + public string? AiShortSummary { get; init; } - var promptVersion = source.TryGetProperty("prompt_version", out var versionProp) - ? versionProp.GetInt32() - : 0; + [JsonPropertyName("ai_search_query")] + public string? AiSearchQuery { get; init; } - var data = JsonSerializer.Deserialize(responseJson, EnrichmentSerializerContext.Default.EnrichmentData); - if (data is not { HasData: true }) - return false; + [JsonPropertyName("ai_questions")] + public string[]? AiQuestions { get; init; } - id = docId; - entry = new CachedEnrichmentEntry(data, promptVersion); - return true; - } -} - -/// -/// Wrapper for storing enrichment data in the cache index. -/// -public sealed record CachedEnrichment -{ - [JsonPropertyName("response_json")] - public required string ResponseJson { get; init; } + [JsonPropertyName("ai_use_cases")] + public string[]? AiUseCases { get; init; } [JsonPropertyName("created_at")] public required DateTimeOffset CreatedAt { get; init; } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs index 1eaf4b19d..8b74b6561 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs @@ -19,7 +19,7 @@ public sealed class ElasticsearchLlmClient( ILogger logger, int maxConcurrency = 10, int maxRetries = 5, - string inferenceEndpointId = ".gp-llm-v2-completion") : ILlmClient + string inferenceEndpointId = ".gp-llm-v2-completion") : ILlmClient, IDisposable { private readonly DistributedTransport _transport = transport; private readonly ILogger _logger = logger; @@ -50,13 +50,13 @@ public sealed class ElasticsearchLlmClient( for (var attempt = 0; attempt <= _maxRetries; attempt++) { - var response = await _transport.PostAsync( + var response = await _transport.PostAsync( $"_inference/completion/{_inferenceEndpointId}", PostData.String(requestBody), ct); if (response.ApiCallDetails.HasSuccessfulStatusCode) - return ParseResponse(response); + return ParseResponse(response.Body); if (response.ApiCallDetails.HttpStatusCode == 429 && attempt < _maxRetries) { @@ -67,16 +67,17 @@ public sealed class ElasticsearchLlmClient( continue; } - _logger.LogWarning("LLM inference failed: {StatusCode}", response.ApiCallDetails.HttpStatusCode); + _logger.LogWarning("LLM inference failed: {StatusCode} - {Body}", + response.ApiCallDetails.HttpStatusCode, response.Body); return null; } return null; } - private EnrichmentData? ParseResponse(DynamicResponse response) + private EnrichmentData? ParseResponse(string? responseBody) { - if (response.ApiCallDetails.ResponseBodyInBytes is not { } responseBytes) + if (string.IsNullOrEmpty(responseBody)) { _logger.LogWarning("No response body from LLM"); return null; @@ -85,7 +86,7 @@ public sealed class ElasticsearchLlmClient( string? 
responseText = null; try { - var completionResponse = JsonSerializer.Deserialize(responseBytes, EnrichmentSerializerContext.Default.CompletionResponse); + var completionResponse = JsonSerializer.Deserialize(responseBody, EnrichmentSerializerContext.Default.CompletionResponse); responseText = completionResponse?.Completion?.FirstOrDefault()?.Result; if (string.IsNullOrEmpty(responseText)) @@ -122,6 +123,9 @@ private static string CleanLlmResponse(string response) if (cleaned.EndsWith("}}") && !cleaned.Contains("{{")) cleaned = cleaned[..^1]; + // Fix common LLM issue: trailing backticks from incomplete code block syntax + cleaned = cleaned.TrimEnd('`'); + return cleaned; } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs new file mode 100644 index 000000000..e0ae97500 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Manages the Elasticsearch enrich policy and ingest pipeline for AI document enrichment. +/// +public sealed class EnrichPolicyManager( + DistributedTransport transport, + ILogger logger, + string cacheIndexName = "docs-ai-enriched-fields-cache") +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + private readonly string _cacheIndexName = cacheIndexName; + + public const string PolicyName = "ai-enrichment-policy"; + public const string PipelineName = "ai-enrichment-pipeline"; + + + // language=json + private const string IngestPipelineBody = """ + { + "description": "Enriches documents with AI-generated fields from the enrichment cache", + "processors": [ + { + "enrich": { + "policy_name": "ai-enrichment-policy", + "field": "content_hash", + "target_field": "ai_enrichment", + "max_matches": 1, + "ignore_missing": true + } + }, + { + "script": { + "description": "Flatten ai_enrichment fields to document root", + "if": "ctx.ai_enrichment != null", + "source": "if (ctx.ai_enrichment.ai_rag_optimized_summary != null) ctx.ai_rag_optimized_summary = ctx.ai_enrichment.ai_rag_optimized_summary; if (ctx.ai_enrichment.ai_short_summary != null) ctx.ai_short_summary = ctx.ai_enrichment.ai_short_summary; if (ctx.ai_enrichment.ai_search_query != null) ctx.ai_search_query = ctx.ai_enrichment.ai_search_query; if (ctx.ai_enrichment.ai_questions != null) ctx.ai_questions = ctx.ai_enrichment.ai_questions; if (ctx.ai_enrichment.ai_use_cases != null) ctx.ai_use_cases = ctx.ai_enrichment.ai_use_cases; ctx.remove('ai_enrichment');" + } + } + ] + } + """; + + /// + /// Ensures the enrich policy exists and creates it if not. 
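+	/// The policy is a "match" policy joining on content_hash against the cache index.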
+ /// + public async Task EnsurePolicyExistsAsync(CancellationToken ct) + { + // Check if policy exists - GET returns 200 with policies array, or 404 if not found + var existsResponse = await _transport.GetAsync($"_enrich/policy/{PolicyName}", ct); + + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode && + existsResponse.Body?.Contains(PolicyName) == true) + { + _logger.LogInformation("Enrich policy {PolicyName} already exists", PolicyName); + return; + } + + _logger.LogInformation("Creating enrich policy {PolicyName} for index {CacheIndex}...", PolicyName, _cacheIndexName); + + // language=json + var policyBody = $$""" + { + "match": { + "indices": "{{_cacheIndexName}}", + "match_field": "content_hash", + "enrich_fields": ["ai_rag_optimized_summary", "ai_short_summary", "ai_search_query", "ai_questions", "ai_use_cases"] + } + } + """; + + var createResponse = await _transport.PutAsync( + $"_enrich/policy/{PolicyName}", + PostData.String(policyBody), + ct); + + _logger.LogInformation("Policy creation response: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created enrich policy {PolicyName}", PolicyName); + else + _logger.LogError("Failed to create enrich policy: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + } + + /// + /// Executes the enrich policy to rebuild the enrich index with latest data. + /// Call this after adding new entries to the cache index. + /// + public async Task ExecutePolicyAsync(CancellationToken ct) + { + // Verify policy exists before executing + var checkResponse = await _transport.GetAsync($"_enrich/policy/{PolicyName}", ct); + _logger.LogDebug("Pre-execute policy check: {StatusCode} - {Body}", + checkResponse.ApiCallDetails.HttpStatusCode, checkResponse.Body); + + if (!checkResponse.ApiCallDetails.HasSuccessfulStatusCode || + checkResponse.Body?.Contains(PolicyName) != true) + { + _logger.LogInformation("Policy {PolicyName} not found, creating...", PolicyName); + await EnsurePolicyExistsAsync(ct); + // Small delay for Serverless propagation + await Task.Delay(2000, ct); + } + + _logger.LogInformation("Executing enrich policy {PolicyName}...", PolicyName); + + var response = await _transport.PostAsync( + $"_enrich/policy/{PolicyName}/_execute", + PostData.Empty, + ct); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Enrich policy executed successfully"); + else + _logger.LogWarning("Enrich policy execution failed (may be empty): {StatusCode} - {Response}", + response.ApiCallDetails.HttpStatusCode, response.Body); + } + + /// + /// Ensures the ingest pipeline exists and creates it if not. 
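+	/// Safe to call on every run: creation is skipped when the pipeline already exists.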
+ /// + public async Task EnsurePipelineExistsAsync(CancellationToken ct) + { + var existsResponse = await _transport.GetAsync($"_ingest/pipeline/{PipelineName}", ct); + + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogInformation("Ingest pipeline {PipelineName} already exists", PipelineName); + return; + } + + _logger.LogInformation("Creating ingest pipeline {PipelineName}...", PipelineName); + var createResponse = await _transport.PutAsync( + $"_ingest/pipeline/{PipelineName}", + PostData.String(IngestPipelineBody), + ct); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created ingest pipeline {PipelineName}", PipelineName); + else + _logger.LogError("Failed to create ingest pipeline: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + } + + /// + /// Ensures the enrich policy exists. Pipeline is created separately in StartAsync. + /// + public async Task InitializeAsync(CancellationToken ct) => await EnsurePolicyExistsAsync(ct); +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs index 9ed1643ca..7765044ed 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs @@ -11,30 +11,36 @@ public sealed record CachedEnrichmentEntry(EnrichmentData Data, int PromptVersio /// /// Abstraction for enrichment cache operations. -/// Enables swapping implementations (Elasticsearch, Redis, in-memory) and testing. +/// With the enrich processor pattern, the cache stores enrichment data that +/// gets joined to documents at index time via an Elasticsearch enrich processor. /// public interface IEnrichmentCache { /// - /// Initializes the cache, including any index bootstrapping and preloading. + /// The name of the cache index. + /// + string IndexName { get; } + + /// + /// Initializes the cache, including index creation and loading existing hashes. /// Task InitializeAsync(CancellationToken ct); /// - /// Attempts to retrieve enrichment data from the cache. + /// Checks if an enrichment exists for the given content hash. + /// + bool Exists(string contentHash); + + /// + /// Fetches enrichment data from the cache by content hash. + /// Returns null if not found. /// - /// The content-addressable cache key. - /// The cached entry if found, null otherwise. - CachedEnrichmentEntry? TryGet(string key); + Task GetAsync(string contentHash, CancellationToken ct); /// /// Stores enrichment data in the cache. /// - /// The content-addressable cache key. - /// The enrichment data to store. - /// The prompt version used to generate this data. - /// Cancellation token. - Task StoreAsync(string key, EnrichmentData data, int promptVersion, CancellationToken ct); + Task StoreAsync(string contentHash, EnrichmentData data, int promptVersion, CancellationToken ct); /// /// Gets the number of entries currently in the cache. diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs index 6a577d83a..5195f39d7 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs @@ -2,6 +2,8 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. 
// See the LICENSE file in the project root for more information +using System.Text.Json.Serialization; + namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; /// @@ -19,3 +21,37 @@ public interface ILlmClient : IDisposable /// The enrichment data if successful, null otherwise. Task EnrichAsync(string title, string body, CancellationToken ct); } + +/// +/// AI-generated enrichment fields for a document. +/// +public sealed record EnrichmentData +{ + [JsonPropertyName("ai_rag_optimized_summary")] + public string? RagOptimizedSummary { get; init; } + + [JsonPropertyName("ai_short_summary")] + public string? ShortSummary { get; init; } + + [JsonPropertyName("ai_search_query")] + public string? SearchQuery { get; init; } + + [JsonPropertyName("ai_questions")] + public string[]? Questions { get; init; } + + [JsonPropertyName("ai_use_cases")] + public string[]? UseCases { get; init; } + + public bool HasData => + !string.IsNullOrEmpty(RagOptimizedSummary) || + !string.IsNullOrEmpty(ShortSummary) || + !string.IsNullOrEmpty(SearchQuery) || + Questions is { Length: > 0 } || + UseCases is { Length: > 0 }; +} + +[JsonSerializable(typeof(EnrichmentData))] +[JsonSerializable(typeof(InferenceRequest))] +[JsonSerializable(typeof(CompletionResponse))] +[JsonSerializable(typeof(CacheIndexEntry))] +internal sealed partial class EnrichmentSerializerContext : JsonSerializerContext; diff --git a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs index ad7577bd5..cec7388f3 100644 --- a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs +++ b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs @@ -18,7 +18,7 @@ public static IReadOnlyCollection CreateMarkdownExporters( string indexNamespace ) { - var markdownExporters = new List(3); + var markdownExporters = new List(4); if (exportOptions.Contains(Exporter.LLMText)) markdownExporters.Add(new LlmMarkdownExporter()); if (exportOptions.Contains(Exporter.Configuration)) @@ -28,4 +28,3 @@ string indexNamespace return markdownExporters; } } - From ab9a89589f12ef6dea0d077a9c8fc97b21d50a49 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Mon, 22 Dec 2025 23:33:14 +0100 Subject: [PATCH 05/12] Rename to enrichment_key and add prompt hash to the key --- .../Search/DocumentationDocument.cs | 8 ++-- .../ElasticsearchMarkdownExporter.Export.cs | 10 ++--- .../ElasticsearchEnrichmentCache.cs | 44 +++++++++---------- .../Enrichment/ElasticsearchLlmClient.cs | 14 ++++++ ...Generator.cs => EnrichmentKeyGenerator.cs} | 9 ++-- .../Enrichment/EnrichmentOptions.cs | 6 --- .../Enrichment/IEnrichmentCache.cs | 17 ++----- 7 files changed, 53 insertions(+), 55 deletions(-) rename src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/{ContentHashGenerator.cs => EnrichmentKeyGenerator.cs} (65%) diff --git a/src/Elastic.Documentation/Search/DocumentationDocument.cs b/src/Elastic.Documentation/Search/DocumentationDocument.cs index 901a90ea3..60759ba33 100644 --- a/src/Elastic.Documentation/Search/DocumentationDocument.cs +++ b/src/Elastic.Documentation/Search/DocumentationDocument.cs @@ -87,12 +87,12 @@ public record DocumentationDocument // AI Enrichment fields - populated by DocumentEnrichmentService /// - /// Content-addressable hash of title + body for AI enrichment cache lookup. - /// Used by the enrich processor to join AI-generated fields at index time. + /// Key for enrichment cache lookups. Derived from normalized content + prompt hash. + /// Used by enrich processor to join AI-generated fields at index time. 
/// - [JsonPropertyName("content_hash")] + [JsonPropertyName("enrichment_key")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] - public string? ContentHash { get; set; } + public string? EnrichmentKey { get; set; } /// /// 3-5 sentences dense with technical entities, API names, and core functionality for vector matching. diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index 49cbb0748..6ffc6834d 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -136,7 +136,7 @@ public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, // AI Enrichment - hybrid approach: // - Cache hits: enrich processor applies fields at index time // - Cache misses: apply fields inline before indexing - doc.ContentHash = ContentHashGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); + doc.EnrichmentKey = EnrichmentKeyGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); await TryEnrichDocumentAsync(doc, ctx); AssignDocumentMetadata(doc); @@ -176,7 +176,7 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc CommonEnrichments(doc, null); // AI Enrichment - hybrid approach - doc.ContentHash = ContentHashGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); + doc.EnrichmentKey = EnrichmentKeyGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); await TryEnrichDocumentAsync(doc, ctx); AssignDocumentMetadata(doc); @@ -209,11 +209,11 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc /// private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel ctx) { - if (_enrichmentCache is null || _llmClient is null || string.IsNullOrWhiteSpace(doc.ContentHash)) + if (_enrichmentCache is null || _llmClient is null || string.IsNullOrWhiteSpace(doc.EnrichmentKey)) return; // Check if enrichment exists in cache - if (_enrichmentCache.Exists(doc.ContentHash)) + if (_enrichmentCache.Exists(doc.EnrichmentKey)) { // Cache hit - enrich processor will apply fields at index time _ = Interlocked.Increment(ref _cacheHitCount); @@ -236,7 +236,7 @@ private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel return; // Store in cache for future runs - await _enrichmentCache.StoreAsync(doc.ContentHash, enrichment, _enrichmentOptions.PromptVersion, ctx); + await _enrichmentCache.StoreAsync(doc.EnrichmentKey, enrichment, ctx); // Apply fields directly (enrich processor won't have this entry yet) doc.AiRagOptimizedSummary = enrichment.RagOptimizedSummary; diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index b9cfc2832..9247048d8 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -32,15 +32,15 @@ public sealed class ElasticsearchEnrichmentCache( { "mappings": { "properties": { - "content_hash": { "type": "keyword" }, - "ai_rag_optimized_summary": { "type": "text" }, - "ai_short_summary": { "type": "text" }, - "ai_search_query": { "type": "text" }, - "ai_questions": { "type": "text" }, - "ai_use_cases": { "type": "text" }, - 
"created_at": { "type": "date" }, - "prompt_version": { "type": "integer" } - } + "enrichment_key": { "type": "keyword" }, + "ai_rag_optimized_summary": { "type": "text" }, + "ai_short_summary": { "type": "text" }, + "ai_search_query": { "type": "text" }, + "ai_questions": { "type": "text" }, + "ai_use_cases": { "type": "text" }, + "created_at": { "type": "date" }, + "prompt_hash": { "type": "keyword" } + } } } """; @@ -53,33 +53,31 @@ public async Task InitializeAsync(CancellationToken ct) await LoadExistingHashesAsync(ct); } - public bool Exists(string contentHash) => _existingHashes.ContainsKey(contentHash); - - public Task GetAsync(string contentHash, CancellationToken ct) => - Task.FromResult(null); // Not used - enrich processor handles cache hits + public bool Exists(string enrichmentKey) => _existingHashes.ContainsKey(enrichmentKey); - public async Task StoreAsync(string contentHash, EnrichmentData data, int promptVersion, CancellationToken ct) + public async Task StoreAsync(string enrichmentKey, EnrichmentData data, CancellationToken ct) { + var promptHash = ElasticsearchLlmClient.PromptHash; var cacheEntry = new CacheIndexEntry { - ContentHash = contentHash, + EnrichmentKey = enrichmentKey, AiRagOptimizedSummary = data.RagOptimizedSummary, AiShortSummary = data.ShortSummary, AiSearchQuery = data.SearchQuery, AiQuestions = data.Questions, AiUseCases = data.UseCases, CreatedAt = DateTimeOffset.UtcNow, - PromptVersion = promptVersion + PromptHash = promptHash }; var body = JsonSerializer.Serialize(cacheEntry, EnrichmentSerializerContext.Default.CacheIndexEntry); var response = await _transport.PutAsync( - $"{IndexName}/_doc/{contentHash}", + $"{IndexName}/_doc/{enrichmentKey}", PostData.String(body), ct); if (response.ApiCallDetails.HasSuccessfulStatusCode) - _ = _existingHashes.TryAdd(contentHash, 0); + _ = _existingHashes.TryAdd(enrichmentKey, 0); else _logger.LogWarning("Failed to store enrichment: {StatusCode}", response.ApiCallDetails.HttpStatusCode); } @@ -165,7 +163,7 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) var count = 0; foreach (var hit in hitsArray.EnumerateArray()) { - // Use _id as the hash (we store content_hash as both _id and field) + // Use _id as the enrichment key if (hit.TryGetProperty("_id", out var idProp) && idProp.GetString() is { } id) { _ = _existingHashes.TryAdd(id, 0); @@ -182,8 +180,8 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) /// public sealed record CacheIndexEntry { - [JsonPropertyName("content_hash")] - public required string ContentHash { get; init; } + [JsonPropertyName("enrichment_key")] + public required string EnrichmentKey { get; init; } [JsonPropertyName("ai_rag_optimized_summary")] public string? 
AiRagOptimizedSummary { get; init; } @@ -203,6 +201,6 @@ public sealed record CacheIndexEntry [JsonPropertyName("created_at")] public required DateTimeOffset CreatedAt { get; init; } - [JsonPropertyName("prompt_version")] - public required int PromptVersion { get; init; } + [JsonPropertyName("prompt_hash")] + public required string PromptHash { get; init; } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs index 8b74b6561..059f400d3 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs @@ -2,6 +2,8 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Security.Cryptography; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; using Elastic.Transport; @@ -27,6 +29,18 @@ public sealed class ElasticsearchLlmClient( private readonly string _inferenceEndpointId = inferenceEndpointId; private readonly int _maxRetries = maxRetries; + private static readonly Lazy PromptHashLazy = new(() => + { + var prompt = BuildPrompt("", ""); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(prompt)); + return Convert.ToHexString(hash).ToLowerInvariant(); + }); + + /// + /// Hash of the prompt template. Changes when the prompt changes, triggering cache invalidation. + /// + public static string PromptHash => PromptHashLazy.Value; + public async Task EnrichAsync(string title, string body, CancellationToken ct) { await _throttle.WaitAsync(ct); diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs similarity index 65% rename from src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs rename to src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs index 9c4cab928..48735b2e8 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ContentHashGenerator.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs @@ -9,14 +9,17 @@ namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; /// -/// Generates content-addressable hashes for AI enrichment cache lookups. +/// Generates enrichment keys for AI enrichment cache lookups. +/// The key includes the prompt hash so that prompt changes trigger automatic cache invalidation. 
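+/// Concretely: the lowercase hex SHA-256 of the alphanumeric-normalized title and body, concatenated with the prompt hash.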
/// -public static partial class ContentHashGenerator +public static partial class EnrichmentKeyGenerator { public static string Generate(string title, string body) { var normalized = NormalizeRegex().Replace(title + body, "").ToLowerInvariant(); - var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); + var promptHash = ElasticsearchLlmClient.PromptHash; + var input = normalized + promptHash; + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input)); return Convert.ToHexString(hash).ToLowerInvariant(); } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs index 286d5819d..c3550e229 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs @@ -24,12 +24,6 @@ public sealed record EnrichmentOptions /// public int MaxConcurrentLlmCalls { get; init; } = 4; - /// - /// Version number for cache entries. Bump to trigger gradual re-enrichment. - /// Using int allows future range queries (e.g., re-enrich all entries below version 5). - /// - public int PromptVersion { get; init; } = 1; - /// /// Creates options with enrichment disabled. /// diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs index 7765044ed..c46245a33 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs @@ -4,11 +4,6 @@ namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; -/// -/// Represents cached enrichment data with its version metadata. -/// -public sealed record CachedEnrichmentEntry(EnrichmentData Data, int PromptVersion); - /// /// Abstraction for enrichment cache operations. /// With the enrich processor pattern, the cache stores enrichment data that @@ -27,20 +22,14 @@ public interface IEnrichmentCache Task InitializeAsync(CancellationToken ct); /// - /// Checks if an enrichment exists for the given content hash. - /// - bool Exists(string contentHash); - - /// - /// Fetches enrichment data from the cache by content hash. - /// Returns null if not found. + /// Checks if an enrichment exists for the given enrichment key. /// - Task GetAsync(string contentHash, CancellationToken ct); + bool Exists(string enrichmentKey); /// /// Stores enrichment data in the cache. /// - Task StoreAsync(string contentHash, EnrichmentData data, int promptVersion, CancellationToken ct); + Task StoreAsync(string enrichmentKey, EnrichmentData data, CancellationToken ct); /// /// Gets the number of entries currently in the cache. 
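A minimal standalone sketch of the key derivation this commit introduces, assuming the same normalization as EnrichmentKeyGenerator; the titles, bodies, and prompt-hash values below are invented for illustration, and the real prompt hash comes from ElasticsearchLlmClient.PromptHash:

    using System;
    using System.Security.Cryptography;
    using System.Text;
    using System.Text.RegularExpressions;

    static string Key(string title, string body, string promptHash)
    {
        // Keep only alphanumerics, lowercase, then fold the prompt hash into the digest.
        var normalized = Regex.Replace(title + body, "[^a-zA-Z0-9]", "").ToLowerInvariant();
        var bytes = Encoding.UTF8.GetBytes(normalized + promptHash);
        return Convert.ToHexString(SHA256.HashData(bytes)).ToLowerInvariant();
    }

    var v1 = Key("Install Elasticsearch", "Run the installer.", "prompt-v1");
    var v2 = Key("Install Elasticsearch", "Run the installer.", "prompt-v2");
    Console.WriteLine(v1 == v2); // False: same content, new prompt => every key misses

Changing the prompt template therefore behaves like a cache flush, but one that is paid off gradually under MaxNewEnrichmentsPerRun rather than all at once.
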
From 786afd8ee1c53b12caca353cc45fa08be738eb06 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 01:19:01 +0100 Subject: [PATCH 06/12] Potential fix for pull request finding 'Missed opportunity to use Where' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- .../Enrichment/ElasticsearchEnrichmentCache.cs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index 9247048d8..eadf82893 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -5,6 +5,7 @@ using System.Collections.Concurrent; using System.Text.Json; using System.Text.Json.Serialization; +using System.Linq; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Microsoft.Extensions.Logging; @@ -161,14 +162,16 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) return (0, scrollId); var count = 0; - foreach (var hit in hitsArray.EnumerateArray()) + var ids = hitsArray + .EnumerateArray() + .Select(hit => hit.TryGetProperty("_id", out var idProp) ? idProp.GetString() : null) + .Where(id => id is not null)!; + + foreach (var id in ids) { // Use _id as the enrichment key - if (hit.TryGetProperty("_id", out var idProp) && idProp.GetString() is { } id) - { - _ = _existingHashes.TryAdd(id, 0); - count++; - } + _ = _existingHashes.TryAdd(id, 0); + count++; } return (count, scrollId); } From 2c34f4ca76e1dc8036c5944f52f42e28ccc97e0c Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 09:54:48 +0100 Subject: [PATCH 07/12] Fix staleness logic --- .../ElasticsearchIngestChannel.Mapping.cs | 2 +- .../ElasticsearchMarkdownExporter.Export.cs | 14 ++-- .../ElasticsearchMarkdownExporter.cs | 10 +-- .../ElasticsearchEnrichmentCache.cs | 83 +++++++++++++------ .../Enrichment/EnrichPolicyManager.cs | 4 +- .../Enrichment/EnrichmentKeyGenerator.cs | 9 +- .../Enrichment/EnrichmentOptions.cs | 3 +- 7 files changed, 82 insertions(+), 43 deletions(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs index d59770e37..124ae5122 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs @@ -158,7 +158,7 @@ protected static string CreateMapping(string? 
inferenceId) => } }, "hash" : { "type" : "keyword" }, - "content_hash" : { "type" : "keyword" }, + "enrichment_key" : { "type" : "keyword" }, "search_title": { "type": "text", "analyzer": "synonyms_fixed_analyzer", diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index 6ffc6834d..a408987a6 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -206,13 +206,15 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc /// /// Hybrid AI enrichment: cache hits rely on enrich processor, cache misses apply fields inline. + /// Stale entries (with old prompt hash) are treated as non-existent and will be regenerated. /// private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel ctx) { if (_enrichmentCache is null || _llmClient is null || string.IsNullOrWhiteSpace(doc.EnrichmentKey)) return; - // Check if enrichment exists in cache + // Check if valid enrichment exists in cache (current prompt hash) + // Stale entries are treated as non-existent and will be regenerated if (_enrichmentCache.Exists(doc.EnrichmentKey)) { // Cache hit - enrich processor will apply fields at index time @@ -220,15 +222,15 @@ private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel return; } - // Check if we've hit the limit for new enrichments - var current = Interlocked.Increment(ref _newEnrichmentCount); + // Check if we've hit the limit for enrichments + var current = Interlocked.Increment(ref _enrichmentCount); if (current > _enrichmentOptions.MaxNewEnrichmentsPerRun) { - _ = Interlocked.Decrement(ref _newEnrichmentCount); + _ = Interlocked.Decrement(ref _enrichmentCount); return; } - // Cache miss - generate enrichment inline and apply directly + // Cache miss (or stale) - generate enrichment inline and apply directly try { var enrichment = await _llmClient.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ctx); @@ -248,7 +250,7 @@ private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel catch (Exception ex) when (ex is not OperationCanceledException) { _logger.LogWarning(ex, "Failed to enrich document {Url}", doc.Url); - _ = Interlocked.Decrement(ref _newEnrichmentCount); + _ = Interlocked.Decrement(ref _enrichmentCount); } } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index f974b016b..58b5bae16 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -48,7 +48,7 @@ public partial class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposa private readonly ElasticsearchLlmClient? _llmClient; private readonly EnrichPolicyManager? 
_enrichPolicyManager; private readonly EnrichmentOptions _enrichmentOptions = new(); - private int _newEnrichmentCount; + private int _enrichmentCount; private int _cacheHitCount; public ElasticsearchMarkdownExporter( @@ -354,8 +354,8 @@ private async ValueTask ExecuteEnrichPolicyIfNeededAsync(Cancel ctx) return; _logger.LogInformation( - "AI enrichment complete: {CacheHits} cache hits, {NewEnrichments} new enrichments (limit: {Limit})", - _cacheHitCount, _newEnrichmentCount, _enrichmentOptions.MaxNewEnrichmentsPerRun); + "AI enrichment complete: {CacheHits} cache hits, {Enrichments} enrichments generated (limit: {Limit})", + _cacheHitCount, _enrichmentCount, _enrichmentOptions.MaxNewEnrichmentsPerRun); if (_enrichmentCache.Count > 0) { @@ -385,12 +385,12 @@ private async ValueTask BackfillMissingAiFieldsAsync(Cancel ctx) "Starting AI backfill for documents missing AI fields (cache has {CacheCount} entries)", _enrichmentCache.Count); - // Find documents with content_hash but missing AI fields - these need the pipeline applied + // Find documents with enrichment_key but missing AI fields - these need the pipeline applied var query = /*lang=json,strict*/ """ { "query": { "bool": { - "must": { "exists": { "field": "content_hash" } }, + "must": { "exists": { "field": "enrichment_key" } }, "must_not": { "exists": { "field": "ai_questions" } } } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index eadf82893..de56497c3 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -15,6 +15,7 @@ namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; /// /// Elasticsearch-backed enrichment cache for use with the enrich processor. /// Stores AI-generated enrichment fields directly (not as JSON string) for efficient lookups. +/// Only entries with the current prompt hash are considered valid - stale entries are treated as non-existent. /// public sealed class ElasticsearchEnrichmentCache( DistributedTransport transport, @@ -23,7 +24,9 @@ public sealed class ElasticsearchEnrichmentCache( { private readonly DistributedTransport _transport = transport; private readonly ILogger _logger = logger; - private readonly ConcurrentDictionary _existingHashes = new(); + + // Only contains entries with current prompt hash - stale entries are excluded + private readonly ConcurrentDictionary _validEntries = new(); public string IndexName { get; } = indexName; @@ -46,7 +49,15 @@ public sealed class ElasticsearchEnrichmentCache( } """; - public int Count => _existingHashes.Count; + /// + /// Number of valid cache entries (with current prompt hash). + /// + public int Count => _validEntries.Count; + + /// + /// Number of stale entries found during initialization (for logging only). + /// + public int StaleCount { get; private set; } public async Task InitializeAsync(CancellationToken ct) { @@ -54,7 +65,11 @@ public async Task InitializeAsync(CancellationToken ct) await LoadExistingHashesAsync(ct); } - public bool Exists(string enrichmentKey) => _existingHashes.ContainsKey(enrichmentKey); + /// + /// Checks if a valid enrichment exists in the cache (with current prompt hash). + /// Stale entries are treated as non-existent and will be regenerated. 
+ /// + public bool Exists(string enrichmentKey) => _validEntries.ContainsKey(enrichmentKey); public async Task StoreAsync(string enrichmentKey, EnrichmentData data, CancellationToken ct) { @@ -78,7 +93,7 @@ public async Task StoreAsync(string enrichmentKey, EnrichmentData data, Cancella ct); if (response.ApiCallDetails.HasSuccessfulStatusCode) - _ = _existingHashes.TryAdd(enrichmentKey, 0); + _ = _validEntries.TryAdd(enrichmentKey, 0); else _logger.LogWarning("Failed to store enrichment: {StatusCode}", response.ApiCallDetails.HttpStatusCode); } @@ -111,9 +126,12 @@ private async Task EnsureIndexExistsAsync(CancellationToken ct) private async Task LoadExistingHashesAsync(CancellationToken ct) { var sw = System.Diagnostics.Stopwatch.StartNew(); + var currentPromptHash = ElasticsearchLlmClient.PromptHash; + var staleCount = 0; + var totalCount = 0; - // Only fetch _id to minimize memory - we use _id as the hash - var scrollQuery = /*lang=json,strict*/ """{"size": 1000, "_source": false, "query": {"match_all": {}}}"""; + // Fetch _id and prompt_hash to determine validity + var scrollQuery = /*lang=json,strict*/ """{"size": 1000, "_source": ["prompt_hash"], "query": {"match_all": {}}}"""; var searchResponse = await _transport.PostAsync( $"{IndexName}/_search?scroll=1m", @@ -126,9 +144,11 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) return; } - var (count, scrollId) = ProcessHashHits(searchResponse.Body); + var (batchTotal, batchStale, scrollId) = ProcessHashHits(searchResponse.Body, currentPromptHash); + totalCount += batchTotal; + staleCount += batchStale; - while (scrollId is not null && count > 0) + while (scrollId is not null && batchTotal > 0) { var scrollBody = $$"""{"scroll": "1m", "scroll_id": "{{scrollId}}"}"""; var scrollResponse = await _transport.PostAsync( @@ -139,17 +159,21 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) if (!scrollResponse.ApiCallDetails.HasSuccessfulStatusCode) break; - (count, scrollId) = ProcessHashHits(scrollResponse.Body); + (batchTotal, batchStale, scrollId) = ProcessHashHits(scrollResponse.Body, currentPromptHash); + totalCount += batchTotal; + staleCount += batchStale; } - _logger.LogInformation("Loaded {Count} existing enrichment hashes in {ElapsedMs}ms", - _existingHashes.Count, sw.ElapsedMilliseconds); + StaleCount = staleCount; + _logger.LogInformation( + "Loaded {Total} enrichment cache entries: {Valid} valid (current prompt), {Stale} stale (will be refreshed) in {ElapsedMs}ms", + totalCount, _validEntries.Count, staleCount, sw.ElapsedMilliseconds); } - private (int count, string? scrollId) ProcessHashHits(string? responseBody) + private (int total, int stale, string? scrollId) ProcessHashHits(string? responseBody, string currentPromptHash) { if (string.IsNullOrEmpty(responseBody)) - return (0, null); + return (0, 0, null); using var doc = JsonDocument.Parse(responseBody); @@ -159,21 +183,30 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) if (!doc.RootElement.TryGetProperty("hits", out var hitsObj) || !hitsObj.TryGetProperty("hits", out var hitsArray)) - return (0, scrollId); + return (0, 0, scrollId); - var count = 0; - var ids = hitsArray - .EnumerateArray() - .Select(hit => hit.TryGetProperty("_id", out var idProp) ? 
idProp.GetString() : null) - .Where(id => id is not null)!; - - foreach (var id in ids) + var total = 0; + var stale = 0; + foreach (var hit in hitsArray.EnumerateArray()) { - // Use _id as the enrichment key - _ = _existingHashes.TryAdd(id, 0); - count++; + if (hit.TryGetProperty("_id", out var idProp) && idProp.GetString() is { } id) + { + total++; + + // Only add entries with current prompt hash - stale entries are ignored + if (hit.TryGetProperty("_source", out var source) && + source.TryGetProperty("prompt_hash", out var promptHashProp) && + promptHashProp.GetString() == currentPromptHash) + { + _ = _validEntries.TryAdd(id, 0); + } + else + { + stale++; + } + } } - return (count, scrollId); + return (total, stale, scrollId); } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs index e0ae97500..7683daad0 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs @@ -32,7 +32,7 @@ public sealed class EnrichPolicyManager( { "enrich": { "policy_name": "ai-enrichment-policy", - "field": "content_hash", + "field": "enrichment_key", "target_field": "ai_enrichment", "max_matches": 1, "ignore_missing": true @@ -71,7 +71,7 @@ public async Task EnsurePolicyExistsAsync(CancellationToken ct) { "match": { "indices": "{{_cacheIndexName}}", - "match_field": "content_hash", + "match_field": "enrichment_key", "enrich_fields": ["ai_rag_optimized_summary", "ai_short_summary", "ai_search_query", "ai_questions", "ai_use_cases"] } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs index 48735b2e8..a22bbb44f 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs @@ -14,12 +14,15 @@ namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; /// public static partial class EnrichmentKeyGenerator { + /// + /// Generates a content-based enrichment key (without prompt hash). + /// This allows stale enrichments from old prompts to still be applied when the prompt changes. + /// New enrichments will gradually replace stale ones as they're generated. + /// public static string Generate(string title, string body) { var normalized = NormalizeRegex().Replace(title + body, "").ToLowerInvariant(); - var promptHash = ElasticsearchLlmClient.PromptHash; - var input = normalized + promptHash; - var hash = SHA256.HashData(Encoding.UTF8.GetBytes(input)); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); return Convert.ToHexString(hash).ToLowerInvariant(); } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs index c3550e229..c5e433dc3 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs @@ -15,7 +15,8 @@ public sealed record EnrichmentOptions public bool Enabled { get; init; } /// - /// Maximum new enrichments per run. Limits LLM calls to prevent long deployments. + /// Maximum enrichments per run (new + stale refresh). Limits LLM calls to prevent long deployments. 
+ /// Stale entries (with old prompt hash) are treated as non-existent and count toward this limit. /// public int MaxNewEnrichmentsPerRun { get; init; } = 100; From 8edb9da77e4d364942898ee5f09b417eaf83bb99 Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 10:00:37 +0100 Subject: [PATCH 08/12] Fix formatting --- .../Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index de56497c3..b668e44ee 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -3,9 +3,9 @@ // See the LICENSE file in the project root for more information using System.Collections.Concurrent; +using System.Linq; using System.Text.Json; using System.Text.Json.Serialization; -using System.Linq; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Microsoft.Extensions.Logging; From 9ab12bc519008189a8917e11b0440e65fa90875d Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 10:06:49 +0100 Subject: [PATCH 09/12] Potential fix for pull request finding 'Missed opportunity to use Where' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- .../ElasticsearchEnrichmentCache.cs | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index b668e44ee..240abf74d 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -187,23 +187,29 @@ private async Task LoadExistingHashesAsync(CancellationToken ct) var total = 0; var stale = 0; - foreach (var hit in hitsArray.EnumerateArray()) + foreach (var entry in hitsArray + .EnumerateArray() + .Select(hit => new + { + Hit = hit, + Id = hit.TryGetProperty("_id", out var idProp) ? 
idProp.GetString() : null + }) + .Where(e => e.Id is not null)) { - if (hit.TryGetProperty("_id", out var idProp) && idProp.GetString() is { } id) + var hit = entry.Hit; + var id = entry.Id!; + total++; + + // Only add entries with current prompt hash - stale entries are ignored + if (hit.TryGetProperty("_source", out var source) && + source.TryGetProperty("prompt_hash", out var promptHashProp) && + promptHashProp.GetString() == currentPromptHash) + { + _ = _validEntries.TryAdd(id, 0); + } + else { - total++; - - // Only add entries with current prompt hash - stale entries are ignored - if (hit.TryGetProperty("_source", out var source) && - source.TryGetProperty("prompt_hash", out var promptHashProp) && - promptHashProp.GetString() == currentPromptHash) - { - _ = _validEntries.TryAdd(id, 0); - } - else - { - stale++; - } + stale++; } } return (total, stale, scrollId); From 95851e5762ad54e08df8ecf67e4cc603102ef66a Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 11:13:10 +0100 Subject: [PATCH 10/12] Reduce 429s --- .../DocumentationEndpoints.cs | 6 +- .../ElasticsearchMarkdownExporter.cs | 108 +++++++++++++----- 2 files changed, 84 insertions(+), 30 deletions(-) diff --git a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs index af34da7ec..367fe844b 100644 --- a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs +++ b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs @@ -22,15 +22,15 @@ public class ElasticsearchEndpoint // inference options public int SearchNumThreads { get; set; } = 8; - public int IndexNumThreads { get; set; } = 8; + public int IndexNumThreads { get; set; } = 4; // Reduced for Serverless rate limits public bool NoElasticInferenceService { get; set; } // index options public string IndexNamePrefix { get; set; } = "semantic-docs"; // channel buffer options - public int BufferSize { get; set; } = 100; - public int MaxRetries { get; set; } = 3; + public int BufferSize { get; set; } = 50; // Reduced for Serverless rate limits + public int MaxRetries { get; set; } = 5; // Increased for 429 retries // connection options public bool DebugMode { get; set; } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index 58b5bae16..b976e04ee 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -97,6 +97,41 @@ IDocumentationConfigurationContext context } } + private const int MaxRetries = 5; + + /// + /// Executes an Elasticsearch API call with exponential backoff retry on 429 (rate limit) errors. 
+	///
+	private async Task<TResponse> WithRetryAsync<TResponse>(
+		Func<Task<TResponse>> apiCall,
+		string operationName,
+		Cancel ctx) where TResponse : TransportResponse
+	{
+		for (var attempt = 0; attempt <= MaxRetries; attempt++)
+		{
+			var response = await apiCall();
+
+			if (response.ApiCallDetails.HasSuccessfulStatusCode)
+				return response;
+
+			if (response.ApiCallDetails.HttpStatusCode == 429 && attempt < MaxRetries)
+			{
+				var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 1s, 2s, 4s, 8s, 16s
+				_logger.LogWarning(
+					"Rate limited (429) on {Operation}, retrying in {Delay}s (attempt {Attempt}/{MaxRetries})",
+					operationName, delay.TotalSeconds, attempt + 1, MaxRetries);
+				await Task.Delay(delay, ctx);
+				continue;
+			}
+
+			// Not a 429 or exhausted retries - return the response for caller to handle
+			return response;
+		}
+
+		// Should never reach here, but satisfy compiler
+		return await apiCall();
+	}
+
 	///
 	public async ValueTask StartAsync(Cancel ctx = default)
 	{
@@ -187,7 +222,10 @@ private async Task PutSynonyms(SynonymsSet synonymsSet, string setName, Cancel c
 	{
 		var json = JsonSerializer.Serialize(synonymsSet, SynonymSerializerContext.Default.SynonymsSet);
 
-		var response = await _transport.PutAsync($"_synonyms/{setName}", PostData.String(json), ctx);
+		var response = await WithRetryAsync(
+			() => _transport.PutAsync($"_synonyms/{setName}", PostData.String(json), ctx),
+			$"PUT _synonyms/{setName}",
+			ctx);
 
 		if (!response.ApiCallDetails.HasSuccessfulStatusCode)
 			_collector.EmitGlobalError($"Failed to publish synonym set '{setName}'. Reason: {response.ApiCallDetails.OriginalException?.Message ?? response.ToString()}");
@@ -227,7 +265,10 @@ private async Task PutQueryRuleset(QueryRuleset ruleset, string rulesetName, Can
 	{
 		var json = JsonSerializer.Serialize(ruleset, QueryRulesetSerializerContext.Default.QueryRuleset);
 
-		var response = await _transport.PutAsync($"_query_rules/{rulesetName}", PostData.String(json), ctx);
+		var response = await WithRetryAsync(
+			() => _transport.PutAsync($"_query_rules/{rulesetName}", PostData.String(json), ctx),
+			$"PUT _query_rules/{rulesetName}",
+			ctx);
 
 		if (!response.ApiCallDetails.HasSuccessfulStatusCode)
 			_collector.EmitGlobalError($"Failed to publish query ruleset '{rulesetName}'. Reason: {response.ApiCallDetails.OriginalException?.Message ??
response.ToString()}"); @@ -237,7 +278,10 @@ private async Task PutQueryRuleset(QueryRuleset ruleset, string rulesetName, Can private async ValueTask CountAsync(string index, string body, Cancel ctx = default) { - var countResponse = await _transport.PostAsync($"/{index}/_count", PostData.String(body), ctx); + var countResponse = await WithRetryAsync( + () => _transport.PostAsync($"/{index}/_count", PostData.String(body), ctx), + $"POST {index}/_count", + ctx); return countResponse.Body.Get("count"); } @@ -405,7 +449,10 @@ private async ValueTask RunBackfillQuery(string indexAlias, string query, Cancel var pipeline = EnrichPolicyManager.PipelineName; var url = $"/{indexAlias}/_update_by_query?pipeline={pipeline}&timeout=10m"; - var response = await _transport.PostAsync(url, PostData.String(query), ctx); + var response = await WithRetryAsync( + () => _transport.PostAsync(url, PostData.String(query), ctx), + $"POST {indexAlias}/_update_by_query", + ctx); if (!response.ApiCallDetails.HasSuccessfulStatusCode) { @@ -458,7 +505,10 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) } }"); var reindexUrl = $"/{lexicalWriteAlias}/_delete_by_query?wait_for_completion=false"; - var deleteOldLexicalDocs = await _transport.PostAsync(reindexUrl, request, ctx); + var deleteOldLexicalDocs = await WithRetryAsync( + () => _transport.PostAsync(reindexUrl, request, ctx), + $"POST {lexicalWriteAlias}/_delete_by_query", + ctx); var taskId = deleteOldLexicalDocs.Body.Get("task"); if (string.IsNullOrWhiteSpace(taskId)) { @@ -467,10 +517,18 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) return; } _logger.LogInformation("_delete_by_query task id: {TaskId}", taskId); + await PollTaskUntilComplete(taskId, "_delete_by_query", lexicalWriteAlias, null, ctx); + } + + private async ValueTask PollTaskUntilComplete(string taskId, string operation, string sourceIndex, string? destIndex, Cancel ctx) + { bool completed; do { - var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); + var reindexTask = await WithRetryAsync( + () => _transport.GetAsync($"/_tasks/{taskId}", ctx), + $"GET _tasks/{taskId}", + ctx); completed = reindexTask.Body.Get("completed"); var total = reindexTask.Body.Get("task.status.total"); var updated = reindexTask.Body.Get("task.status.updated"); @@ -479,8 +537,18 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) var batches = reindexTask.Body.Get("task.status.batches"); var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); - _logger.LogInformation("_delete_by_query '{SourceIndex}': {RunningTimeInNanos} Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", - lexicalWriteAlias, time.ToString(@"hh\:mm\:ss"), total, updated, created, deleted, batches); + + if (destIndex is not null) + { + _logger.LogInformation("{Operation}: {Time} '{SourceIndex}' => '{DestIndex}'. 
Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + operation, time.ToString(@"hh\:mm\:ss"), sourceIndex, destIndex, total, updated, created, deleted, batches); + } + else + { + _logger.LogInformation("{Operation} '{SourceIndex}': {Time} Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + operation, sourceIndex, time.ToString(@"hh\:mm\:ss"), total, updated, created, deleted, batches); + } + if (!completed) await Task.Delay(TimeSpan.FromSeconds(5), ctx); @@ -490,7 +558,10 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, string semanticWriteAlias, string typeOfSync, Cancel ctx) { var reindexUrl = "/_reindex?wait_for_completion=false&scroll=10m"; - var reindexNewChanges = await _transport.PostAsync(reindexUrl, request, ctx); + var reindexNewChanges = await WithRetryAsync( + () => _transport.PostAsync(reindexUrl, request, ctx), + $"POST _reindex ({typeOfSync})", + ctx); var taskId = reindexNewChanges.Body.Get("task"); if (string.IsNullOrWhiteSpace(taskId)) { @@ -499,24 +570,7 @@ private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, st return; } _logger.LogInformation("_reindex {Type} task id: {TaskId}", typeOfSync, taskId); - bool completed; - do - { - var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); - completed = reindexTask.Body.Get("completed"); - var total = reindexTask.Body.Get("task.status.total"); - var updated = reindexTask.Body.Get("task.status.updated"); - var created = reindexTask.Body.Get("task.status.created"); - var deleted = reindexTask.Body.Get("task.status.deleted"); - var batches = reindexTask.Body.Get("task.status.batches"); - var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); - var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); - _logger.LogInformation("_reindex {Type}: {RunningTimeInNanos} '{SourceIndex}' => '{DestinationIndex}'. 
Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", - typeOfSync, time.ToString(@"hh\:mm\:ss"), lexicalWriteAlias, semanticWriteAlias, total, updated, created, deleted, batches); - if (!completed) - await Task.Delay(TimeSpan.FromSeconds(5), ctx); - - } while (!completed); + await PollTaskUntilComplete(taskId, $"_reindex {typeOfSync}", lexicalWriteAlias, semanticWriteAlias, ctx); } /// From 9f7ddbba68b9f5930867b2515d470a233effecad Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 13:26:26 +0100 Subject: [PATCH 11/12] Run _update_by_query in the background --- .../Elasticsearch/ElasticsearchMarkdownExporter.cs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index b976e04ee..3bb699698 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -447,23 +447,22 @@ private async ValueTask BackfillMissingAiFieldsAsync(Cancel ctx) private async ValueTask RunBackfillQuery(string indexAlias, string query, Cancel ctx) { var pipeline = EnrichPolicyManager.PipelineName; - var url = $"/{indexAlias}/_update_by_query?pipeline={pipeline}&timeout=10m"; + var url = $"/{indexAlias}/_update_by_query?pipeline={pipeline}&wait_for_completion=false"; var response = await WithRetryAsync( () => _transport.PostAsync(url, PostData.String(query), ctx), $"POST {indexAlias}/_update_by_query", ctx); - if (!response.ApiCallDetails.HasSuccessfulStatusCode) + var taskId = response.Body.Get("task"); + if (string.IsNullOrWhiteSpace(taskId)) { _logger.LogWarning("AI backfill failed for {Index}: {Response}", indexAlias, response); return; } - var updated = response.Body.Get("updated"); - _logger.LogInformation( - "AI backfill complete for {Index}: {Updated} documents processed (only those with matching cache entries get AI fields)", - indexAlias, updated); + _logger.LogInformation("AI backfill task id: {TaskId}", taskId); + await PollTaskUntilComplete(taskId, "_update_by_query (AI backfill)", indexAlias, null, ctx); } private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx) From c61fcdb279b18b12a6254a689fb28da40431c38d Mon Sep 17 00:00:00 2001 From: Jan Calanog Date: Tue, 23 Dec 2025 13:31:24 +0100 Subject: [PATCH 12/12] Add URL to cache entry --- .../ElasticsearchMarkdownExporter.Export.cs | 2 +- .../Enrichment/ElasticsearchEnrichmentCache.cs | 10 +++++++++- .../Elasticsearch/Enrichment/IEnrichmentCache.cs | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index a408987a6..b1a28f003 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -238,7 +238,7 @@ private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel return; // Store in cache for future runs - await _enrichmentCache.StoreAsync(doc.EnrichmentKey, enrichment, ctx); + await _enrichmentCache.StoreAsync(doc.EnrichmentKey, doc.Url, enrichment, ctx); // Apply fields directly (enrich processor won't have this entry yet) 
doc.AiRagOptimizedSummary = enrichment.RagOptimizedSummary; diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs index 240abf74d..e1978e136 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -37,6 +37,7 @@ public sealed class ElasticsearchEnrichmentCache( "mappings": { "properties": { "enrichment_key": { "type": "keyword" }, + "url": { "type": "keyword" }, "ai_rag_optimized_summary": { "type": "text" }, "ai_short_summary": { "type": "text" }, "ai_search_query": { "type": "text" }, @@ -71,12 +72,13 @@ public async Task InitializeAsync(CancellationToken ct) /// public bool Exists(string enrichmentKey) => _validEntries.ContainsKey(enrichmentKey); - public async Task StoreAsync(string enrichmentKey, EnrichmentData data, CancellationToken ct) + public async Task StoreAsync(string enrichmentKey, string url, EnrichmentData data, CancellationToken ct) { var promptHash = ElasticsearchLlmClient.PromptHash; var cacheEntry = new CacheIndexEntry { EnrichmentKey = enrichmentKey, + Url = url, AiRagOptimizedSummary = data.RagOptimizedSummary, AiShortSummary = data.ShortSummary, AiSearchQuery = data.SearchQuery, @@ -225,6 +227,12 @@ public sealed record CacheIndexEntry [JsonPropertyName("enrichment_key")] public required string EnrichmentKey { get; init; } + /// + /// Document URL for debugging - helps identify which document this cache entry belongs to. + /// + [JsonPropertyName("url")] + public string? Url { get; init; } + [JsonPropertyName("ai_rag_optimized_summary")] public string? AiRagOptimizedSummary { get; init; } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs index c46245a33..a57a34f10 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs @@ -29,7 +29,11 @@ public interface IEnrichmentCache /// /// Stores enrichment data in the cache. /// - Task StoreAsync(string enrichmentKey, EnrichmentData data, CancellationToken ct); + /// The enrichment key (content hash). + /// The document URL for debugging. + /// The enrichment data to store. + /// Cancellation token. + Task StoreAsync(string enrichmentKey, string url, EnrichmentData data, CancellationToken ct); /// /// Gets the number of entries currently in the cache.
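
Taken together, the series leaves the cache-miss path looking roughly like the sketch below. It is a simplified illustration, not the shipped implementation: the using directives are inferred from the file paths in this series, `ILlmClient` is assumed to expose the same `EnrichAsync(title, body, ct)` shown on `ElasticsearchLlmClient`, and the per-run `MaxNewEnrichmentsPerRun` accounting plus the remaining AI field assignments are omitted for brevity.

using System.Threading;
using System.Threading.Tasks;
using Elastic.Documentation.Search;
using Elastic.Markdown.Exporters.Elasticsearch.Enrichment;

// Hedged sketch of the hybrid flow: a valid cache entry defers to the enrich
// processor at index time; a miss generates inline, applies the fields for
// this run, and persists them (now with the URL) so future runs hit the cache.
internal static class EnrichmentFlowSketch
{
	public static async Task EnrichIfMissingAsync(
		IEnrichmentCache cache, ILlmClient llm, DocumentationDocument doc, CancellationToken ct)
	{
		var key = EnrichmentKeyGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty);
		if (cache.Exists(key))
			return; // cache hit: enrich processor applies the ai_* fields at index time

		var enrichment = await llm.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ct);
		if (enrichment is null)
			return;

		await cache.StoreAsync(key, doc.Url, enrichment, ct);       // seeds future runs
		doc.AiRagOptimizedSummary = enrichment.RagOptimizedSummary; // applied inline now
	}
}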