Skip to content

Commit 0d80967

Browse files
committed
Add custom vector data converters for high performance ingest scenarios
1 parent 1e8ef0b commit 0d80967

File tree

4 files changed

+352
-4
lines changed

4 files changed

+352
-4
lines changed

src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public abstract class ElasticsearchClientSettingsBase<TConnectionSettings> :
115115
private readonly Serializer _sourceSerializer;
116116
private BeforeRequestEvent? _onBeforeRequest;
117117
private bool _experimentalEnableSerializeNullInferredValues;
118+
private FloatVectorDataEncoding _floatVectorDataEncoding = Elasticsearch.FloatVectorDataEncoding.Base64;
119+
private ByteVectorDataEncoding _byteVectorDataEncoding = Elasticsearch.ByteVectorDataEncoding.Base64;
118120
private ExperimentalSettings _experimentalSettings = new();
119121

120122
private bool _defaultDisableAllInference;
@@ -165,6 +167,8 @@ protected ElasticsearchClientSettingsBase(
165167
FluentDictionary<Type, string> IElasticsearchClientSettings.RouteProperties => _routeProperties;
166168
Serializer IElasticsearchClientSettings.SourceSerializer => _sourceSerializer;
167169
BeforeRequestEvent? IElasticsearchClientSettings.OnBeforeRequest => _onBeforeRequest;
170+
FloatVectorDataEncoding IElasticsearchClientSettings.FloatVectorDataEncoding => _floatVectorDataEncoding;
171+
ByteVectorDataEncoding IElasticsearchClientSettings.ByteVectorDataEncoding => _byteVectorDataEncoding;
168172
ExperimentalSettings IElasticsearchClientSettings.Experimental => _experimentalSettings;
169173

170174
bool IElasticsearchClientSettings.ExperimentalEnableSerializeNullInferredValues => _experimentalEnableSerializeNullInferredValues;
@@ -198,6 +202,18 @@ public TConnectionSettings DefaultFieldNameInferrer(Func<string, string> fieldNa
198202
public TConnectionSettings ExperimentalEnableSerializeNullInferredValues(bool enabled = true) =>
199203
Assign(enabled, (a, v) => a._experimentalEnableSerializeNullInferredValues = v);
200204

205+
/// <inheritdoc cref="IElasticsearchClientSettings.FloatVectorDataEncoding"/>
/// <param name="encoding">The vector data encoding to use by default for <c>float</c> vectors.</param>
/// <returns>The settings instance, so that further calls can be chained.</returns>
public TConnectionSettings FloatVectorDataEncoding(FloatVectorDataEncoding encoding)
{
    return Assign(encoding, (settings, value) => settings._floatVectorDataEncoding = value);
}
210+
211+
/// <inheritdoc cref="IElasticsearchClientSettings.ByteVectorDataEncoding"/>
/// <param name="encoding">The vector data encoding to use by default for <c>byte</c> vectors.</param>
/// <returns>The settings instance, so that further calls can be chained.</returns>
public TConnectionSettings ByteVectorDataEncoding(ByteVectorDataEncoding encoding)
{
    return Assign(encoding, (settings, value) => settings._byteVectorDataEncoding = value);
}
216+
201217
public TConnectionSettings Experimental(ExperimentalSettings settings) =>
202218
Assign(settings, (a, v) => a._experimentalSettings = v);
203219

src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/IElasticsearchClientSettings.cs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,37 @@ public interface IElasticsearchClientSettings : ITransportConfiguration
116116
BeforeRequestEvent? OnBeforeRequest { get; }
117117

118118
/// <summary>
119-
/// This is an advanced setting which controls serialization behaviour for inferred properies such as ID, routing and index name.
120-
/// <para>When enabled, it may reduce allocations on serialisation paths where the cost can be more significant, such as in bulk operations.</para>
119+
/// This is an advanced setting which controls serialization behaviour for inferred properties such as ID, routing and index name.
120+
/// <para>When enabled, it may reduce allocations on serialization paths where the cost can be more significant, such as in bulk operations.</para>
121121
/// <para>As a by-product it may cause null values to be included in the serialized data and impact payload size. This will only be a concern should some
122-
/// typed not have inferrence mappings defined for the required properties.</para>
122+
/// types not have inference mappings defined for the required properties.</para>
123123
/// </summary>
124-
/// <remarks>This is marked as experiemental and may be removed or renamed in the future once its impact is evaluated.</remarks>
124+
/// <remarks>This is marked as experimental and may be removed or renamed in the future once its impact is evaluated.</remarks>
125125
bool ExperimentalEnableSerializeNullInferredValues { get; }
126126

127+
/// <summary>
128+
/// Controls the vector data encoding to use for <see cref="ReadOnlyMemory{T}"/> properties
129+
/// in documents during ingestion when the <see cref="FloatVectorDataConverter"/> is used.
130+
/// </summary>
131+
/// <remarks>
132+
/// Setting this value to <see cref="FloatVectorDataEncoding.Legacy"/> provides backwards
133+
/// compatibility when talking to Elasticsearch servers with a version older than 9.3.0
134+
/// (required for <see cref="FloatVectorDataEncoding.Base64"/>).
135+
/// </remarks>
136+
FloatVectorDataEncoding FloatVectorDataEncoding { get; }
137+
138+
/// <summary>
139+
/// Controls the vector data encoding to use for <see cref="ReadOnlyMemory{T}"/> properties
140+
/// in documents during ingestion when the <see cref="ByteVectorDataConverter"/> is used.
141+
/// </summary>
142+
/// <remarks>
143+
/// Setting this value to <see cref="ByteVectorDataEncoding.Legacy"/> provides backwards
144+
/// compatibility when talking to Elasticsearch servers with a version older than 8.14.0
145+
/// (required for <see cref="ByteVectorDataEncoding.Hex"/>) or older than 9.3.0 (required
146+
/// for <see cref="ByteVectorDataEncoding.Base64"/>).
147+
/// </remarks>
148+
ByteVectorDataEncoding ByteVectorDataEncoding { get; }
149+
127150
/// <summary>
128151
/// Experimental settings.
129152
/// </summary>

src/Elastic.Clients.Elasticsearch/_Shared/Next/JsonWriterExtensions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,21 @@ public static void WriteUnionValue<T1, T2>(this Utf8JsonWriter writer, JsonSeria
250250
);
251251
}
252252

253+
/// <summary>
/// Writes every element of <paramref name="span"/> into a single JSON array.
/// </summary>
/// <typeparam name="T">The element type of the span.</typeparam>
/// <param name="writer">The writer that receives the array.</param>
/// <param name="options">The serializer options forwarded to the element writer.</param>
/// <param name="span">The elements to write, in order. An empty span produces an empty array.</param>
/// <param name="writeElement">
/// The callback that writes one element. When <see langword="null"/>, each element is written through
/// <c>WriteValue</c>.
/// </param>
public static void WriteSpanValue<T>(this Utf8JsonWriter writer, JsonSerializerOptions options, ReadOnlySpan<T> span,
    JsonWriteFunc<T>? writeElement)
{
    // Fall back to the generic value writer when the caller supplies no element callback.
    JsonWriteFunc<T> elementWriter = writeElement ?? (static (w, o, v) => WriteValue(w, o, v));

    writer.WriteStartArray();

    for (var index = 0; index < span.Length; index++)
    {
        elementWriter(writer, options, span[index]);
    }

    writer.WriteEndArray();
}
267+
253268
#endregion Delegate Based Write Methods
254269

255270
#region Specialized Write Methods
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Buffers.Binary;
4+
using System.Runtime.InteropServices;
5+
using System.Text.Json;
6+
using System.Text.Json.Serialization;
7+
8+
using Elastic.Clients.Elasticsearch.Serialization;
9+
10+
namespace Elastic.Clients.Elasticsearch;
11+
12+
/// <summary>
13+
/// The encoding to use when serializing vector data using the <see cref="FloatVectorDataConverter"/> converter.
14+
/// </summary>
15+
public enum FloatVectorDataEncoding
16+
{
17+
/// <summary>
18+
/// Legacy (JSON array) vector encoding for backwards compatibility.
19+
/// </summary>
20+
Legacy,
21+
22+
/// <summary>
23+
/// <c>Base64</c> vector encoding.
24+
/// </summary>
25+
/// <remarks>
26+
/// <c>Base64</c> encoding is available starting from Elasticsearch 9.3.0.
27+
/// </remarks>
28+
Base64
29+
}
30+
31+
/// <summary>
/// Converts <see cref="ReadOnlyMemory{T}"/> of <see cref="float"/> vector data to and from JSON.
/// </summary>
/// <remarks>
/// <para>
/// On write, the representation is chosen by <see cref="IElasticsearchClientSettings.FloatVectorDataEncoding"/>:
/// either a JSON array of numbers or a <c>Base64</c> string.
/// </para>
/// <para>
/// On read, both representations are accepted regardless of the configured encoding.
/// </para>
/// <para>
/// The <c>Base64</c> payload stores each element as 4 bytes in big-endian order; on little-endian hosts the
/// bytes of every element are swapped while reading and writing.
/// </para>
/// </remarks>
public sealed class FloatVectorDataConverter :
    JsonConverter<ReadOnlyMemory<float>>
{
    // Resolved from the client settings on the first write and reused for subsequent writes.
    private FloatVectorDataEncoding? _encoding;

    /// <summary>
    /// Reads vector data from either a JSON array or a <c>Base64</c> string.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// The decoded <c>Base64</c> payload length is not a multiple of 4 bytes.
    /// </exception>
    public override ReadOnlyMemory<float> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        return reader.TokenType switch
        {
            JsonTokenType.StartArray => new(reader.ReadCollectionValue<float>(options, null)!.ToArray()),
            JsonTokenType.String => ReadBase64VectorData(ref reader),
            _ => throw reader.UnexpectedTokenException(JsonTokenType.StartArray, JsonTokenType.String)
        };
    }

    /// <summary>
    /// Writes vector data using the encoding configured in the client settings.
    /// </summary>
    /// <exception cref="NotSupportedException">The configured encoding is not a known enum member.</exception>
    public override void Write(Utf8JsonWriter writer, ReadOnlyMemory<float> value, JsonSerializerOptions options)
    {
        // Work on a local copy so the value that was null-checked is the same value that is switched on.
        var encoding = _encoding;
        if (encoding is null)
        {
            var settings = ContextProvider<IElasticsearchClientSettings>.GetContext(options);
            _encoding = encoding = settings.FloatVectorDataEncoding;
        }

        switch (encoding)
        {
            case FloatVectorDataEncoding.Legacy:
                writer.WriteSpanValue(options, value.Span, null);
                break;

            case FloatVectorDataEncoding.Base64:
                WriteBase64VectorData(writer, value);
                break;

            default:
                throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Decodes the current <c>Base64</c> string token into an array of 32-bit floats.
    /// </summary>
    private static ReadOnlyMemory<float> ReadBase64VectorData(ref Utf8JsonReader reader)
    {
        var bytes = reader.GetBytesFromBase64();

        if ((bytes.Length & 3) != 0)
        {
            throw new ArgumentException("Decoded vector data length is not a multiple of 4 (not valid 32-bit floats).");
        }

        if (BitConverter.IsLittleEndian)
        {
            // Host is little-endian. We must swap the byte order (in place, 4 bytes at a time).
            var intSourceDest = MemoryMarshal.Cast<byte, int>(bytes.AsSpan());

            for (var i = 0; i < intSourceDest.Length; i++)
            {
                intSourceDest[i] = BinaryPrimitives.ReverseEndianness(intSourceDest[i]);
            }
        }

        // The bytes are now in host order and can be copied verbatim into the float array.
        var result = new float[bytes.Length / 4];
        Buffer.BlockCopy(bytes, 0, result, 0, bytes.Length);

        return new(result);
    }

    /// <summary>
    /// Writes the vector as a single <c>Base64</c> string value. An empty vector is written as an empty string.
    /// </summary>
    private static void WriteBase64VectorData(Utf8JsonWriter writer, ReadOnlyMemory<float> value)
    {
        if (value.IsEmpty)
        {
            writer.WriteStringValue(string.Empty);
            return;
        }

        // If the host is big-endian we can reinterpret the memory as bytes without copying.
        if (!BitConverter.IsLittleEndian)
        {
            writer.WriteBase64StringValue(MemoryMarshal.AsBytes(value.Span));

            // Without this return the method would fall through and emit a second, byte-swapped value.
            return;
        }

        // Host is little-endian. We must swap the byte order, using a pooled scratch buffer.
        var required = checked(value.Length * sizeof(float));
        using var owner = MemoryPool<byte>.Shared.Rent(required);

        // The pool may hand out a larger buffer than requested; only use the part we need.
        var dest = owner.Memory.Span[..required];

        var intSource = MemoryMarshal.Cast<float, int>(value.Span);
        var intDest = MemoryMarshal.Cast<byte, int>(dest);

        for (var i = 0; i < intSource.Length; i++)
        {
            intDest[i] = BinaryPrimitives.ReverseEndianness(intSource[i]);
        }

        writer.WriteBase64StringValue(dest);
    }
}
139+
140+
/// <summary>
141+
/// The encoding to use when serializing vector data using the <see cref="ByteVectorDataConverter"/> converter.
142+
/// </summary>
143+
public enum ByteVectorDataEncoding
144+
{
145+
/// <summary>
146+
/// Legacy (JSON array) vector encoding for backwards compatibility.
147+
/// </summary>
148+
Legacy,
149+
150+
/// <summary>
151+
/// Hexadecimal string vector encoding.
152+
/// </summary>
153+
/// <remarks>
154+
/// Hexadecimal encoding is available starting from Elasticsearch 8.14.0.
155+
/// </remarks>
156+
Hex,
157+
158+
/// <summary>
159+
/// <c>Base64</c> vector encoding.
160+
/// </summary>
161+
/// <remarks>
162+
/// <c>Base64</c> encoding is available starting from Elasticsearch 9.3.0.
163+
/// </remarks>
164+
Base64
165+
}
166+
167+
/// <summary>
/// Converts <see cref="ReadOnlyMemory{T}"/> of <see cref="byte"/> vector data to and from JSON.
/// </summary>
/// <remarks>
/// <para>
/// On write, the representation is chosen by <see cref="IElasticsearchClientSettings.ByteVectorDataEncoding"/>:
/// a JSON array of signed byte values, an upper-case hexadecimal string or a <c>Base64</c> string.
/// </para>
/// <para>
/// On read, a JSON array or a string is accepted. A string made only of hexadecimal digits is also a
/// syntactically valid <c>Base64</c> string whenever its length is a multiple of 4, so the two string forms
/// cannot be told apart from the payload alone. The configured encoding is therefore used as a hint: when it
/// is <see cref="ByteVectorDataEncoding.Hex"/>, hexadecimal decoding is attempted first; otherwise
/// <c>Base64</c> decoding is attempted first. The other form is used as a fallback in both cases.
/// </para>
/// </remarks>
public sealed class ByteVectorDataConverter :
    JsonConverter<ReadOnlyMemory<byte>>
{
    // Resolved from the client settings on first use and reused afterwards.
    private ByteVectorDataEncoding? _encoding;

    /// <summary>
    /// Reads vector data from a JSON array of signed byte values or from a hexadecimal / <c>Base64</c> string.
    /// </summary>
    public override ReadOnlyMemory<byte> Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        return reader.TokenType switch
        {
            JsonTokenType.StartArray => new(reader.ReadCollectionValue(options, (ref r, _) => unchecked((byte)r.GetSByte()))!.ToArray()),
            JsonTokenType.String => ReadStringVectorData(ref reader, ResolveEncoding(options)),
            _ => throw reader.UnexpectedTokenException(JsonTokenType.StartArray, JsonTokenType.String)
        };
    }

    /// <summary>
    /// Writes vector data using the encoding configured in the client settings.
    /// </summary>
    /// <exception cref="NotSupportedException">The configured encoding is not a known enum member.</exception>
    public override void Write(Utf8JsonWriter writer, ReadOnlyMemory<byte> value, JsonSerializerOptions options)
    {
        switch (ResolveEncoding(options))
        {
            case ByteVectorDataEncoding.Legacy:
                // Each byte is reinterpreted as a signed value before being written as a JSON number.
                writer.WriteSpanValue(options, value.Span, (w, _, b) => w.WriteNumberValue(unchecked((sbyte)b)));
                break;

            case ByteVectorDataEncoding.Hex:
                WriteHexVectorData(writer, value);
                break;

            case ByteVectorDataEncoding.Base64:
                writer.WriteBase64StringValue(value.Span);
                break;

            default:
                throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Returns the configured encoding, looking it up in the client settings on first use.
    /// </summary>
    private ByteVectorDataEncoding ResolveEncoding(JsonSerializerOptions options)
    {
        var encoding = _encoding;
        if (encoding is null)
        {
            var settings = ContextProvider<IElasticsearchClientSettings>.GetContext(options);
            _encoding = encoding = settings.ByteVectorDataEncoding;
        }

        return encoding.Value;
    }

    /// <summary>
    /// Decodes the current string token, trying the form indicated by <paramref name="preferred"/> first.
    /// </summary>
    private static ReadOnlyMemory<byte> ReadStringVectorData(ref Utf8JsonReader reader, ByteVectorDataEncoding preferred)
    {
        // Hex digits are a subset of the Base64 alphabet, so when hex is the configured encoding it must be
        // tried before Base64; otherwise most hex payloads would be silently decoded as Base64.
        if (preferred is ByteVectorDataEncoding.Hex && TryFromHex(reader.GetString()!, out var hexBytes))
        {
            return hexBytes;
        }

        if (reader.TryGetBytesFromBase64(out var result))
        {
            return result;
        }

        return ReadHexVectorData(ref reader);
    }

    /// <summary>
    /// Decodes the current string token as hexadecimal, throwing when it is not valid hexadecimal.
    /// </summary>
    private static ReadOnlyMemory<byte> ReadHexVectorData(ref Utf8JsonReader reader)
    {
#if NET5_0_OR_GREATER
        var data = Convert.FromHexString(reader.GetString()!);
#else
        var data = FromHex(reader.GetString()!);
#endif

        return new(data);
    }

    /// <summary>
    /// Writes the vector as an upper-case hexadecimal string. An empty vector is written as an empty string.
    /// </summary>
    private static void WriteHexVectorData(Utf8JsonWriter writer, ReadOnlyMemory<byte> value)
    {
        if (value.IsEmpty)
        {
            writer.WriteStringValue(string.Empty);
            return;
        }

        // We don't use Convert.ToHexString even for .NET 5.0+ to be able to use pooled memory.
        var required = checked(value.Length * 2);
        using var owner = MemoryPool<char>.Shared.Rent(required);

        var source = value.Span;

        // The pool may hand out a larger buffer than requested; only use the part we need.
        var dest = owner.Memory.Span[..required];

        for (var i = 0; i < source.Length; i++)
        {
            dest[2 * i] = ToHexChar(source[i] >> 4);
            dest[(2 * i) + 1] = ToHexChar(source[i] & 0x0F);
        }

        writer.WriteStringValue(dest);
    }

    /// <summary>
    /// Maps a value in the range 0..15 to its upper-case hexadecimal digit.
    /// </summary>
    private static char ToHexChar(int nibble) =>
        (char)(nibble > 9 ? nibble + 0x37 : nibble + 0x30);

    /// <summary>
    /// Maps a hexadecimal digit (either case) to its value, or returns -1 for any other character.
    /// </summary>
    private static int FromHexChar(char c) => c switch
    {
        >= '0' and <= '9' => c - '0',
        >= 'A' and <= 'F' => c - 'A' + 10,
        >= 'a' and <= 'f' => c - 'a' + 10,
        _ => -1
    };

    /// <summary>
    /// Attempts to decode a hexadecimal string without throwing.
    /// </summary>
    /// <param name="data">The string to decode.</param>
    /// <param name="bytes">The decoded bytes on success; an empty array otherwise.</param>
    /// <returns>
    /// <see langword="false"/> when <paramref name="data"/> has an odd length or contains a character that is
    /// not a hexadecimal digit; otherwise <see langword="true"/>.
    /// </returns>
    private static bool TryFromHex(string data, out byte[] bytes)
    {
        bytes = [];

        if ((data.Length & 1) != 0)
        {
            return false;
        }

        var buffer = new byte[data.Length / 2];

        for (var i = 0; i < buffer.Length; i++)
        {
            var high = FromHexChar(data[2 * i]);
            var low = FromHexChar(data[(2 * i) + 1]);

            // Either lookup yields -1 for an invalid digit, which makes the combined value negative.
            if ((high | low) < 0)
            {
                return false;
            }

            buffer[i] = (byte)((high << 4) | low);
        }

        bytes = buffer;
        return true;
    }

#if !NET5_0_OR_GREATER
    /// <summary>
    /// Decodes a hexadecimal string (upper or lower case digits) into bytes.
    /// </summary>
    /// <param name="data">The hexadecimal string to decode.</param>
    /// <returns>The decoded bytes; an empty array for an empty string.</returns>
    /// <exception cref="ArgumentException"><paramref name="data"/> has an odd length.</exception>
    /// <exception cref="FormatException">
    /// <paramref name="data"/> contains a character that is not a hexadecimal digit.
    /// </exception>
    public static byte[] FromHex(string data)
    {
        if (data.Length is 0)
        {
            return [];
        }

        if (data.Length % 2 != 0)
        {
            throw new ArgumentException("Hex encoded vector data length is not a multiple of 2 (not valid 8-bit hex nibbles).");
        }

        if (!TryFromHex(data, out var buffer))
        {
            throw new FormatException("Hex encoded vector data contains a character that is not a hexadecimal digit.");
        }

        return buffer;
    }
#endif
}

0 commit comments

Comments
 (0)