Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions src/Formats/KafkaSchemaRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,81 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id) const
}
}

KafkaSchemaRegistry::SchemaWithReferences KafkaSchemaRegistry::fetchSchemaWithReferences(UInt32 id) const
{
try
{
try
{
Poco::URI url(base_url, std::format("schemas/ids/{}", id));
LOG_TRACE(logger, "Fetching schema with references id = {}", id);

auto timeouts = ConnectionTimeouts()
.withConnectionTimeout(5)
.withSendTimeout(5)
.withReceiveTimeout(5);

Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(url.getHost());

if (!credentials.empty())
credentials.authenticate(request);

auto session = makePooledHTTPSession(url, private_key_file, certificate_file, ca_location, Verification_mode, timeouts, 1);
std::istream * response_body{};
try
{
session->sendRequest(request);

Poco::Net::HTTPResponse response;
response_body = receiveResponse(*session, request, response, false);
}
catch (const Poco::Exception & e)
{
session->attachSessionData(e.message());
throw;
}
Poco::JSON::Parser parser;
auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
auto schema = json_body->getValue<std::string>("schema");

std::vector<SchemaReference> references;
if (json_body->has("references"))
{
auto refs_array = json_body->getArray("references");
if (refs_array)
{
for (UInt32 i = 0; i < refs_array->size(); ++i)
{
auto ref_obj = refs_array->getObject(i);
SchemaReference ref;
ref.name = ref_obj->getValue<std::string>("name");
ref.subject = ref_obj->getValue<std::string>("subject");
ref.version = ref_obj->getValue<int32_t>("version");
references.push_back(std::move(ref));
}
}
}

LOG_TRACE(logger, "Successfully fetched schema id = {} with {} references\n{}", id, references.size(), schema);
return {std::move(schema), std::move(references)};
}
catch (const Exception &)
{
throw;
}
catch (const Poco::Exception & e)
{
throw Exception(Exception::CreateFromPocoTag{}, e);
}
}
catch (Exception & e)
{
e.addMessage(std::format("while fetching schema with references for id {}", id));
throw;
}
}

std::pair<UInt32, String> KafkaSchemaRegistry::fetchLatestSchemaForSubject(const String & subject) const
{
auto subject_name = subject + "-value";
Expand Down Expand Up @@ -161,6 +236,83 @@ std::pair<UInt32, String> KafkaSchemaRegistry::fetchLatestSchemaForSubject(const
}
}

std::pair<UInt32, KafkaSchemaRegistry::SchemaWithReferences>
KafkaSchemaRegistry::fetchSchemaBySubjectVersion(const String & subject, Int32 version) const
{
try
{
try
{
Poco::URI url(base_url, std::format("subjects/{}/versions/{}", subject, version));
LOG_TRACE(logger, "Fetching subject = {} version = {}", subject, version);

auto timeouts = ConnectionTimeouts()
.withConnectionTimeout(5)
.withSendTimeout(5)
.withReceiveTimeout(5);

Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(url.getHost());

if (!credentials.empty())
credentials.authenticate(request);

auto session = makePooledHTTPSession(url, private_key_file, certificate_file, ca_location, Verification_mode, timeouts, 1);
std::istream * response_body{};
try
{
session->sendRequest(request);

Poco::Net::HTTPResponse response;
response_body = receiveResponse(*session, request, response, false);
}
catch (const Poco::Exception & e)
{
session->attachSessionData(e.message());
throw;
}
Poco::JSON::Parser parser;
auto json_body = parser.parse(*response_body).extract<Poco::JSON::Object::Ptr>();
auto schema_id = json_body->getValue<uint32_t>("id");
auto schema = json_body->getValue<std::string>("schema");

std::vector<SchemaReference> references;
if (json_body->has("references"))
{
auto refs_array = json_body->getArray("references");
if (refs_array)
{
for (UInt32 i = 0; i < refs_array->size(); ++i)
{
auto ref_obj = refs_array->getObject(i);
SchemaReference ref;
ref.name = ref_obj->getValue<std::string>("name");
ref.subject = ref_obj->getValue<std::string>("subject");
ref.version = ref_obj->getValue<int32_t>("version");
references.push_back(std::move(ref));
}
}
}

LOG_TRACE(logger, "Successfully fetched schema from subject = {} version = {} id = {}\n{}", subject, version, schema_id, schema);
return {schema_id, SchemaWithReferences{std::move(schema), std::move(references)}};
}
catch (const Exception &)
{
throw;
}
catch (const Poco::Exception & e)
{
throw Exception(Exception::CreateFromPocoTag{}, e);
}
}
catch (Exception & e)
{
e.addMessage(std::format("while fetching schema for subject {} version {}", subject, version));
throw;
}
}

