Skip to content

Cannot Publish Kafka Message on Compacted Kafka Topic due to no Key #116

@vlad-ge

Description

@vlad-ge

Describe the bug
I tried to publish a kafka message through springwolf-ui on a "compacted" kafka topic. The publish is rejected due to the fact that the producer publishes messages with key value "null".

Dependencies and versions used
springwolf-ui: 0.6.0
springwolf-kafka: 0.9.0

Code example
The problem lies in the producer since the message gets published without a key:
`
package io.github.stavshamir.springwolf.producer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

@slf4j
@requiredargsconstructor
public class SpringwolfKafkaProducer {

private final Optional<KafkaTemplate<Object, Map<String, ?>>> kafkaTemplate;

public boolean isEnabled() {
    return kafkaTemplate.isPresent();
}

public void send(String topic, Map<String, String> headers, Map<String, ?> payload) {
    if (kafkaTemplate.isPresent()) {
        kafkaTemplate.get().send(buildProducerRecord(topic, headers, payload));
    } else {
        log.warn("Kafka producer is not configured");
    }
}

private ProducerRecord<Object, Map<String, ?>> buildProducerRecord(String topic, Map<String, String> headers, Map<String, ?> payload) {
    List<Header> recordHeaders = headers != null ? buildHeaders(headers) : Collections.emptyList();

    return new ProducerRecord<>(topic, null, null, null, payload, recordHeaders);
}

private List<Header> buildHeaders(Map<String, String> headers) {
    return headers.entrySet().stream()
            .map(header -> new RecordHeader(header.getKey(), header.getValue().getBytes(UTF_8)))
            .collect(Collectors.toList());
}

}

`

Stack trace and error logs
org.apache.kafka.common.InvalidRecordException: Compacted topic cannot accept message without key in topic partition xyz.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions