From 62dfc03af0e05b39351886393967f96e4b5c5906 Mon Sep 17 00:00:00 2001 From: SBALAVIGNESH123 Date: Sat, 14 Mar 2026 18:21:17 +0530 Subject: [PATCH 1/2] fix: resolve protobuf import statements via kafka_schema_registry_url (#1076) --- src/Formats/KafkaSchemaRegistry.cpp | 152 ++++++++++++++++++ src/Formats/KafkaSchemaRegistry.h | 19 +++ .../Formats/Impl/ProtobufRowInputFormat.cpp | 125 +++++++++++--- 3 files changed, 277 insertions(+), 19 deletions(-) diff --git a/src/Formats/KafkaSchemaRegistry.cpp b/src/Formats/KafkaSchemaRegistry.cpp index cc32fb37029..af227a552f3 100644 --- a/src/Formats/KafkaSchemaRegistry.cpp +++ b/src/Formats/KafkaSchemaRegistry.cpp @@ -100,6 +100,81 @@ String KafkaSchemaRegistry::fetchSchema(UInt32 id) const } } +KafkaSchemaRegistry::SchemaWithReferences KafkaSchemaRegistry::fetchSchemaWithReferences(UInt32 id) const +{ + try + { + try + { + Poco::URI url(base_url, std::format("schemas/ids/{}", id)); + LOG_TRACE(logger, "Fetching schema with references id = {}", id); + + auto timeouts = ConnectionTimeouts() + .withConnectionTimeout(5) + .withSendTimeout(5) + .withReceiveTimeout(5); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + request.setHost(url.getHost()); + + if (!credentials.empty()) + credentials.authenticate(request); + + auto session = makePooledHTTPSession(url, private_key_file, certificate_file, ca_location, Verification_mode, timeouts, 1); + std::istream * response_body{}; + try + { + session->sendRequest(request); + + Poco::Net::HTTPResponse response; + response_body = receiveResponse(*session, request, response, false); + } + catch (const Poco::Exception & e) + { + session->attachSessionData(e.message()); + throw; + } + Poco::JSON::Parser parser; + auto json_body = parser.parse(*response_body).extract(); + auto schema = json_body->getValue("schema"); + + std::vector references; + if (json_body->has("references")) + { + auto refs_array = json_body->getArray("references"); + if (refs_array) + { + for (UInt32 i = 0; i < refs_array->size(); ++i) + { + auto ref_obj = refs_array->getObject(i); + SchemaReference ref; + ref.name = ref_obj->getValue("name"); + ref.subject = ref_obj->getValue("subject"); + ref.version = ref_obj->getValue("version"); + references.push_back(std::move(ref)); + } + } + } + + LOG_TRACE(logger, "Successfully fetched schema id = {} with {} references\n{}", id, references.size(), schema); + return {std::move(schema), std::move(references)}; + } + catch (const Exception &) + { + throw; + } + catch (const Poco::Exception & e) + { + throw Exception(Exception::CreateFromPocoTag{}, e); + } + } + catch (Exception & e) + { + e.addMessage(std::format("while fetching schema with references for id {}", id)); + throw; + } +} + std::pair KafkaSchemaRegistry::fetchLatestSchemaForSubject(const String & subject) const { auto subject_name = subject + "-value"; @@ -161,6 +236,83 @@ std::pair KafkaSchemaRegistry::fetchLatestSchemaForSubject(const } } +std::pair +KafkaSchemaRegistry::fetchSchemaBySubjectVersion(const String & subject, Int32 version) const +{ + try + { + try + { + Poco::URI url(base_url, std::format("subjects/{}/versions/{}", subject, version)); + LOG_TRACE(logger, "Fetching subject = {} version = {}", subject, version); + + auto timeouts = ConnectionTimeouts() + .withConnectionTimeout(5) + .withSendTimeout(5) + .withReceiveTimeout(5); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + request.setHost(url.getHost()); + + if (!credentials.empty()) + credentials.authenticate(request); + + auto session = makePooledHTTPSession(url, private_key_file, certificate_file, ca_location, Verification_mode, timeouts, 1); + std::istream * response_body{}; + try + { + session->sendRequest(request); + + Poco::Net::HTTPResponse response; + response_body = receiveResponse(*session, request, response, false); + } + catch (const Poco::Exception & e) + { + session->attachSessionData(e.message()); + throw; + } + Poco::JSON::Parser parser; + auto json_body = parser.parse(*response_body).extract(); + auto schema_id = json_body->getValue("id"); + auto schema = json_body->getValue("schema"); + + std::vector references; + if (json_body->has("references")) + { + auto refs_array = json_body->getArray("references"); + if (refs_array) + { + for (UInt32 i = 0; i < refs_array->size(); ++i) + { + auto ref_obj = refs_array->getObject(i); + SchemaReference ref; + ref.name = ref_obj->getValue("name"); + ref.subject = ref_obj->getValue("subject"); + ref.version = ref_obj->getValue("version"); + references.push_back(std::move(ref)); + } + } + } + + LOG_TRACE(logger, "Successfully fetched schema from subject = {} version = {} id = {}\n{}", subject, version, schema_id, schema); + return {schema_id, SchemaWithReferences{std::move(schema), std::move(references)}}; + } + catch (const Exception &) + { + throw; + } + catch (const Poco::Exception & e) + { + throw Exception(Exception::CreateFromPocoTag{}, e); + } + } + catch (Exception & e) + { + e.addMessage(std::format("while fetching schema for subject {} version {}", subject, version)); + throw; + } +} + UInt32 KafkaSchemaRegistry::fetchLatestSubjectVersion(const String & subject_name) const { try diff --git a/src/Formats/KafkaSchemaRegistry.h b/src/Formats/KafkaSchemaRegistry.h index 1275c09a015..5404336e0f1 100644 --- a/src/Formats/KafkaSchemaRegistry.h +++ b/src/Formats/KafkaSchemaRegistry.h @@ -9,6 +9,8 @@ #include #include +#include + namespace DB { @@ -16,6 +18,21 @@ namespace DB class KafkaSchemaRegistry final { public: + /// A single reference entry from the schema registry response. + struct SchemaReference + { + String name; /// The import path, e.g. "google/protobuf/timestamp.proto" + String subject; /// The subject name in the registry + Int32 version; /// The version of the referenced schema + }; + + /// Schema text together with its references. + struct SchemaWithReferences + { + String schema; + std::vector references; + }; + static UInt32 readSchemaId(ReadBuffer & in); static void writeSchemaId(WriteBuffer & out, UInt32 schema_id); @@ -62,7 +79,9 @@ class KafkaSchemaRegistry final bool skip_cert_check); String fetchSchema(UInt32 id) const; + SchemaWithReferences fetchSchemaWithReferences(UInt32 id) const; std::pair fetchLatestSchemaForSubject(const String & subject) const; + std::pair fetchSchemaBySubjectVersion(const String & subject, Int32 version) const; private: UInt32 fetchLatestSubjectVersion(const String & subject_name) const; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index dfc1472fea0..38bcc6466c6 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -13,6 +13,7 @@ # include # include # include +# include # include # include @@ -251,35 +252,121 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google:: private: const google::protobuf::FileDescriptor * getSchema(uint32_t id) { - const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id)); - if (loaded_descriptor != nullptr) - return loaded_descriptor; - + { + std::lock_guard lock(mutex); + const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id)); + if (loaded_descriptor != nullptr) + return loaded_descriptor; + } return fetchSchema(id); } const google::protobuf::FileDescriptor * fetchSchema(uint32_t id) { - std::lock_guard lock(mutex); - /// Just in case we got beaten - const auto * loaded_descriptor = descriptor_pool.FindFileByName(std::to_string(id)); - if (loaded_descriptor != nullptr) - return loaded_descriptor; - - auto schema = registry.fetchSchema(id); - google::protobuf::io::ArrayInputStream input{schema.data(), static_cast(schema.size())}; - google::protobuf::io::Tokenizer tokenizer(&input, this); + return fetchSchemaById(id, std::to_string(id)); + } + + /// Fetch schema by id and register it in the pool under `file_name`. + /// `file_name` must match the import path so that the pool can resolve import statements. + const google::protobuf::FileDescriptor * fetchSchemaById(uint32_t id, const String & file_name) + { + /// Fast path: check cache without network call + { + std::lock_guard lock(mutex); + const auto * loaded = descriptor_pool.FindFileByName(file_name); + if (loaded != nullptr) + return loaded; + loaded = descriptor_pool.FindFileByName(std::to_string(id)); + if (loaded != nullptr) + return loaded; + } + + /// Fetch schema and its references from registry + auto schema_with_refs = registry.fetchSchemaWithReferences(id); + + /// Resolve all referenced schemas into the pool before building this one + std::unordered_set resolved_ids; + resolveReferences(schema_with_refs.references, resolved_ids); + + /// Parse schema text google::protobuf::FileDescriptorProto file_descriptor; - file_descriptor.set_name(std::to_string(id)); - google::protobuf::compiler::Parser parser; - parser.RecordErrorsTo(this); - parser.Parse(&tokenizer, &file_descriptor); + file_descriptor.set_name(file_name); + { + google::protobuf::io::ArrayInputStream input{ + schema_with_refs.schema.data(), static_cast(schema_with_refs.schema.size())}; + google::protobuf::io::Tokenizer tokenizer(&input, this); + google::protobuf::compiler::Parser parser; + parser.RecordErrorsTo(this); + parser.Parse(&tokenizer, &file_descriptor); + } - auto const * descriptor = descriptor_pool.BuildFile(file_descriptor); + std::lock_guard lock(mutex); + /// Re-check under lock: another thread may have built this schema while we were parsing. + /// DescriptorPool::BuildFile returns nullptr for duplicate file names. + if (const auto * existing = descriptor_pool.FindFileByName(file_name)) + return existing; + if (const auto * existing = descriptor_pool.FindFileByName(std::to_string(id))) + return existing; + const auto * descriptor = descriptor_pool.BuildFile(file_descriptor); if ((descriptor != nullptr) && descriptor->message_type_count() > 0) return descriptor; - throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema"); + throw Exception(ErrorCodes::INVALID_DATA, "No message type in schema id={}", id); + } + + /// Recursively fetch and build all referenced schemas into the pool. + /// `ref.name` is used as the file name so that import statements resolve correctly. + void resolveReferences( + const std::vector & references, + std::unordered_set & resolved_ids) + { + for (const auto & ref : references) + { + /// Skip if already loaded by this name + { + std::lock_guard lock(mutex); + if (descriptor_pool.FindFileByName(ref.name) != nullptr) + continue; + } + + /// Try Google's well-known types from the generated pool first + const auto * generated_file + = google::protobuf::DescriptorPool::generated_pool()->FindFileByName(ref.name); + if (generated_file != nullptr) + { + google::protobuf::FileDescriptorProto generated_proto; + generated_file->CopyTo(&generated_proto); + std::lock_guard lock(mutex); + descriptor_pool.BuildFile(generated_proto); + continue; + } + + /// Fetch the referenced schema at its pinned version + auto [ref_schema_id, ref_schema_with_refs] = registry.fetchSchemaBySubjectVersion(ref.subject, ref.version); + + /// Guard against circular references + if (resolved_ids.contains(ref_schema_id)) + continue; + resolved_ids.insert(ref_schema_id); + + /// Recursively resolve nested references before building this one + resolveReferences(ref_schema_with_refs.references, resolved_ids); + + /// Parse the referenced schema text + google::protobuf::FileDescriptorProto ref_file_descriptor; + ref_file_descriptor.set_name(ref.name); + { + google::protobuf::io::ArrayInputStream ref_input{ + ref_schema_with_refs.schema.data(), static_cast(ref_schema_with_refs.schema.size())}; + google::protobuf::io::Tokenizer ref_tokenizer(&ref_input, this); + google::protobuf::compiler::Parser ref_parser; + ref_parser.RecordErrorsTo(this); + ref_parser.Parse(&ref_tokenizer, &ref_file_descriptor); + } + + std::lock_guard lock(mutex); + descriptor_pool.BuildFile(ref_file_descriptor); + } } std::mutex mutex; From 3737c207c4ceb88ef2c4f137015936a2a6cc9cc2 Mon Sep 17 00:00:00 2001 From: SBALAVIGNESH123 Date: Sat, 14 Mar 2026 18:24:11 +0530 Subject: [PATCH 2/2] test: add stateless test for protobuf schema registry import resolution --- .../Formats/Impl/ProtobufRowInputFormat.cpp | 3 -- ...protobuf_schema_registry_imports.reference | 3 ++ .../99177_protobuf_schema_registry_imports.sh | 54 +++++++++++++++++++ 3 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.reference create mode 100644 tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.sh diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp index 38bcc6466c6..f7a65df5efe 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp @@ -281,14 +281,12 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google:: return loaded; } - /// Fetch schema and its references from registry auto schema_with_refs = registry.fetchSchemaWithReferences(id); /// Resolve all referenced schemas into the pool before building this one std::unordered_set resolved_ids; resolveReferences(schema_with_refs.references, resolved_ids); - /// Parse schema text google::protobuf::FileDescriptorProto file_descriptor; file_descriptor.set_name(file_name); { @@ -352,7 +350,6 @@ class ProtobufConfluentRowInputFormat::SchemaRegistryWithCache : public google:: /// Recursively resolve nested references before building this one resolveReferences(ref_schema_with_refs.references, resolved_ids); - /// Parse the referenced schema text google::protobuf::FileDescriptorProto ref_file_descriptor; ref_file_descriptor.set_name(ref.name); { diff --git a/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.reference b/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.reference new file mode 100644 index 00000000000..074415802b0 --- /dev/null +++ b/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.reference @@ -0,0 +1,3 @@ +1 event-a (1735100000,123456789) +2 event-b (1735100001,0) +3 event-c (1735100002,999999999) diff --git a/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.sh b/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.sh new file mode 100644 index 00000000000..9dd053e7d02 --- /dev/null +++ b/tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +# Tags: no-parallel +# Test: protobuf schema registry import resolution (issue #1076) +# +# Verifies that protobuf schemas with import statements +# (e.g. import "google/protobuf/timestamp.proto") are correctly +# resolved when using format_schema with CREATE FORMAT SCHEMA. +# +# This exercises the same DescriptorPool + resolveReferences() +# code path used by kafka_schema_registry_url. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +SCHEMADIR=$CURDIR/format_schemas + +# Cleanup +$CLICKHOUSE_CLIENT -q "DROP STREAM IF EXISTS test_proto_import_99177;" +$CLICKHOUSE_CLIENT -q "DROP FORMAT SCHEMA IF EXISTS test_import_schema_99177 TYPE Protobuf;" + +# Step 1: Create a format schema with an import statement +$CLICKHOUSE_CLIENT -q " +CREATE FORMAT SCHEMA test_import_schema_99177 AS \$\$ +syntax = \"proto3\"; +import \"google/protobuf/timestamp.proto\"; + +message TestEvent { + int64 id = 1; + string name = 2; + google.protobuf.Timestamp created_at = 3; +} +\$\$ TYPE Protobuf +" + +# Step 2: Create a stream using the schema +$CLICKHOUSE_CLIENT -q " +CREATE STREAM test_proto_import_99177 ( + id int64, + name string, + created_at tuple(seconds int64, nanos int32) +) ENGINE = Memory; +" + +# Step 3: Insert data using the protobuf schema +# The import must resolve for the INSERT to succeed +$CLICKHOUSE_CLIENT -q "INSERT INTO test_proto_import_99177 (id, name, created_at) VALUES (1, 'event-a', (1735100000, 123456789)), (2, 'event-b', (1735100001, 0)), (3, 'event-c', (1735100002, 999999999));" + +# Step 4: Read back and verify +$CLICKHOUSE_CLIENT -q "SELECT id, name, created_at FROM test_proto_import_99177 ORDER BY id;" + +# Cleanup +$CLICKHOUSE_CLIENT -q "DROP STREAM test_proto_import_99177;" +$CLICKHOUSE_CLIENT -q "DROP FORMAT SCHEMA IF EXISTS test_import_schema_99177 TYPE Protobuf;"