UInt32 KafkaSchemaRegistry::fetchLatestSubjectVersion(const String & subject_name) const
{
try
Expand Down
19 changes: 19 additions & 0 deletions src/Formats/KafkaSchemaRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,30 @@
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/URI.h>

#include <vector>

namespace DB
{

/// A helper class helps working with Kafka schema registry.
class KafkaSchemaRegistry final
{
public:
/// A single reference entry from the schema registry response.
struct SchemaReference
{
String name; /// The import path, e.g. "google/protobuf/timestamp.proto"
String subject; /// The subject name in the registry
Int32 version; /// The version of the referenced schema
};

/// Schema text together with its references.
struct SchemaWithReferences
{
String schema;
std::vector<SchemaReference> references;
};

static UInt32 readSchemaId(ReadBuffer & in);
static void writeSchemaId(WriteBuffer & out, UInt32 schema_id);

Expand Down Expand Up @@ -62,7 +79,9 @@ class KafkaSchemaRegistry final
bool skip_cert_check);

String fetchSchema(UInt32 id) const;
SchemaWithReferences fetchSchemaWithReferences(UInt32 id) const;
std::pair<UInt32, String> fetchLatestSchemaForSubject(const String & subject) const;
std::pair<UInt32, SchemaWithReferences> fetchSchemaBySubjectVersion(const String & subject, Int32 version) const;

private:
UInt32 fetchLatestSubjectVersion(const String & subject_name) const;
Expand Down
122 changes: 103 additions & 19 deletions src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# include <Formats/KafkaSchemaRegistry.h>
# include <IO/VarInt.h>
# include <span>
# include <unordered_set>

# include <google/protobuf/compiler/parser.h>
# include <google/protobuf/descriptor.pb.h>
Expand Down Expand Up @@ -251,35 +252,118 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google::
private:
const google::protobuf::FileDescriptor * getSchema(uint32_t id)
{
const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id));
if (loaded_descriptor != nullptr)
return loaded_descriptor;

{
std::lock_guard lock(mutex);
const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id));
if (loaded_descriptor != nullptr)
return loaded_descriptor;
}
return fetchSchema(id);
}

const google::protobuf::FileDescriptor * fetchSchema(uint32_t id)
{
std::lock_guard lock(mutex);
/// Just in case we got beaten
const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id));
if (loaded_descriptor != nullptr)
return loaded_descriptor;

auto schema = registry.fetchSchema(id);
google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
google::protobuf::io::Tokenizer tokenizer(&input, this);
return fetchSchemaById(id, std::to_string(id));
}

/// Fetch schema by id and register it in the pool under `file_name`.
/// `file_name` must match the import path so that the pool can resolve import statements.
const google::protobuf::FileDescriptor * fetchSchemaById(uint32_t id, const String & file_name)
{
/// Fast path: check cache without network call
{
std::lock_guard lock(mutex);
const auto * loaded = descriptor_pool.FindFileByName(file_name);
if (loaded != nullptr)
return loaded;
loaded = descriptor_pool.FindFileByName(std::to_string(id));
if (loaded != nullptr)
return loaded;
}

auto schema_with_refs = registry.fetchSchemaWithReferences(id);

/// Resolve all referenced schemas into the pool before building this one
std::unordered_set<uint32_t> resolved_ids;
resolveReferences(schema_with_refs.references, resolved_ids);

google::protobuf::FileDescriptorProto file_descriptor;
file_descriptor.set_name(std::to_string(id));
google::protobuf::compiler::Parser parser;
parser.RecordErrorsTo(this);
parser.Parse(&tokenizer, &file_descriptor);
file_descriptor.set_name(file_name);
{
google::protobuf::io::ArrayInputStream input{
schema_with_refs.schema.data(), static_cast<int>(schema_with_refs.schema.size())};
google::protobuf::io::Tokenizer tokenizer(&input, this);
google::protobuf::compiler::Parser parser;
parser.RecordErrorsTo(this);
parser.Parse(&tokenizer, &file_descriptor);
}

auto const * descriptor = descriptor_pool.BuildFile(file_descriptor);
std::lock_guard lock(mutex);
/// Re-check under lock: another thread may have built this schema while we were parsing.
/// DescriptorPool::BuildFile returns nullptr for duplicate file names.
if (const auto * existing = descriptor_pool.FindFileByName(file_name))
return existing;
if (const auto * existing = descriptor_pool.FindFileByName(std::to_string(id)))
return existing;
const auto * descriptor = descriptor_pool.BuildFile(file_descriptor);
if ((descriptor != nullptr) && descriptor->message_type_count() > 0)
return descriptor;

throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema");
throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema id={}", id);
}

/// Recursively fetch and build all referenced schemas into the pool.
/// `ref.name` is used as the file name so that import statements resolve correctly.
void resolveReferences(
const std::vector<KafkaSchemaRegistry::SchemaReference> & references,
std::unordered_set<uint32_t> & resolved_ids)
{
for (const auto & ref : references)
{
/// Skip if already loaded by this name
{
std::lock_guard lock(mutex);
if (descriptor_pool.FindFileByName(ref.name) != nullptr)
continue;
}

/// Try Google's well-known types from the generated pool first
const auto * generated_file
= google::protobuf::DescriptorPool::generated_pool()->FindFileByName(ref.name);
if (generated_file != nullptr)
{
google::protobuf::FileDescriptorProto generated_proto;
generated_file->CopyTo(&generated_proto);
std::lock_guard lock(mutex);
descriptor_pool.BuildFile(generated_proto);
continue;
}

/// Fetch the referenced schema at its pinned version
auto [ref_schema_id, ref_schema_with_refs] = registry.fetchSchemaBySubjectVersion(ref.subject, ref.version);

/// Guard against circular references
if (resolved_ids.contains(ref_schema_id))
continue;
resolved_ids.insert(ref_schema_id);

/// Recursively resolve nested references before building this one
resolveReferences(ref_schema_with_refs.references, resolved_ids);

google::protobuf::FileDescriptorProto ref_file_descriptor;
ref_file_descriptor.set_name(ref.name);
{
google::protobuf::io::ArrayInputStream ref_input{
ref_schema_with_refs.schema.data(), static_cast<int>(ref_schema_with_refs.schema.size())};
google::protobuf::io::Tokenizer ref_tokenizer(&ref_input, this);
google::protobuf::compiler::Parser ref_parser;
ref_parser.RecordErrorsTo(this);
ref_parser.Parse(&ref_tokenizer, &ref_file_descriptor);
}

std::lock_guard lock(mutex);
descriptor_pool.BuildFile(ref_file_descriptor);
}
}

std::mutex mutex;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 event-a (1735100000,123456789)
2 event-b (1735100001,0)
3 event-c (1735100002,999999999)
Loading