Skip to content

Commit 884526b

Browse files
committed
Add custom vector data converters for high performance ingest scenarios
1 parent 1e8ef0b commit 884526b

File tree

4 files changed

+356
-4
lines changed

4 files changed

+356
-4
lines changed

src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public abstract class ElasticsearchClientSettingsBase<TConnectionSettings> :
115115
private readonly Serializer _sourceSerializer;
116116
private BeforeRequestEvent? _onBeforeRequest;
117117
private bool _experimentalEnableSerializeNullInferredValues;
118+
private FloatVectorDataEncoding _floatVectorDataEncoding = Elasticsearch.FloatVectorDataEncoding.Base64;
119+
private ByteVectorDataEncoding _byteVectorDataEncoding = Elasticsearch.ByteVectorDataEncoding.Base64;
118120
private ExperimentalSettings _experimentalSettings = new();
119121

120122
private bool _defaultDisableAllInference;
@@ -165,6 +167,8 @@ protected ElasticsearchClientSettingsBase(
165167
FluentDictionary<Type, string> IElasticsearchClientSettings.RouteProperties => _routeProperties;
166168
Serializer IElasticsearchClientSettings.SourceSerializer => _sourceSerializer;
167169
BeforeRequestEvent? IElasticsearchClientSettings.OnBeforeRequest => _onBeforeRequest;
170+
FloatVectorDataEncoding IElasticsearchClientSettings.FloatVectorDataEncoding => _floatVectorDataEncoding;
171+
ByteVectorDataEncoding IElasticsearchClientSettings.ByteVectorDataEncoding => _byteVectorDataEncoding;
168172
ExperimentalSettings IElasticsearchClientSettings.Experimental => _experimentalSettings;
169173

170174
bool IElasticsearchClientSettings.ExperimentalEnableSerializeNullInferredValues => _experimentalEnableSerializeNullInferredValues;
@@ -198,6 +202,18 @@ public TConnectionSettings DefaultFieldNameInferrer(Func<string, string> fieldNa
198202
public TConnectionSettings ExperimentalEnableSerializeNullInferredValues(bool enabled = true) =>
199203
Assign(enabled, (a, v) => a._experimentalEnableSerializeNullInferredValues = v);
200204

205+
/// <inheritdoc cref="IElasticsearchClientSettings.FloatVectorDataEncoding"/>
206+
/// <param name="encoding">The default vector data encoding to use.</param>
207+
/// <returns>This settings instance for chaining.</returns>
208+
public TConnectionSettings FloatVectorDataEncoding(FloatVectorDataEncoding encoding) =>
209+
Assign(encoding, (a, v) => a._floatVectorDataEncoding = v);
210+
211+
/// <inheritdoc cref="IElasticsearchClientSettings.ByteVectorDataEncoding"/>
212+
/// <param name="encoding">The default vector data encoding to use.</param>
213+
/// <returns>This settings instance for chaining.</returns>
214+
public TConnectionSettings ByteVectorDataEncoding(ByteVectorDataEncoding encoding) =>
215+
Assign(encoding, (a, v) => a._byteVectorDataEncoding = v);
216+
201217
public TConnectionSettings Experimental(ExperimentalSettings settings) =>
202218
Assign(settings, (a, v) => a._experimentalSettings = v);
203219

src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,37 @@ public interface IElasticsearchClientSettings : ITransportConfiguration
116116
BeforeRequestEvent? OnBeforeRequest { get; }
117117

118118
/// <summary>
119-
/// This is an advanced setting which controls serialization behaviour for inferred properies such as ID, routing and index name.
120-
/// <para>When enabled, it may reduce allocations on serialisation paths where the cost can be more significant, such as in bulk operations.</para>
119+
/// This is an advanced setting which controls serialization behaviour for inferred properties such as ID, routing and index name.
120+
/// <para>When enabled, it may reduce allocations on serialization paths where the cost can be more significant, such as in bulk operations.</para>
121121
/// <para>As a by-product it may cause null values to be included in the serialized data and impact payload size. This will only be a concern should some
122-
/// typed not have inferrence mappings defined for the required properties.</para>
122+
/// typed not have inference mappings defined for the required properties.</para>
123123
/// </summary>
124-
/// <remarks>This is marked as experiemental and may be removed or renamed in the future once its impact is evaluated.</remarks>
124+
/// <remarks>This is marked as experimental and may be removed or renamed in the future once its impact is evaluated.</remarks>
125125
bool ExperimentalEnableSerializeNullInferredValues { get; }
126126

127+
/// <summary>
128+
/// Controls the vector data encoding to use for <see cref="ReadOnlyMemory{T}"/> properties
129+
/// in documents during ingestion when the <see cref="FloatVectorDataConverter"/> is used.
130+
/// </summary>
131+
/// <remarks>
132+
/// Setting this value to <see cref="FloatVectorDataEncoding.Legacy"/> provides backwards
133+
/// compatibility when talking to Elasticsearch servers with a version older than 9.3.0
134+
/// (required for <see cref="ByteVectorDataEncoding.Base64"/>).
135+
/// </remarks>
136+
FloatVectorDataEncoding FloatVectorDataEncoding { get; }
137+
138+
/// <summary>
139+
/// Controls the vector data encoding to use for <see cref="ReadOnlyMemory{T}"/> properties
140+
/// in documents during ingestion when the <see cref="ByteVectorDataConverter"/> is used.
141+
/// </summary>
142+
/// <remarks>
143+
/// Setting this value to <see cref="ByteVectorDataEncoding.Legacy"/> provides backwards
144+
/// compatibility when talking to Elasticsearch servers with a version older than 8.14.0
145+
/// (required for <see cref="ByteVectorDataEncoding.Hex"/>) or older than 9.3.0 (required
146+
/// for <see cref="ByteVectorDataEncoding.Base64"/>).
147+
/// </remarks>
148+
ByteVectorDataEncoding ByteVectorDataEncoding { get; }
149+
127150
/// <summary>
128151
/// Experimental settings.
129152
/// </summary>

src/Elastic.Clients.Elasticsearch/_Shared/Next/JsonWriterExtensions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,21 @@ public static void WriteUnionValue<T1, T2>(this Utf8JsonWriter writer, JsonSeria
250250
);
251251
}
252252

253+
public static void WriteSpanValue<T>(this Utf8JsonWriter writer, JsonSerializerOptions options, ReadOnlySpan<T> span,
254+
JsonWriteFunc<T>? writeElement)
255+
{
256+
writeElement ??= static (w, o, v) => WriteValue(w, o, v);
257+
258+
writer.WriteStartArray();
259+
260+
foreach (var element in span)
261+
{
262+
writeElement(writer, options, element);
263+
}
264+
265+
writer.WriteEndArray();
266+
}
267+
253268
#endregion Delegate Based Write Methods
254269

255270
#region Specialized Write Methods
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Buffers;
7+
using System.Buffers.Binary;
8+
using System.Runtime.InteropServices;
9+
using System.Text.Json;
10+
using System.Text.Json.Serialization;
11+
12+
using Elastic.Clients.Elasticsearch.Serialization;
13+
14+
namespace Elastic.Clients.Elasticsearch;
15+
16+
/// <summary>
17+
/// The encoding to use when serializing vector data using the <see cref="FloatVectorDataConverter"/> converter.
18+
/// </summary>
19+
public enum FloatVectorDataEncoding
20+
{
21+
/// <summary>
22+
/// Legacy (JSON array) vector encoding for backwards compatibility.
23+
/// </summary>
24+
Legacy,
25+
26+
/// <summary>
27+
/// <c>Base64</c> vector encoding.
28+
/// </summary>
29+
/// <remarks>
30+
/// <c>Base64</c> encoding is available starting from Elasticsearch 9.3.0.
31+
/// </remarks>
32+
Base64
33+
}
34+
35+
public sealed class FloatVectorDataConverter :
36+
JsonConverter<ReadOnlyMemory<float>>
37+
{
38+
private FloatVectorDataEncoding? _encoding;
39+
40+
public override ReadOnlyMemory<float> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
41+
{
42+
return reader.TokenType switch
43+
{
44+
JsonTokenType.StartArray => new(reader.ReadCollectionValue<float>(options, null)!.ToArray()),
45+
JsonTokenType.String => ReadBase64VectorData(ref reader),
46+
_ => throw reader.UnexpectedTokenException(JsonTokenType.StartArray, JsonTokenType.String)
47+
};
48+
}
49+
50+
public override void Write(Utf8JsonWriter writer, ReadOnlyMemory<float> value, JsonSerializerOptions options)
51+
{
52+
var encoding = _encoding;
53+
if (encoding is null)
54+
{
55+
var settings = ContextProvider<IElasticsearchClientSettings>.GetContext(options);
56+
_encoding = settings.FloatVectorDataEncoding;
57+
}
58+
59+
switch (_encoding)
60+
{
61+
case FloatVectorDataEncoding.Legacy:
62+
writer.WriteSpanValue(options, value.Span, null);
63+
break;
64+
65+
case FloatVectorDataEncoding.Base64:
66+
WriteBase64VectorData(writer, value);
67+
break;
68+
69+
default:
70+
throw new NotSupportedException();
71+
}
72+
}
73+
74+
private static ReadOnlyMemory<float> ReadBase64VectorData(ref Utf8JsonReader reader)
75+
{
76+
var bytes = reader.GetBytesFromBase64();
77+
78+
if ((bytes.Length & 3) != 0)
79+
{
80+
throw new ArgumentException("Decoded vector data length is not a multiple of 4 (not valid 32-bit floats).");
81+
}
82+
83+
var span = bytes.AsSpan();
84+
85+
if (BitConverter.IsLittleEndian)
86+
{
87+
// Host is little-endian. We must swap the byte order.
88+
89+
var intSourceDest = MemoryMarshal.Cast<byte, int>(span);
90+
91+
for (var i = 0; i < intSourceDest.Length; i++)
92+
{
93+
intSourceDest[i] = BinaryPrimitives.ReverseEndianness(intSourceDest[i]);
94+
}
95+
}
96+
97+
var result = new float[bytes.Length / 4];
98+
Buffer.BlockCopy(bytes, 0, result, 0, bytes.Length);
99+
100+
return new(result);
101+
}
102+
103+
private static void WriteBase64VectorData(Utf8JsonWriter writer, ReadOnlyMemory<float> value)
104+
{
105+
if (value.IsEmpty)
106+
{
107+
writer.WriteStringValue(string.Empty);
108+
return;
109+
}
110+
111+
// If the host is big-endian we can reinterpret the memory as bytes without copying.
112+
if (!BitConverter.IsLittleEndian)
113+
{
114+
writer.WriteBase64StringValue(MemoryMarshal.AsBytes(value.Span));
115+
}
116+
117+
// Host is little-endian. We must swap the byte order.
118+
119+
var pool = MemoryPool<byte>.Shared;
120+
var required = checked(value.Length * sizeof(float));
121+
var owner = pool.Rent(required);
122+
123+
try
124+
{
125+
var dest = owner.Memory.Span[..required];
126+
127+
var intSource = MemoryMarshal.Cast<float, int>(value.Span);
128+
var intDest = MemoryMarshal.Cast<byte, int>(dest);
129+
130+
for (var i = 0; i < intSource.Length; i++)
131+
{
132+
intDest[i] = BinaryPrimitives.ReverseEndianness(intSource[i]);
133+
}
134+
135+
writer.WriteBase64StringValue(dest);
136+
}
137+
finally
138+
{
139+
owner.Dispose();
140+
}
141+
}
142+
}
143+
144+
/// <summary>
145+
/// The encoding to use when serializing vector data using the <see cref="ByteVectorDataConverter"/> converter.
146+
/// </summary>
147+
public enum ByteVectorDataEncoding
148+
{
149+
/// <summary>
150+
/// Legacy (JSON array) vector encoding for backwards compatibility.
151+
/// </summary>
152+
Legacy,
153+
154+
/// <summary>
155+
/// Hexadecimal string vector encoding.
156+
/// </summary>
157+
/// <remarks>
158+
/// Hexadecimal encoding is available starting from Elasticsearch 8.14.0.
159+
/// </remarks>
160+
Hex,
161+
162+
/// <summary>
163+
/// <c>Base64</c> vector encoding.
164+
/// </summary>
165+
/// <remarks>
166+
/// <c>Base64</c> encoding is available starting from Elasticsearch 9.3.0.
167+
/// </remarks>
168+
Base64
169+
}
170+
171+
public sealed class ByteVectorDataConverter :
172+
JsonConverter<ReadOnlyMemory<byte>>
173+
{
174+
private ByteVectorDataEncoding? _encoding;
175+
176+
public override ReadOnlyMemory<byte> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
177+
{
178+
return reader.TokenType switch
179+
{
180+
JsonTokenType.StartArray => new(reader.ReadCollectionValue(options, (ref r, _) => unchecked((byte)r.GetSByte()))!.ToArray()),
181+
JsonTokenType.String => ReadStringVectorData(ref reader),
182+
_ => throw reader.UnexpectedTokenException(JsonTokenType.StartArray, JsonTokenType.String)
183+
};
184+
}
185+
186+
public override void Write(Utf8JsonWriter writer, ReadOnlyMemory<byte> value, JsonSerializerOptions options)
187+
{
188+
if (_encoding is null)
189+
{
190+
var settings = ContextProvider<IElasticsearchClientSettings>.GetContext(options);
191+
_encoding = settings.ByteVectorDataEncoding;
192+
}
193+
194+
switch (_encoding)
195+
{
196+
case ByteVectorDataEncoding.Legacy:
197+
writer.WriteSpanValue(options, value.Span, (w, _, b) => w.WriteNumberValue(unchecked((sbyte)b)));
198+
break;
199+
200+
case ByteVectorDataEncoding.Hex:
201+
WriteHexVectorData(writer, value);
202+
break;
203+
204+
case ByteVectorDataEncoding.Base64:
205+
writer.WriteBase64StringValue(value.Span);
206+
break;
207+
208+
default:
209+
throw new NotSupportedException();
210+
}
211+
}
212+
213+
private static ReadOnlyMemory<byte> ReadStringVectorData(ref Utf8JsonReader reader)
214+
{
215+
if (reader.TryGetBytesFromBase64(out var result))
216+
{
217+
return result;
218+
}
219+
220+
return ReadHexVectorData(ref reader);
221+
}
222+
223+
private static ReadOnlyMemory<byte> ReadHexVectorData(ref Utf8JsonReader reader)
224+
{
225+
#if NET5_0_OR_GREATER
226+
var data = Convert.FromHexString(reader.GetString()!);
227+
#else
228+
var data = FromHex(reader.GetString()!);
229+
#endif
230+
231+
return new(data);
232+
}
233+
234+
private static void WriteHexVectorData(Utf8JsonWriter writer, ReadOnlyMemory<byte> value)
235+
{
236+
if (value.IsEmpty)
237+
{
238+
writer.WriteStringValue(string.Empty);
239+
return;
240+
}
241+
242+
// We don't use Convert.ToHexString even for .NET 5.0+ to be able to use pooled memory.
243+
244+
var pool = MemoryPool<char>.Shared;
245+
var required = checked(value.Length * 2);
246+
var owner = pool.Rent(required);
247+
248+
try
249+
{
250+
var source = value.Span;
251+
var dest = owner.Memory.Span[..required];
252+
253+
byte b;
254+
255+
for(int bx = 0, cx = 0; bx < source.Length; ++bx, ++cx)
256+
{
257+
b = ((byte)(source[bx] >> 4));
258+
dest[cx] = (char)(b > 9 ? b + 0x37 : b + 0x30);
259+
b = ((byte)(source[bx] & 0x0F));
260+
dest[++cx]=(char)(b > 9 ? b + 0x37 : b + 0x30);
261+
}
262+
263+
writer.WriteStringValue(dest);
264+
}
265+
finally
266+
{
267+
owner.Dispose();
268+
}
269+
}
270+
271+
#if !NET5_0_OR_GREATER
272+
public static byte[] FromHex(string data)
273+
{
274+
if (data.Length is 0)
275+
{
276+
return [];
277+
}
278+
279+
if (data.Length % 2 != 0)
280+
{
281+
throw new ArgumentException("Decoded vector data length is not a multiple of 2 (not valid 8-bit hex niblets).");
282+
}
283+
284+
var buffer = new byte[data.Length / 2];
285+
char c;
286+
287+
for (int bx = 0, sx = 0; bx < buffer.Length; ++bx, ++sx)
288+
{
289+
c = data[sx];
290+
buffer[bx] = (byte)((c > '9' ? (c > 'Z' ? (c - 'a' + 10) : (c - 'A' + 10)) : (c - '0')) << 4);
291+
c = data[++sx];
292+
buffer[bx] |= (byte)(c > '9' ? (c > 'Z' ? (c - 'a' + 10) : (c - 'A' + 10)) : (c - '0'));
293+
}
294+
295+
return buffer;
296+
}
297+
#endif
298+
}

0 commit comments

Comments
 (0)