diff --git a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs index 51a9797f0..367fe844b 100644 --- a/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs +++ b/src/Elastic.Documentation.Configuration/DocumentationEndpoints.cs @@ -22,15 +22,15 @@ public class ElasticsearchEndpoint // inference options public int SearchNumThreads { get; set; } = 8; - public int IndexNumThreads { get; set; } = 8; + public int IndexNumThreads { get; set; } = 4; // Reduced for Serverless rate limits public bool NoElasticInferenceService { get; set; } // index options public string IndexNamePrefix { get; set; } = "semantic-docs"; // channel buffer options - public int BufferSize { get; set; } = 100; - public int MaxRetries { get; set; } = 3; + public int BufferSize { get; set; } = 50; // Reduced for Serverless rate limits + public int MaxRetries { get; set; } = 5; // Increased for 429 retries // connection options public bool DebugMode { get; set; } @@ -45,4 +45,10 @@ public class ElasticsearchEndpoint public int? BootstrapTimeout { get; set; } public bool NoSemantic { get; set; } public bool ForceReindex { get; set; } + + /// + /// Enable AI enrichment of documents using LLM-generated metadata. + /// When enabled, documents are enriched with summaries, search queries, and questions. + /// + public bool EnableAiEnrichment { get; set; } } diff --git a/src/Elastic.Documentation/Search/DocumentationDocument.cs b/src/Elastic.Documentation/Search/DocumentationDocument.cs index c5d4d274e..60759ba33 100644 --- a/src/Elastic.Documentation/Search/DocumentationDocument.cs +++ b/src/Elastic.Documentation/Search/DocumentationDocument.cs @@ -83,4 +83,49 @@ public record DocumentationDocument [JsonPropertyName("hidden")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public bool Hidden { get; set; } + + // AI Enrichment fields - populated inline by ElasticsearchMarkdownExporter or at index time by the AI enrichment pipeline + + /// + /// Key for enrichment cache lookups. Derived from the normalized title and body content; prompt changes are detected via the prompt_hash stored with each cache entry. + /// Used by enrich processor to join AI-generated fields at index time. + /// + [JsonPropertyName("enrichment_key")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? EnrichmentKey { get; set; } + + /// + /// 3-5 sentences dense with technical entities, API names, and core functionality for vector matching. + /// + [JsonPropertyName("ai_rag_optimized_summary")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? AiRagOptimizedSummary { get; set; } + + /// + /// Exactly 5-10 words for a UI tooltip. + /// + [JsonPropertyName("ai_short_summary")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? AiShortSummary { get; set; } + + /// + /// A 3-8 word keyword string representing a high-intent user search for this doc. + /// + [JsonPropertyName("ai_search_query")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string? AiSearchQuery { get; set; } + + /// + /// Array of 3-5 specific questions answered by this document. + /// + [JsonPropertyName("ai_questions")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? AiQuestions { get; set; } + + /// + /// Array of 2-4 specific use cases this doc helps with. + /// + [JsonPropertyName("ai_use_cases")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]?
AiUseCases { get; set; } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs index ced9c8fcd..124ae5122 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.Mapping.cs @@ -11,13 +11,15 @@ public abstract partial class ElasticsearchIngestChannel where TChannel : CatalogIndexChannel { - protected static string CreateMappingSetting(string synonymSetName, string[] synonyms) + protected static string CreateMappingSetting(string synonymSetName, string[] synonyms, string? defaultPipeline = null) { var indexTimeSynonyms = $"[{string.Join(",", synonyms.Select(r => $"\"{r}\""))}]"; + var pipelineSetting = defaultPipeline is not null ? $"\"default_pipeline\": \"{defaultPipeline}\"," : ""; // language=json return $$$""" { + {{{pipelineSetting}}} "analysis": { "normalizer": { "keyword_normalizer": { @@ -156,6 +158,7 @@ protected static string CreateMapping(string? inferenceId) => } }, "hash" : { "type" : "keyword" }, + "enrichment_key" : { "type" : "keyword" }, "search_title": { "type": "text", "analyzer": "synonyms_fixed_analyzer", @@ -201,6 +204,32 @@ protected static string CreateMapping(string? inferenceId) => "fields" : { {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} } + }, + "ai_rag_optimized_summary": { + "type": "text", + "analyzer": "synonyms_fixed_analyzer", + "search_analyzer": "synonyms_analyzer", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } + }, + "ai_short_summary": { + "type": "text" + }, + "ai_search_query": { + "type": "keyword" + }, + "ai_questions": { + "type": "text", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } + }, + "ai_use_cases": { + "type": "text", + "fields": { + {{(!string.IsNullOrWhiteSpace(inferenceId) ? $"\"semantic_text\": {{{InferenceMapping(inferenceId)}}}" : "")}} + } } } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs index 96fca4562..6ff857956 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchIngestChannel.cs @@ -21,7 +21,8 @@ public class ElasticsearchLexicalIngestChannel( ElasticsearchEndpoint endpoint, string indexNamespace, DistributedTransport transport, - string[] indexTimeSynonyms + string[] indexTimeSynonyms, + string? 
defaultPipeline = null ) : ElasticsearchIngestChannel, CatalogIndexChannel> (logFactory, collector, endpoint, transport, o => new(o), t => new(t) @@ -34,7 +35,7 @@ string[] indexTimeSynonyms { "batch_index_date", d.BatchIndexDate.ToString("o") } }), GetMapping = () => CreateMapping(null), - GetMappingSettings = () => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms), + GetMappingSettings = () => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms, defaultPipeline), IndexFormat = $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", ActiveSearchAlias = $"{endpoint.IndexNamePrefix.Replace("semantic", "lexical").ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}" @@ -46,14 +47,15 @@ public class ElasticsearchSemanticIngestChannel( ElasticsearchEndpoint endpoint, string indexNamespace, DistributedTransport transport, - string[] indexTimeSynonyms + string[] indexTimeSynonyms, + string? defaultPipeline = null ) : ElasticsearchIngestChannel, SemanticIndexChannel> (logFactory, collector, endpoint, transport, o => new(o), t => new(t) { BulkOperationIdLookup = d => d.Url, GetMapping = (inferenceId, _) => CreateMapping(inferenceId), - GetMappingSettings = (_, _) => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms), + GetMappingSettings = (_, _) => CreateMappingSetting($"docs-{indexNamespace}", indexTimeSynonyms, defaultPipeline), IndexFormat = $"{endpoint.IndexNamePrefix.ToLowerInvariant()}-{indexNamespace.ToLowerInvariant()}-{{0:yyyy.MM.dd.HHmmss}}", ActiveSearchAlias = $"{endpoint.IndexNamePrefix}-{indexNamespace.ToLowerInvariant()}", IndexNumThreads = endpoint.IndexNumThreads, diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs index b49efc4fc..b1a28f003 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.Export.cs @@ -8,6 +8,7 @@ using Elastic.Documentation.Navigation; using Elastic.Documentation.Search; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Elastic.Markdown.Helpers; using Markdig.Syntax; using Microsoft.Extensions.Logging; @@ -131,6 +132,13 @@ public async ValueTask ExportAsync(MarkdownExportFileContext fileContext, }; CommonEnrichments(doc, currentNavigation); + + // AI Enrichment - hybrid approach: + // - Cache hits: enrich processor applies fields at index time + // - Cache misses: apply fields inline before indexing + doc.EnrichmentKey = EnrichmentKeyGenerator.Generate(doc.Title, doc.StrippedBody ?? string.Empty); + await TryEnrichDocumentAsync(doc, ctx); + AssignDocumentMetadata(doc); if (_indexStrategy == IngestStrategy.Multiplex) @@ -166,6 +174,11 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc doc.Abstract = @abstract; doc.Headings = headings; CommonEnrichments(doc, null); + + // AI Enrichment - hybrid approach + doc.EnrichmentKey = EnrichmentKeyGenerator.Generate(doc.Title, doc.StrippedBody ?? 
string.Empty); + await TryEnrichDocumentAsync(doc, ctx); + AssignDocumentMetadata(doc); // Write to channels following the multiplex or reindex strategy @@ -191,4 +204,53 @@ public async ValueTask FinishExportAsync(IDirectoryInfo outputFolder, Canc return true; } + /// + /// Hybrid AI enrichment: cache hits rely on enrich processor, cache misses apply fields inline. + /// Stale entries (with old prompt hash) are treated as non-existent and will be regenerated. + /// + private async ValueTask TryEnrichDocumentAsync(DocumentationDocument doc, Cancel ctx) + { + if (_enrichmentCache is null || _llmClient is null || string.IsNullOrWhiteSpace(doc.EnrichmentKey)) + return; + + // Check if valid enrichment exists in cache (current prompt hash) + // Stale entries are treated as non-existent and will be regenerated + if (_enrichmentCache.Exists(doc.EnrichmentKey)) + { + // Cache hit - enrich processor will apply fields at index time + _ = Interlocked.Increment(ref _cacheHitCount); + return; + } + + // Check if we've hit the limit for enrichments + var current = Interlocked.Increment(ref _enrichmentCount); + if (current > _enrichmentOptions.MaxNewEnrichmentsPerRun) + { + _ = Interlocked.Decrement(ref _enrichmentCount); + return; + } + + // Cache miss (or stale) - generate enrichment inline and apply directly + try + { + var enrichment = await _llmClient.EnrichAsync(doc.Title, doc.StrippedBody ?? string.Empty, ctx); + if (enrichment is not { HasData: true }) + return; + + // Store in cache for future runs + await _enrichmentCache.StoreAsync(doc.EnrichmentKey, doc.Url, enrichment, ctx); + + // Apply fields directly (enrich processor won't have this entry yet) + doc.AiRagOptimizedSummary = enrichment.RagOptimizedSummary; + doc.AiShortSummary = enrichment.ShortSummary; + doc.AiSearchQuery = enrichment.SearchQuery; + doc.AiQuestions = enrichment.Questions; + doc.AiUseCases = enrichment.UseCases; + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + _logger.LogWarning(ex, "Failed to enrich document {Url}", doc.Url); + _ = Interlocked.Decrement(ref _enrichmentCount); + } + } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs index 7076cb229..3bb699698 100644 --- a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchMarkdownExporter.cs @@ -10,6 +10,7 @@ using Elastic.Documentation.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Markdown.Exporters.Elasticsearch.Enrichment; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Microsoft.Extensions.Logging; @@ -42,6 +43,14 @@ public partial class ElasticsearchMarkdownExporter : IMarkdownExporter, IDisposa private readonly VersionsConfiguration _versionsConfiguration; private readonly string _fixedSynonymsHash; + // AI Enrichment - hybrid approach: cache hits use enrich processor, misses are applied inline + private readonly ElasticsearchEnrichmentCache? _enrichmentCache; + private readonly ElasticsearchLlmClient? _llmClient; + private readonly EnrichPolicyManager? 
_enrichPolicyManager; + private readonly EnrichmentOptions _enrichmentOptions = new(); + private int _enrichmentCount; + private int _cacheHitCount; + public ElasticsearchMarkdownExporter( ILoggerFactory logFactory, IDiagnosticsCollector collector, @@ -61,30 +70,7 @@ IDocumentationConfigurationContext context _rules = context.SearchConfiguration.Rules; var es = endpoints.Elasticsearch; - var configuration = new ElasticsearchConfiguration(es.Uri) - { - Authentication = es.ApiKey is { } apiKey - ? new ApiKey(apiKey) - : es is { Username: { } username, Password: { } password } - ? new BasicAuthentication(username, password) - : null, - EnableHttpCompression = true, - //DebugMode = _endpoint.DebugMode, - DebugMode = true, - CertificateFingerprint = _endpoint.CertificateFingerprint, - ProxyAddress = _endpoint.ProxyAddress, - ProxyPassword = _endpoint.ProxyPassword, - ProxyUsername = _endpoint.ProxyUsername, - ServerCertificateValidationCallback = _endpoint.DisableSslVerification - ? CertificateValidations.AllowAll - : _endpoint.Certificate is { } cert - ? _endpoint.CertificateIsNotRoot - ? CertificateValidations.AuthorityPartOfChain(cert) - : CertificateValidations.AuthorityIsRoot(cert) - : null - }; - - _transport = new DistributedTransport(configuration); + _transport = ElasticsearchTransportFactory.Create(es); string[] fixedSynonyms = ["esql", "data-stream", "data-streams", "machine-learning"]; var indexTimeSynonyms = _synonyms.Aggregate(new List(), (acc, synonym) => @@ -95,13 +81,76 @@ IDocumentationConfigurationContext context }).Where(r => fixedSynonyms.Contains(r.Id)).Select(r => r.Synonyms).ToArray(); _fixedSynonymsHash = HashedBulkUpdate.CreateHash(string.Join(",", indexTimeSynonyms)); - _lexicalChannel = new ElasticsearchLexicalIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); - _semanticChannel = new ElasticsearchSemanticIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms); + // Use AI enrichment pipeline if enabled - hybrid approach: + // - Cache hits: enrich processor applies fields at index time + // - Cache misses: apply fields inline before indexing + var aiPipeline = es.EnableAiEnrichment ? EnrichPolicyManager.PipelineName : null; + _lexicalChannel = new ElasticsearchLexicalIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms, aiPipeline); + _semanticChannel = new ElasticsearchSemanticIngestChannel(logFactory, collector, es, indexNamespace, _transport, indexTimeSynonyms, aiPipeline); + + // Initialize AI enrichment services if enabled + if (es.EnableAiEnrichment) + { + _enrichmentCache = new ElasticsearchEnrichmentCache(_transport, logFactory.CreateLogger()); + _llmClient = new ElasticsearchLlmClient(_transport, logFactory.CreateLogger()); + _enrichPolicyManager = new EnrichPolicyManager(_transport, logFactory.CreateLogger(), _enrichmentCache.IndexName); + } + } + + private const int MaxRetries = 5; + + /// + /// Executes an Elasticsearch API call with exponential backoff retry on 429 (rate limit) errors. 
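+ /// Delays follow 2^attempt seconds (1s, 2s, 4s, 8s, 16s); non-429 failures and exhausted retries return the response unchanged for the caller to handle.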
+ /// + private async Task WithRetryAsync( + Func> apiCall, + string operationName, + Cancel ctx) where TResponse : TransportResponse + { + for (var attempt = 0; attempt <= MaxRetries; attempt++) + { + var response = await apiCall(); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + return response; + + if (response.ApiCallDetails.HttpStatusCode == 429 && attempt < MaxRetries) + { + var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 1s, 2s, 4s, 8s, 16s + _logger.LogWarning( + "Rate limited (429) on {Operation}, retrying in {Delay}s (attempt {Attempt}/{MaxRetries})", + operationName, delay.TotalSeconds, attempt + 1, MaxRetries); + await Task.Delay(delay, ctx); + continue; + } + + // Not a 429 or exhausted retries - return the response for caller to handle + return response; + } + + // Should never reach here, but satisfy compiler + return await apiCall(); } /// public async ValueTask StartAsync(Cancel ctx = default) { + // Initialize AI enrichment cache (pre-loads existing hashes into memory) + if (_enrichmentCache is not null && _enrichPolicyManager is not null) + { + _logger.LogInformation("Initializing AI enrichment cache..."); + await _enrichmentCache.InitializeAsync(ctx); + _logger.LogInformation("AI enrichment cache ready with {Count} existing entries", _enrichmentCache.Count); + + // The enrich pipeline must exist before indexing (used as default_pipeline). + // The pipeline's enrich processor requires the .enrich-* index to exist, + // which is created by executing the policy. We execute even with an empty + // cache index - it just creates an empty enrich index that returns no matches. + _logger.LogInformation("Setting up enrich policy and pipeline..."); + await _enrichPolicyManager.ExecutePolicyAsync(ctx); + await _enrichPolicyManager.EnsurePipelineExistsAsync(ctx); + } + _currentLexicalHash = await _lexicalChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty; _currentSemanticHash = await _semanticChannel.Channel.GetIndexTemplateHashAsync(ctx) ?? string.Empty; @@ -173,7 +222,10 @@ private async Task PutSynonyms(SynonymsSet synonymsSet, string setName, Cancel c { var json = JsonSerializer.Serialize(synonymsSet, SynonymSerializerContext.Default.SynonymsSet); - var response = await _transport.PutAsync($"_synonyms/{setName}", PostData.String(json), ctx); + var response = await WithRetryAsync( + () => _transport.PutAsync($"_synonyms/{setName}", PostData.String(json), ctx), + $"PUT _synonyms/{setName}", + ctx); if (!response.ApiCallDetails.HasSuccessfulStatusCode) _collector.EmitGlobalError($"Failed to publish synonym set '{setName}'. Reason: {response.ApiCallDetails.OriginalException?.Message ?? response.ToString()}"); @@ -213,7 +265,10 @@ private async Task PutQueryRuleset(QueryRuleset ruleset, string rulesetName, Can { var json = JsonSerializer.Serialize(ruleset, QueryRulesetSerializerContext.Default.QueryRuleset); - var response = await _transport.PutAsync($"_query_rules/{rulesetName}", PostData.String(json), ctx); + var response = await WithRetryAsync( + () => _transport.PutAsync($"_query_rules/{rulesetName}", PostData.String(json), ctx), + $"PUT _query_rules/{rulesetName}", + ctx); if (!response.ApiCallDetails.HasSuccessfulStatusCode) _collector.EmitGlobalError($"Failed to publish query ruleset '{rulesetName}'. Reason: {response.ApiCallDetails.OriginalException?.Message ?? 
response.ToString()}"); @@ -223,7 +278,10 @@ private async Task PutQueryRuleset(QueryRuleset ruleset, string rulesetName, Can private async ValueTask CountAsync(string index, string body, Cancel ctx = default) { - var countResponse = await _transport.PostAsync($"/{index}/_count", PostData.String(body), ctx); + var countResponse = await WithRetryAsync( + () => _transport.PostAsync($"/{index}/_count", PostData.String(body), ctx), + $"POST {index}/_count", + ctx); return countResponse.Body.Get("count"); } @@ -329,6 +387,82 @@ public async ValueTask StopAsync(Cancel ctx = default) _logger.LogInformation("Finish sync to semantic index using {IndexStrategy} strategy", _indexStrategy.ToStringFast(true)); await QueryDocumentCounts(ctx); + + // Execute enrich policy so new cache entries are available for next run + await ExecuteEnrichPolicyIfNeededAsync(ctx); + } + + private async ValueTask ExecuteEnrichPolicyIfNeededAsync(Cancel ctx) + { + if (_enrichmentCache is null || _enrichPolicyManager is null) + return; + + _logger.LogInformation( + "AI enrichment complete: {CacheHits} cache hits, {Enrichments} enrichments generated (limit: {Limit})", + _cacheHitCount, _enrichmentCount, _enrichmentOptions.MaxNewEnrichmentsPerRun); + + if (_enrichmentCache.Count > 0) + { + _logger.LogInformation("Executing enrich policy to update internal index with {Count} total entries...", _enrichmentCache.Count); + await _enrichPolicyManager.ExecutePolicyAsync(ctx); + + // Backfill: Apply AI fields to documents that were skipped by hash-based upsert + await BackfillMissingAiFieldsAsync(ctx); + } + } + + private async ValueTask BackfillMissingAiFieldsAsync(Cancel ctx) + { + // Why backfill is needed: + // The exporter uses hash-based upsert - unchanged documents are skipped during indexing. + // These skipped documents never pass through the ingest pipeline, so they miss AI fields. + // This backfill runs _update_by_query with the AI pipeline to enrich those documents. + // + // Only backfill the semantic index - it's what the search API uses. + // The lexical index is just an intermediate step for reindexing. 
+ if (_endpoint.NoSemantic || _enrichmentCache is null) + return; + + var semanticAlias = _semanticChannel.Channel.Options.ActiveSearchAlias; + + _logger.LogInformation( + "Starting AI backfill for documents missing AI fields (cache has {CacheCount} entries)", + _enrichmentCache.Count); + + // Find documents with enrichment_key but missing AI fields - these need the pipeline applied + var query = /*lang=json,strict*/ """ + { + "query": { + "bool": { + "must": { "exists": { "field": "enrichment_key" } }, + "must_not": { "exists": { "field": "ai_questions" } } + } + } + } + """; + + await RunBackfillQuery(semanticAlias, query, ctx); + } + + private async ValueTask RunBackfillQuery(string indexAlias, string query, Cancel ctx) + { + var pipeline = EnrichPolicyManager.PipelineName; + var url = $"/{indexAlias}/_update_by_query?pipeline={pipeline}&wait_for_completion=false"; + + var response = await WithRetryAsync( + () => _transport.PostAsync(url, PostData.String(query), ctx), + $"POST {indexAlias}/_update_by_query", + ctx); + + var taskId = response.Body.Get("task"); + if (string.IsNullOrWhiteSpace(taskId)) + { + _logger.LogWarning("AI backfill failed for {Index}: {Response}", indexAlias, response); + return; + } + + _logger.LogInformation("AI backfill task id: {TaskId}", taskId); + await PollTaskUntilComplete(taskId, "_update_by_query (AI backfill)", indexAlias, null, ctx); } private async ValueTask QueryIngestStatistics(string lexicalWriteAlias, Cancel ctx) @@ -370,7 +504,10 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) } }"); var reindexUrl = $"/{lexicalWriteAlias}/_delete_by_query?wait_for_completion=false"; - var deleteOldLexicalDocs = await _transport.PostAsync(reindexUrl, request, ctx); + var deleteOldLexicalDocs = await WithRetryAsync( + () => _transport.PostAsync(reindexUrl, request, ctx), + $"POST {lexicalWriteAlias}/_delete_by_query", + ctx); var taskId = deleteOldLexicalDocs.Body.Get("task"); if (string.IsNullOrWhiteSpace(taskId)) { @@ -379,10 +516,18 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) return; } _logger.LogInformation("_delete_by_query task id: {TaskId}", taskId); + await PollTaskUntilComplete(taskId, "_delete_by_query", lexicalWriteAlias, null, ctx); + } + + private async ValueTask PollTaskUntilComplete(string taskId, string operation, string sourceIndex, string? destIndex, Cancel ctx) + { bool completed; do { - var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); + var reindexTask = await WithRetryAsync( + () => _transport.GetAsync($"/_tasks/{taskId}", ctx), + $"GET _tasks/{taskId}", + ctx); completed = reindexTask.Body.Get("completed"); var total = reindexTask.Body.Get("task.status.total"); var updated = reindexTask.Body.Get("task.status.updated"); @@ -391,8 +536,18 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) var batches = reindexTask.Body.Get("task.status.batches"); var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); - _logger.LogInformation("_delete_by_query '{SourceIndex}': {RunningTimeInNanos} Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", - lexicalWriteAlias, time.ToString(@"hh\:mm\:ss"), total, updated, created, deleted, batches); + + if (destIndex is not null) + { + _logger.LogInformation("{Operation}: {Time} '{SourceIndex}' => '{DestIndex}'. 
Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + operation, time.ToString(@"hh\:mm\:ss"), sourceIndex, destIndex, total, updated, created, deleted, batches); + } + else + { + _logger.LogInformation("{Operation} '{SourceIndex}': {Time} Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", + operation, sourceIndex, time.ToString(@"hh\:mm\:ss"), total, updated, created, deleted, batches); + } + if (!completed) await Task.Delay(TimeSpan.FromSeconds(5), ctx); @@ -402,7 +557,10 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx) private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, string semanticWriteAlias, string typeOfSync, Cancel ctx) { var reindexUrl = "/_reindex?wait_for_completion=false&scroll=10m"; - var reindexNewChanges = await _transport.PostAsync(reindexUrl, request, ctx); + var reindexNewChanges = await WithRetryAsync( + () => _transport.PostAsync(reindexUrl, request, ctx), + $"POST _reindex ({typeOfSync})", + ctx); var taskId = reindexNewChanges.Body.Get("task"); if (string.IsNullOrWhiteSpace(taskId)) { @@ -411,24 +569,7 @@ private async ValueTask DoReindex(PostData request, string lexicalWriteAlias, st return; } _logger.LogInformation("_reindex {Type} task id: {TaskId}", typeOfSync, taskId); - bool completed; - do - { - var reindexTask = await _transport.GetAsync($"/_tasks/{taskId}", ctx); - completed = reindexTask.Body.Get("completed"); - var total = reindexTask.Body.Get("task.status.total"); - var updated = reindexTask.Body.Get("task.status.updated"); - var created = reindexTask.Body.Get("task.status.created"); - var deleted = reindexTask.Body.Get("task.status.deleted"); - var batches = reindexTask.Body.Get("task.status.batches"); - var runningTimeInNanos = reindexTask.Body.Get("task.running_time_in_nanos"); - var time = TimeSpan.FromMicroseconds(runningTimeInNanos / 1000); - _logger.LogInformation("_reindex {Type}: {RunningTimeInNanos} '{SourceIndex}' => '{DestinationIndex}'. Documents {Total}: {Updated} updated, {Created} created, {Deleted} deleted, {Batches} batches", - typeOfSync, time.ToString(@"hh\:mm\:ss"), lexicalWriteAlias, semanticWriteAlias, total, updated, created, deleted, batches); - if (!completed) - await Task.Delay(TimeSpan.FromSeconds(5), ctx); - - } while (!completed); + await PollTaskUntilComplete(taskId, $"_reindex {typeOfSync}", lexicalWriteAlias, semanticWriteAlias, ctx); } /// @@ -436,6 +577,7 @@ public void Dispose() { _lexicalChannel.Dispose(); _semanticChannel.Dispose(); + _llmClient?.Dispose(); GC.SuppressFinalize(this); } } diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs new file mode 100644 index 000000000..78652bd1d --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/ElasticsearchTransportFactory.cs @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Documentation.Configuration; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; + +namespace Elastic.Markdown.Exporters.Elasticsearch; + +/// +/// Factory for creating Elasticsearch transport from endpoint configuration. 
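+/// Mirrors the configuration previously built inline in ElasticsearchMarkdownExporter (API key or basic auth, HTTP compression, proxy, certificate validation); DebugMode now follows endpoint.DebugMode rather than the previously hardcoded true.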
+/// +public static class ElasticsearchTransportFactory +{ + public static DistributedTransport Create(ElasticsearchEndpoint endpoint) + { + var configuration = new ElasticsearchConfiguration(endpoint.Uri) + { + Authentication = endpoint.ApiKey is { } apiKey + ? new ApiKey(apiKey) + : endpoint is { Username: { } username, Password: { } password } + ? new BasicAuthentication(username, password) + : null, + EnableHttpCompression = true, + DebugMode = endpoint.DebugMode, + CertificateFingerprint = endpoint.CertificateFingerprint, + ProxyAddress = endpoint.ProxyAddress, + ProxyPassword = endpoint.ProxyPassword, + ProxyUsername = endpoint.ProxyUsername, + ServerCertificateValidationCallback = endpoint.DisableSslVerification + ? CertificateValidations.AllowAll + : endpoint.Certificate is { } cert + ? endpoint.CertificateIsNotRoot + ? CertificateValidations.AuthorityPartOfChain(cert) + : CertificateValidations.AuthorityIsRoot(cert) + : null + }; + + return new DistributedTransport(configuration); + } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs new file mode 100644 index 000000000..e1978e136 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchEnrichmentCache.cs @@ -0,0 +1,256 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections.Concurrent; +using System.Linq; +using System.Text.Json; +using System.Text.Json.Serialization; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Elasticsearch-backed enrichment cache for use with the enrich processor. +/// Stores AI-generated enrichment fields directly (not as JSON string) for efficient lookups. +/// Only entries with the current prompt hash are considered valid - stale entries are treated as non-existent. +/// +public sealed class ElasticsearchEnrichmentCache( + DistributedTransport transport, + ILogger logger, + string indexName = "docs-ai-enriched-fields-cache") : IEnrichmentCache +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + + // Only contains entries with current prompt hash - stale entries are excluded + private readonly ConcurrentDictionary _validEntries = new(); + + public string IndexName { get; } = indexName; + + // language=json + // Note: No settings block - Serverless doesn't allow number_of_shards/replicas + private const string IndexMapping = """ + { + "mappings": { + "properties": { + "enrichment_key": { "type": "keyword" }, + "url": { "type": "keyword" }, + "ai_rag_optimized_summary": { "type": "text" }, + "ai_short_summary": { "type": "text" }, + "ai_search_query": { "type": "text" }, + "ai_questions": { "type": "text" }, + "ai_use_cases": { "type": "text" }, + "created_at": { "type": "date" }, + "prompt_hash": { "type": "keyword" } + } + } + } + """; + + /// + /// Number of valid cache entries (with current prompt hash). + /// + public int Count => _validEntries.Count; + + /// + /// Number of stale entries found during initialization (for logging only). 
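+ /// An entry is stale when its stored prompt_hash no longer matches ElasticsearchLlmClient.PromptHash; stale entries are excluded from Count and regenerated on demand.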
+ /// + public int StaleCount { get; private set; } + + public async Task InitializeAsync(CancellationToken ct) + { + await EnsureIndexExistsAsync(ct); + await LoadExistingHashesAsync(ct); + } + + /// + /// Checks if a valid enrichment exists in the cache (with current prompt hash). + /// Stale entries are treated as non-existent and will be regenerated. + /// + public bool Exists(string enrichmentKey) => _validEntries.ContainsKey(enrichmentKey); + + public async Task StoreAsync(string enrichmentKey, string url, EnrichmentData data, CancellationToken ct) + { + var promptHash = ElasticsearchLlmClient.PromptHash; + var cacheEntry = new CacheIndexEntry + { + EnrichmentKey = enrichmentKey, + Url = url, + AiRagOptimizedSummary = data.RagOptimizedSummary, + AiShortSummary = data.ShortSummary, + AiSearchQuery = data.SearchQuery, + AiQuestions = data.Questions, + AiUseCases = data.UseCases, + CreatedAt = DateTimeOffset.UtcNow, + PromptHash = promptHash + }; + + var body = JsonSerializer.Serialize(cacheEntry, EnrichmentSerializerContext.Default.CacheIndexEntry); + var response = await _transport.PutAsync( + $"{IndexName}/_doc/{enrichmentKey}", + PostData.String(body), + ct); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + _ = _validEntries.TryAdd(enrichmentKey, 0); + else + _logger.LogWarning("Failed to store enrichment: {StatusCode}", response.ApiCallDetails.HttpStatusCode); + } + + private async Task EnsureIndexExistsAsync(CancellationToken ct) + { + var existsResponse = await _transport.HeadAsync(IndexName, ct); + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogDebug("Enrichment cache index {IndexName} already exists", IndexName); + return; + } + + _logger.LogInformation("Creating enrichment cache index {IndexName}...", IndexName); + var createResponse = await _transport.PutAsync( + IndexName, + PostData.String(IndexMapping), + ct); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created enrichment cache index {IndexName}", IndexName); + else if (createResponse.ApiCallDetails.HttpStatusCode == 400 && + createResponse.Body?.Contains("resource_already_exists_exception") == true) + _logger.LogDebug("Enrichment cache index {IndexName} already exists (race condition)", IndexName); + else + _logger.LogError("Failed to create cache index: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + } + + private async Task LoadExistingHashesAsync(CancellationToken ct) + { + var sw = System.Diagnostics.Stopwatch.StartNew(); + var currentPromptHash = ElasticsearchLlmClient.PromptHash; + var staleCount = 0; + var totalCount = 0; + + // Fetch _id and prompt_hash to determine validity + var scrollQuery = /*lang=json,strict*/ """{"size": 1000, "_source": ["prompt_hash"], "query": {"match_all": {}}}"""; + + var searchResponse = await _transport.PostAsync( + $"{IndexName}/_search?scroll=1m", + PostData.String(scrollQuery), + ct); + + if (!searchResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogWarning("Failed to load existing hashes: {StatusCode}", searchResponse.ApiCallDetails.HttpStatusCode); + return; + } + + var (batchTotal, batchStale, scrollId) = ProcessHashHits(searchResponse.Body, currentPromptHash); + totalCount += batchTotal; + staleCount += batchStale; + + while (scrollId is not null && batchTotal > 0) + { + var scrollBody = $$"""{"scroll": "1m", "scroll_id": "{{scrollId}}"}"""; + var scrollResponse = await _transport.PostAsync( + "_search/scroll", + 
PostData.String(scrollBody), + ct); + + if (!scrollResponse.ApiCallDetails.HasSuccessfulStatusCode) + break; + + (batchTotal, batchStale, scrollId) = ProcessHashHits(scrollResponse.Body, currentPromptHash); + totalCount += batchTotal; + staleCount += batchStale; + } + + StaleCount = staleCount; + _logger.LogInformation( + "Loaded {Total} enrichment cache entries: {Valid} valid (current prompt), {Stale} stale (will be refreshed) in {ElapsedMs}ms", + totalCount, _validEntries.Count, staleCount, sw.ElapsedMilliseconds); + } + + private (int total, int stale, string? scrollId) ProcessHashHits(string? responseBody, string currentPromptHash) + { + if (string.IsNullOrEmpty(responseBody)) + return (0, 0, null); + + using var doc = JsonDocument.Parse(responseBody); + + var scrollId = doc.RootElement.TryGetProperty("_scroll_id", out var scrollIdProp) + ? scrollIdProp.GetString() + : null; + + if (!doc.RootElement.TryGetProperty("hits", out var hitsObj) || + !hitsObj.TryGetProperty("hits", out var hitsArray)) + return (0, 0, scrollId); + + var total = 0; + var stale = 0; + foreach (var entry in hitsArray + .EnumerateArray() + .Select(hit => new + { + Hit = hit, + Id = hit.TryGetProperty("_id", out var idProp) ? idProp.GetString() : null + }) + .Where(e => e.Id is not null)) + { + var hit = entry.Hit; + var id = entry.Id!; + total++; + + // Only add entries with current prompt hash - stale entries are ignored + if (hit.TryGetProperty("_source", out var source) && + source.TryGetProperty("prompt_hash", out var promptHashProp) && + promptHashProp.GetString() == currentPromptHash) + { + _ = _validEntries.TryAdd(id, 0); + } + else + { + stale++; + } + } + return (total, stale, scrollId); + } +} + +/// +/// Document structure for the enrichment cache index. +/// Fields are stored directly for use with the enrich processor. +/// +public sealed record CacheIndexEntry +{ + [JsonPropertyName("enrichment_key")] + public required string EnrichmentKey { get; init; } + + /// + /// Document URL for debugging - helps identify which document this cache entry belongs to. + /// + [JsonPropertyName("url")] + public string? Url { get; init; } + + [JsonPropertyName("ai_rag_optimized_summary")] + public string? AiRagOptimizedSummary { get; init; } + + [JsonPropertyName("ai_short_summary")] + public string? AiShortSummary { get; init; } + + [JsonPropertyName("ai_search_query")] + public string? AiSearchQuery { get; init; } + + [JsonPropertyName("ai_questions")] + public string[]? AiQuestions { get; init; } + + [JsonPropertyName("ai_use_cases")] + public string[]? AiUseCases { get; init; } + + [JsonPropertyName("created_at")] + public required DateTimeOffset CreatedAt { get; init; } + + [JsonPropertyName("prompt_hash")] + public required string PromptHash { get; init; } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs new file mode 100644 index 000000000..059f400d3 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ElasticsearchLlmClient.cs @@ -0,0 +1,219 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. 
+// See the LICENSE file in the project root for more information + +using System.Security.Cryptography; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Elasticsearch inference API-backed implementation of ILlmClient. +/// Uses a semaphore to throttle concurrent LLM calls with exponential backoff on 429. +/// +public sealed class ElasticsearchLlmClient( + DistributedTransport transport, + ILogger logger, + int maxConcurrency = 10, + int maxRetries = 5, + string inferenceEndpointId = ".gp-llm-v2-completion") : ILlmClient, IDisposable +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + private readonly SemaphoreSlim _throttle = new(maxConcurrency); + private readonly string _inferenceEndpointId = inferenceEndpointId; + private readonly int _maxRetries = maxRetries; + + private static readonly Lazy PromptHashLazy = new(() => + { + var prompt = BuildPrompt("", ""); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(prompt)); + return Convert.ToHexString(hash).ToLowerInvariant(); + }); + + /// + /// Hash of the prompt template. Changes when the prompt changes, triggering cache invalidation. + /// + public static string PromptHash => PromptHashLazy.Value; + + public async Task EnrichAsync(string title, string body, CancellationToken ct) + { + await _throttle.WaitAsync(ct); + try + { + return await CallInferenceApiWithRetryAsync(title, body, ct); + } + finally + { + _ = _throttle.Release(); + } + } + + public void Dispose() => _throttle.Dispose(); + + private async Task CallInferenceApiWithRetryAsync(string title, string body, CancellationToken ct) + { + var prompt = BuildPrompt(title, body); + var request = new InferenceRequest { Input = prompt }; + var requestBody = JsonSerializer.Serialize(request, EnrichmentSerializerContext.Default.InferenceRequest); + + for (var attempt = 0; attempt <= _maxRetries; attempt++) + { + var response = await _transport.PostAsync( + $"_inference/completion/{_inferenceEndpointId}", + PostData.String(requestBody), + ct); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + return ParseResponse(response.Body); + + if (response.ApiCallDetails.HttpStatusCode == 429 && attempt < _maxRetries) + { + var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt)); // 1s, 2s, 4s, 8s, 16s + _logger.LogDebug("Rate limited (429), retrying in {Delay}s (attempt {Attempt}/{MaxRetries})", + delay.TotalSeconds, attempt + 1, _maxRetries); + await Task.Delay(delay, ct); + continue; + } + + _logger.LogWarning("LLM inference failed: {StatusCode} - {Body}", + response.ApiCallDetails.HttpStatusCode, response.Body); + return null; + } + + return null; + } + + private EnrichmentData? ParseResponse(string? responseBody) + { + if (string.IsNullOrEmpty(responseBody)) + { + _logger.LogWarning("No response body from LLM"); + return null; + } + + string?
responseText = null; + try + { + var completionResponse = JsonSerializer.Deserialize(responseBody, EnrichmentSerializerContext.Default.CompletionResponse); + responseText = completionResponse?.Completion?.FirstOrDefault()?.Result; + + if (string.IsNullOrEmpty(responseText)) + { + _logger.LogWarning("Empty LLM response"); + return null; + } + + responseText = CleanLlmResponse(responseText); + var result = JsonSerializer.Deserialize(responseText, EnrichmentSerializerContext.Default.EnrichmentData); + + if (result is null || !result.HasData) + { + _logger.LogWarning("LLM response parsed but has no data: {Response}", + responseText.Length > 500 ? responseText[..500] + "..." : responseText); + return null; + } + + return result; + } + catch (JsonException ex) + { + _logger.LogWarning("Failed to parse LLM response. Error: {Error}. Response: {Response}", + ex.Message, responseText); + return null; + } + } + + private static string CleanLlmResponse(string response) + { + var cleaned = response.Replace("```json", "").Replace("```", "").Trim(); + + // Fix common LLM issue: extra closing brace + if (cleaned.EndsWith("}}") && !cleaned.Contains("{{")) + cleaned = cleaned[..^1]; + + // Fix common LLM issue: trailing backticks from incomplete code block syntax + cleaned = cleaned.TrimEnd('`'); + + return cleaned; + } + + private static string BuildPrompt(string title, string body) => + $$""" + ROLE: Expert technical writer creating search metadata for Elastic documentation (Elasticsearch, Kibana, Beats, Logstash). Audience: developers, DevOps, data engineers. + + TASK: Return a single valid JSON object. No markdown, no extra text, no trailing characters. + + JSON SCHEMA: + { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["ai_rag_optimized_summary", "ai_short_summary", "ai_search_query", "ai_questions", "ai_use_cases"], + "additionalProperties": false, + "properties": { + "ai_rag_optimized_summary": { + "type": "string", + "description": "3-5 sentences densely packed with technical entities for semantic vector matching. Include: API endpoint names, method names, parameter names, configuration options, data types, and core functionality. Write for RAG retrieval - someone asking 'how do I configure X' should match this text." + }, + "ai_short_summary": { + "type": "string", + "description": "Exactly 5-10 words for UI tooltip or search snippet. Action-oriented, starts with a verb. Example: 'Configure index lifecycle policies for data retention'" + }, + "ai_search_query": { + "type": "string", + "description": "3-8 keywords representing a realistic search query a developer would type. Include product name and key technical terms. Example: 'elasticsearch bulk api batch indexing'" + }, + "ai_questions": { + "type": "array", + "items": { "type": "string" }, + "minItems": 3, + "maxItems": 5, + "description": "Natural questions a dev would ask (6-15 words). Not too short, not too verbose. Examples: 'How do I bulk index documents?', 'What format does the bulk API use?', 'Why is my bulk request failing?'" + }, + "ai_use_cases": { + "type": "array", + "items": { "type": "string" }, + "minItems": 2, + "maxItems": 4, + "description": "Simple 2-4 word tasks a dev wants to do. Examples: 'index documents', 'check cluster health', 'enable TLS', 'fix slow queries', 'backup data'" + } + } + } + + RULES: + - Extract ONLY from provided content. Never hallucinate APIs or features not mentioned. + - Be specific: 'configure index lifecycle policy' not 'manage data'. 
+ - Avoid generic phrases: no 'comprehensive guide', 'powerful feature', 'easy to use'. + - Output exactly ONE opening brace and ONE closing brace. + + EXAMPLE: + {"ai_rag_optimized_summary":"The Bulk API executes multiple index, create, delete, and update operations in a single NDJSON request. Endpoint: POST _bulk or POST /{index}/_bulk. Each action requires metadata line (index, create, update, delete) followed by optional document source. Supports parameters: routing, pipeline, refresh, require_alias. Returns per-operation results with _id, _version, result status, and error details for partial failures.","ai_short_summary":"Execute batch document operations in single request","ai_search_query":"elasticsearch bulk api batch index update delete","ai_questions":["How do I bulk index documents?","What format does the bulk API use?","How do I handle bulk operation errors?"],"ai_use_cases":["bulk index documents","batch update data","delete many docs"]} + + DOCUMENT: + Title: {{title}} + Content: {{body}} + """; +} + +public sealed record InferenceRequest +{ + [JsonPropertyName("input")] + public required string Input { get; init; } +} + +public sealed record CompletionResponse +{ + [JsonPropertyName("completion")] + public CompletionResult[]? Completion { get; init; } +} + +public sealed record CompletionResult +{ + [JsonPropertyName("result")] + public string? Result { get; init; } +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs new file mode 100644 index 000000000..7683daad0 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichPolicyManager.cs @@ -0,0 +1,159 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using Microsoft.Extensions.Logging; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Manages the Elasticsearch enrich policy and ingest pipeline for AI document enrichment. 
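+/// The match policy joins documents on enrichment_key against the cache index; the pipeline's enrich processor writes the match into ai_enrichment and a script processor flattens the ai_* fields onto the document root.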
+/// +public sealed class EnrichPolicyManager( + DistributedTransport transport, + ILogger logger, + string cacheIndexName = "docs-ai-enriched-fields-cache") +{ + private readonly DistributedTransport _transport = transport; + private readonly ILogger _logger = logger; + private readonly string _cacheIndexName = cacheIndexName; + + public const string PolicyName = "ai-enrichment-policy"; + public const string PipelineName = "ai-enrichment-pipeline"; + + + // language=json + private const string IngestPipelineBody = """ + { + "description": "Enriches documents with AI-generated fields from the enrichment cache", + "processors": [ + { + "enrich": { + "policy_name": "ai-enrichment-policy", + "field": "enrichment_key", + "target_field": "ai_enrichment", + "max_matches": 1, + "ignore_missing": true + } + }, + { + "script": { + "description": "Flatten ai_enrichment fields to document root", + "if": "ctx.ai_enrichment != null", + "source": "if (ctx.ai_enrichment.ai_rag_optimized_summary != null) ctx.ai_rag_optimized_summary = ctx.ai_enrichment.ai_rag_optimized_summary; if (ctx.ai_enrichment.ai_short_summary != null) ctx.ai_short_summary = ctx.ai_enrichment.ai_short_summary; if (ctx.ai_enrichment.ai_search_query != null) ctx.ai_search_query = ctx.ai_enrichment.ai_search_query; if (ctx.ai_enrichment.ai_questions != null) ctx.ai_questions = ctx.ai_enrichment.ai_questions; if (ctx.ai_enrichment.ai_use_cases != null) ctx.ai_use_cases = ctx.ai_enrichment.ai_use_cases; ctx.remove('ai_enrichment');" + } + } + ] + } + """; + + /// + /// Ensures the enrich policy exists and creates it if not. + /// + public async Task EnsurePolicyExistsAsync(CancellationToken ct) + { + // Check if policy exists - GET returns 200 with policies array, or 404 if not found + var existsResponse = await _transport.GetAsync($"_enrich/policy/{PolicyName}", ct); + + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode && + existsResponse.Body?.Contains(PolicyName) == true) + { + _logger.LogInformation("Enrich policy {PolicyName} already exists", PolicyName); + return; + } + + _logger.LogInformation("Creating enrich policy {PolicyName} for index {CacheIndex}...", PolicyName, _cacheIndexName); + + // language=json + var policyBody = $$""" + { + "match": { + "indices": "{{_cacheIndexName}}", + "match_field": "enrichment_key", + "enrich_fields": ["ai_rag_optimized_summary", "ai_short_summary", "ai_search_query", "ai_questions", "ai_use_cases"] + } + } + """; + + var createResponse = await _transport.PutAsync( + $"_enrich/policy/{PolicyName}", + PostData.String(policyBody), + ct); + + _logger.LogInformation("Policy creation response: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created enrich policy {PolicyName}", PolicyName); + else + _logger.LogError("Failed to create enrich policy: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + } + + /// + /// Executes the enrich policy to rebuild the enrich index with latest data. + /// Call this after adding new entries to the cache index. 
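+ /// Creates the policy on demand if it is missing (with a short delay for Serverless propagation) before calling _enrich/policy/{PolicyName}/_execute.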
+ /// + public async Task ExecutePolicyAsync(CancellationToken ct) + { + // Verify policy exists before executing + var checkResponse = await _transport.GetAsync($"_enrich/policy/{PolicyName}", ct); + _logger.LogDebug("Pre-execute policy check: {StatusCode} - {Body}", + checkResponse.ApiCallDetails.HttpStatusCode, checkResponse.Body); + + if (!checkResponse.ApiCallDetails.HasSuccessfulStatusCode || + checkResponse.Body?.Contains(PolicyName) != true) + { + _logger.LogInformation("Policy {PolicyName} not found, creating...", PolicyName); + await EnsurePolicyExistsAsync(ct); + // Small delay for Serverless propagation + await Task.Delay(2000, ct); + } + + _logger.LogInformation("Executing enrich policy {PolicyName}...", PolicyName); + + var response = await _transport.PostAsync( + $"_enrich/policy/{PolicyName}/_execute", + PostData.Empty, + ct); + + if (response.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Enrich policy executed successfully"); + else + _logger.LogWarning("Enrich policy execution failed (may be empty): {StatusCode} - {Response}", + response.ApiCallDetails.HttpStatusCode, response.Body); + } + + /// + /// Ensures the ingest pipeline exists and creates it if not. + /// + public async Task EnsurePipelineExistsAsync(CancellationToken ct) + { + var existsResponse = await _transport.GetAsync($"_ingest/pipeline/{PipelineName}", ct); + + if (existsResponse.ApiCallDetails.HasSuccessfulStatusCode) + { + _logger.LogInformation("Ingest pipeline {PipelineName} already exists", PipelineName); + return; + } + + _logger.LogInformation("Creating ingest pipeline {PipelineName}...", PipelineName); + var createResponse = await _transport.PutAsync( + $"_ingest/pipeline/{PipelineName}", + PostData.String(IngestPipelineBody), + ct); + + if (createResponse.ApiCallDetails.HasSuccessfulStatusCode) + _logger.LogInformation("Created ingest pipeline {PipelineName}", PipelineName); + else + _logger.LogError("Failed to create ingest pipeline: {StatusCode} - {Response}", + createResponse.ApiCallDetails.HttpStatusCode, createResponse.Body); + } + + /// + /// Ensures the enrich policy exists. Pipeline is created separately in StartAsync. + /// + public async Task InitializeAsync(CancellationToken ct) => await EnsurePolicyExistsAsync(ct); +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs new file mode 100644 index 000000000..a22bbb44f --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentKeyGenerator.cs @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Security.Cryptography; +using System.Text; +using System.Text.RegularExpressions; + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Generates enrichment keys for AI enrichment cache lookups. +/// The key is derived from content only; the prompt hash is stored alongside each cache entry so that prompt changes mark entries stale and trigger regeneration. +/// +public static partial class EnrichmentKeyGenerator +{ + /// + /// Generates a content-based enrichment key (without prompt hash). + /// This allows stale enrichments from old prompts to still be applied when the prompt changes. + /// New enrichments will gradually replace stale ones as they're generated.
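+ /// Illustrative example (assumed input): Generate("Bulk API", "The Bulk API executes multiple operations") strips non-alphanumeric characters, lowercases the result, and returns the lowercase hex SHA-256, a stable 64-character key.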
+ /// + public static string Generate(string title, string body) + { + var normalized = NormalizeRegex().Replace(title + body, "").ToLowerInvariant(); + var hash = SHA256.HashData(Encoding.UTF8.GetBytes(normalized)); + return Convert.ToHexString(hash).ToLowerInvariant(); + } + + [GeneratedRegex("[^a-zA-Z0-9]")] + private static partial Regex NormalizeRegex(); +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs new file mode 100644 index 000000000..c5e433dc3 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/EnrichmentOptions.cs @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Configuration options for document enrichment. +/// +public sealed record EnrichmentOptions +{ + /// + /// Whether enrichment is enabled. + /// + public bool Enabled { get; init; } + + /// + /// Maximum enrichments per run (new + stale refresh). Limits LLM calls to prevent long deployments. + /// Stale entries (with old prompt hash) are treated as non-existent and count toward this limit. + /// + public int MaxNewEnrichmentsPerRun { get; init; } = 100; + + /// + /// Maximum concurrent LLM calls. + /// + public int MaxConcurrentLlmCalls { get; init; } = 4; + + /// + /// Creates options with enrichment disabled. + /// + public static EnrichmentOptions Disabled => new() { Enabled = false }; +} diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs new file mode 100644 index 000000000..a57a34f10 --- /dev/null +++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/IEnrichmentCache.cs @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment; + +/// +/// Abstraction for enrichment cache operations. +/// With the enrich processor pattern, the cache stores enrichment data that +/// gets joined to documents at index time via an Elasticsearch enrich processor. +/// +public interface IEnrichmentCache +{ + /// + /// The name of the cache index. + /// + string IndexName { get; } + + /// + /// Initializes the cache, including index creation and loading existing hashes. + /// + Task InitializeAsync(CancellationToken ct); + + /// + /// Checks if an enrichment exists for the given enrichment key. + /// + bool Exists(string enrichmentKey); + + /// + /// Stores enrichment data in the cache. + /// + /// The enrichment key (content hash). + /// The document URL for debugging. + /// The enrichment data to store. + /// Cancellation token. + Task StoreAsync(string enrichmentKey, string url, EnrichmentData data, CancellationToken ct); + + /// + /// Gets the number of entries currently in the cache. 
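+ /// Implementations may count only entries that are valid for the current prompt hash, as ElasticsearchEnrichmentCache does.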
diff --git a/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs
new file mode 100644
index 000000000..5195f39d7
--- /dev/null
+++ b/src/Elastic.Markdown/Exporters/Elasticsearch/Enrichment/ILlmClient.cs
@@ -0,0 +1,57 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.Text.Json.Serialization;
+
+namespace Elastic.Markdown.Exporters.Elasticsearch.Enrichment;
+
+/// <summary>
+/// Abstraction for LLM inference operations.
+/// Enables swapping implementations and testing.
+/// </summary>
+public interface ILlmClient : IDisposable
+{
+	/// <summary>
+	/// Generates enrichment data for the given document content.
+	/// </summary>
+	/// <param name="title">The document title.</param>
+	/// <param name="body">The document body content.</param>
+	/// <param name="ct">Cancellation token.</param>
+	/// <returns>The enrichment data if successful, null otherwise.</returns>
+	Task<EnrichmentData?> EnrichAsync(string title, string body, CancellationToken ct);
+}
+
+/// <summary>
+/// AI-generated enrichment fields for a document.
+/// </summary>
+public sealed record EnrichmentData
+{
+	[JsonPropertyName("ai_rag_optimized_summary")]
+	public string? RagOptimizedSummary { get; init; }
+
+	[JsonPropertyName("ai_short_summary")]
+	public string? ShortSummary { get; init; }
+
+	[JsonPropertyName("ai_search_query")]
+	public string? SearchQuery { get; init; }
+
+	[JsonPropertyName("ai_questions")]
+	public string[]? Questions { get; init; }
+
+	[JsonPropertyName("ai_use_cases")]
+	public string[]? UseCases { get; init; }
+
+	public bool HasData =>
+		!string.IsNullOrEmpty(RagOptimizedSummary) ||
+		!string.IsNullOrEmpty(ShortSummary) ||
+		!string.IsNullOrEmpty(SearchQuery) ||
+		Questions is { Length: > 0 } ||
+		UseCases is { Length: > 0 };
+}
+
+[JsonSerializable(typeof(EnrichmentData))]
+[JsonSerializable(typeof(InferenceRequest))]
+[JsonSerializable(typeof(CompletionResponse))]
+[JsonSerializable(typeof(CacheIndexEntry))]
+internal sealed partial class EnrichmentSerializerContext : JsonSerializerContext;
diff --git a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs
index ad7577bd5..cec7388f3 100644
--- a/src/Elastic.Markdown/Exporters/ExporterExtensions.cs
+++ b/src/Elastic.Markdown/Exporters/ExporterExtensions.cs
@@ -18,7 +18,7 @@ public static IReadOnlyCollection<IMarkdownExporter> CreateMarkdownExporters(
 	string indexNamespace
 )
 {
-	var markdownExporters = new List<IMarkdownExporter>(3);
+	var markdownExporters = new List<IMarkdownExporter>(4);
 	if (exportOptions.Contains(Exporter.LLMText))
 		markdownExporters.Add(new LlmMarkdownExporter());
 	if (exportOptions.Contains(Exporter.Configuration))
@@ -28,4 +28,3 @@ string indexNamespace
 	return markdownExporters;
 }
 }
-
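The JsonPropertyName attributes on EnrichmentData deliberately mirror the index mapping's ai_* field names, so cache documents can be joined without any field remapping. An example of the payload shape an ILlmClient implementation would be expected to produce (all values invented for illustration; requires using System.Text.Json):

// Example LLM output, deserialized via the source-generated context above.
const string exampleLlmOutput =
	"""
	{
	  "ai_rag_optimized_summary": "Explains the enrich processor pipeline that joins cached AI metadata to documentation documents at index time.",
	  "ai_short_summary": "Index-time AI metadata join",
	  "ai_search_query": "elasticsearch enrich processor docs pipeline",
	  "ai_questions": ["How are AI-generated fields added to documents at index time?"],
	  "ai_use_cases": ["RAG retrieval", "search relevance tuning"]
	}
	""";

var data = JsonSerializer.Deserialize(exampleLlmOutput, EnrichmentSerializerContext.Default.EnrichmentData);
// data is { HasData: true }; an empty or all-null payload is treated as no data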
diff --git a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs
index 93779d259..355130099 100644
--- a/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs
+++ b/src/services/Elastic.Documentation.Assembler/Indexing/AssemblerIndexService.cs
@@ -34,6 +34,7 @@ ICoreService githubActionsService
 	/// <param name="username">Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME</param>
 	/// <param name="password">Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD</param>
 	/// <param name="noSemantic">Index without semantic fields</param>
+	/// <param name="enableAiEnrichment">Enable AI enrichment of documents using LLM-generated metadata</param>
 	/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="noEis">Do not use the Elastic Inference Service, bootstrap inference endpoint</param>
@@ -61,6 +62,7 @@ public async Task Index(IDiagnosticsCollector collector,
 		string? password = null,
 		// inference options
 		bool? noSemantic = null,
+		bool? enableAiEnrichment = null,
 		int? searchNumThreads = null,
 		int? indexNumThreads = null,
 		bool? noEis = null,
@@ -139,6 +141,8 @@ public async Task Index(IDiagnosticsCollector collector,
 		if (noSemantic.HasValue)
 			cfg.NoSemantic = noSemantic.Value;
+		if (enableAiEnrichment.HasValue)
+			cfg.EnableAiEnrichment = enableAiEnrichment.Value;
 		if (forceReindex.HasValue)
 			cfg.ForceReindex = forceReindex.Value;
diff --git a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs
index 3a60df236..b524513f8 100644
--- a/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs
+++ b/src/services/Elastic.Documentation.Isolated/IsolatedIndexService.cs
@@ -31,6 +31,7 @@ ICoreService githubActionsService
 	/// <param name="username">Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME</param>
 	/// <param name="password">Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD</param>
 	/// <param name="noSemantic">Index without semantic fields</param>
+	/// <param name="enableAiEnrichment">Enable AI enrichment of documents using LLM-generated metadata</param>
 	/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="noEis">Do not use the Elastic Inference Service, bootstrap inference endpoint</param>
@@ -58,6 +59,7 @@ public async Task Index(IDiagnosticsCollector collector,
 		string? password = null,
 		// inference options
 		bool? noSemantic = null,
+		bool? enableAiEnrichment = null,
 		int? searchNumThreads = null,
 		int? indexNumThreads = null,
 		bool? noEis = null,
@@ -136,6 +138,8 @@ public async Task Index(IDiagnosticsCollector collector,
 		if (noSemantic.HasValue)
 			cfg.NoSemantic = noSemantic.Value;
+		if (enableAiEnrichment.HasValue)
+			cfg.EnableAiEnrichment = enableAiEnrichment.Value;
 		if (forceReindex.HasValue)
 			cfg.ForceReindex = forceReindex.Value;
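Both services follow the codebase's existing override pattern: a nullable parameter only touches the endpoint configuration when a value was explicitly supplied. A small sketch of the resulting precedence (values hypothetical):

// Omitting the flag preserves the ElasticsearchEndpoint default (false);
// passing it flips the configuration.
var cfg = new ElasticsearchEndpoint();
Apply(cfg, enableAiEnrichment: null);  // flag omitted -> stays false
Apply(cfg, enableAiEnrichment: true);  // flag passed  -> now true

static void Apply(ElasticsearchEndpoint cfg, bool? enableAiEnrichment)
{
	if (enableAiEnrichment.HasValue)
		cfg.EnableAiEnrichment = enableAiEnrichment.Value;
}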
diff --git a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs
index 22bd5fb4b..115dda5b9 100644
--- a/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs
+++ b/src/tooling/docs-builder/Commands/Assembler/AssemblerIndexCommand.cs
@@ -31,6 +31,7 @@ ICoreService githubActionsService
 	/// <param name="username">Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME</param>
 	/// <param name="password">Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD</param>
 	/// <param name="noSemantic">Index without semantic fields</param>
+	/// <param name="enableAiEnrichment">Enable AI enrichment of documents using LLM-generated metadata</param>
 	/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="noEis">Do not use the Elastic Inference Service, bootstrap inference endpoint</param>
@@ -59,6 +60,7 @@ public async Task Index(
 		// inference options
 		bool? noSemantic = null,
+		bool? enableAiEnrichment = null,
 		int? searchNumThreads = null,
 		int? indexNumThreads = null,
 		bool? noEis = null,
@@ -95,7 +97,7 @@ public async Task Index(
 		// endpoint options
 		endpoint, environment, apiKey, username, password,
 		// inference options
-		noSemantic, indexNumThreads, searchNumThreads, noEis, bootstrapTimeout,
+		noSemantic, enableAiEnrichment, indexNumThreads, searchNumThreads, noEis, bootstrapTimeout,
 		// channel and connection options
 		indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
 		// proxy options
@@ -108,7 +110,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs,
 		// endpoint options
 		state.endpoint, state.environment, state.apiKey, state.username, state.password,
 		// inference options
-		state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
+		state.noSemantic, state.enableAiEnrichment, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
 		// channel and connection options
 		state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
 		// proxy options
diff --git a/src/tooling/docs-builder/Commands/IndexCommand.cs b/src/tooling/docs-builder/Commands/IndexCommand.cs
index c6a38dcee..efc1af596 100644
--- a/src/tooling/docs-builder/Commands/IndexCommand.cs
+++ b/src/tooling/docs-builder/Commands/IndexCommand.cs
@@ -29,6 +29,7 @@ ICoreService githubActionsService
 	/// <param name="username">Elasticsearch username (basic auth), alternatively set env DOCUMENTATION_ELASTIC_USERNAME</param>
 	/// <param name="password">Elasticsearch password (basic auth), alternatively set env DOCUMENTATION_ELASTIC_PASSWORD</param>
 	/// <param name="noSemantic">Index without semantic fields</param>
+	/// <param name="enableAiEnrichment">Enable AI enrichment of documents using LLM-generated metadata</param>
 	/// <param name="searchNumThreads">The number of search threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="indexNumThreads">The number of index threads the inference endpoint should use. Defaults: 8</param>
 	/// <param name="indexNamePrefix">The prefix for the computed index/alias names. Defaults: semantic-docs</param>
@@ -57,6 +58,7 @@ public async Task Index(
 		// inference options
 		bool? noSemantic = null,
+		bool? enableAiEnrichment = null,
 		int? searchNumThreads = null,
 		int? indexNumThreads = null,
 		bool? noEis = null,
@@ -93,7 +95,7 @@ public async Task Index(
 		// endpoint options
 		endpoint, apiKey, username, password,
 		// inference options
-		noSemantic, indexNumThreads, noEis, searchNumThreads, bootstrapTimeout,
+		noSemantic, enableAiEnrichment, indexNumThreads, noEis, searchNumThreads, bootstrapTimeout,
 		// channel and connection options
 		indexNamePrefix, forceReindex, bufferSize, maxRetries, debugMode,
 		// proxy options
@@ -106,7 +108,7 @@ static async (s, collector, state, ctx) => await s.Index(collector, state.fs, st
 		// endpoint options
 		state.endpoint, state.apiKey, state.username, state.password,
 		// inference options
-		state.noSemantic, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
+		state.noSemantic, state.enableAiEnrichment, state.searchNumThreads, state.indexNumThreads, state.noEis, state.bootstrapTimeout,
 		// channel and connection options
 		state.indexNamePrefix, state.forceReindex, state.bufferSize, state.maxRetries, state.debugMode,
 		// proxy options
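With the flag plumbed through, indexed documents carry the joined ai_* fields and, when an inference endpoint is configured, their semantic_text subfields, so the enriched metadata can be targeted directly at query time. A sketch of such a query; the index alias name is a guess based on the default semantic-docs prefix, and transport stands in for an existing DistributedTransport:

// Assumes the mapping was created with an inferenceId, so
// ai_questions.semantic_text exists; the alias name is illustrative.
var response = await transport.PostAsync<StringResponse>(
	"semantic-docs/_search",
	PostData.String(
		"""
		{
		  "query": {
		    "semantic": {
		      "field": "ai_questions.semantic_text",
		      "query": "how do I enable AI enrichment when indexing the docs?"
		    }
		  }
		}
		"""),
	ct);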