pom.xml (15 additions & 0 deletions)
@@ -54,6 +54,21 @@
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-files</artifactId>
+        <version>${flink.version}</version>
+        <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-json</artifactId>
+        <version>${flink.version}</version>
+        <scope>test</scope>
+    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
RocketMQOptions.java
@@ -20,7 +20,11 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;

+import java.util.List;
+
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT_SUFFIX;
import static org.apache.rocketmq.flink.legacy.RocketMQConfig.DEFAULT_START_MESSAGE_OFFSET;

/** Includes config options of RocketMQ connector type. */
@@ -117,4 +121,83 @@ public class RocketMQOptions {

    public static final ConfigOption<Long> OPTIONAL_OFFSET_FROM_TIMESTAMP =
            ConfigOptions.key("offsetFromTimestamp").longType().noDefaultValue();
+
+    // --------------------------------------------------------------------------------------------
+    // Format options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> VALUE_FORMAT =
+            ConfigOptions.key("value" + FORMAT_SUFFIX)
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding value data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<String> KEY_FORMAT =
+            ConfigOptions.key("key" + FORMAT_SUFFIX)
+                    .stringType()
+                    // .defaultValue("rocketmq-default")
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding key data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<List<String>> KEY_FIELDS =
+            ConfigOptions.key("key.fields")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "Defines an explicit list of physical columns from the table schema "
+                                    + "that configure the data type for the key format. By default, this list is "
+                                    + "empty and thus a key is undefined.");
+
+    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE =
+            ConfigOptions.key("value.fields-include")
+                    .enumType(ValueFieldsStrategy.class)
+                    .defaultValue(ValueFieldsStrategy.ALL)
+                    .withDescription(
+                            String.format(
+                                    "Defines a strategy for how to deal with key columns in the data type "
+                                            + "of the value format. By default, '%s' physical columns of the table schema "
+                                            + "will be included in the value format, which means that the key columns "
+                                            + "appear in the data type for both the key and value format.",
+                                    ValueFieldsStrategy.ALL));
+
+    public static final ConfigOption<String> KEY_FIELDS_PREFIX =
+            ConfigOptions.key("key.fields-prefix")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines a custom prefix for all fields of the key format to avoid "
+                                                    + "name clashes with fields of the value format. "
+                                                    + "By default, the prefix is empty.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "If a custom prefix is defined, both the table schema and '%s' will work with prefixed names.",
+                                                    KEY_FIELDS.key()))
+                                    .linebreak()
+                                    .text(
+                                            "When constructing the data type of the key format, the prefix "
+                                                    + "will be removed and the non-prefixed names will be used within the key format.")
+                                    .linebreak()
+                                    .text(
+                                            String.format(
+                                                    "Please note that this option requires that '%s' be set to '%s'.",
+                                                    VALUE_FIELDS_INCLUDE.key(),
+                                                    ValueFieldsStrategy.EXCEPT_KEY))
+                                    .build());
+
+    // --------------------------------------------------------------------------------------------
+    // Enums
+    // --------------------------------------------------------------------------------------------
+
+    public enum ValueFieldsStrategy {
+        ALL,
+        EXCEPT_KEY
+    }
}
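
Note: the key/value format options above mirror the equivalent options of Flink's Kafka SQL connector. As a minimal illustration of how they combine, the sketch below registers a table whose key is read from the prefixed column k_id. The 'rocketmq' connector identifier and the 'topic' and 'nameServerAddress' option names are assumptions not shown in this diff, and 'json' assumes a JSON format is on the classpath (compare the flink-json dependency added to pom.xml).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KeyValueFormatSketch {

    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'k_id' carries the prefix 'k_', so the key format itself sees a field named 'id'.
        // key.fields-prefix requires value.fields-include = EXCEPT_KEY, so only 'amount'
        // is part of the value format's data type.
        env.executeSql(
                "CREATE TABLE orders (\n"
                        + "  k_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'rocketmq',\n" // assumed identifier
                        + "  'topic' = 'orders',\n" // assumed option name
                        + "  'nameServerAddress' = 'localhost:9876',\n" // assumed option name
                        + "  'key.format' = 'json',\n"
                        + "  'key.fields' = 'k_id',\n"
                        + "  'key.fields-prefix' = 'k_',\n"
                        + "  'value.format' = 'json',\n"
                        + "  'value.fields-include' = 'EXCEPT_KEY'\n"
                        + ")");
    }
}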
SimpleKeyValueDeserializationSchema.java
@@ -17,13 +17,14 @@
package org.apache.rocketmq.flink.legacy.common.serialization;

import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;

-public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map<String, String>> {
+public class SimpleKeyValueDeserializationSchema
+        implements KeyValueDeserializationSchema<Map<String, String>> {
    public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

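
For context, a minimal usage sketch of the legacy schema above. The no-argument constructor and the deserializeKeyAndValue method are inferred from this class's defaults and its KeyValueDeserializationSchema interface rather than shown in this diff.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.rocketmq.flink.legacy.common.serialization.SimpleKeyValueDeserializationSchema;

public class SimpleKeyValueSketch {

    public static void main(String[] args) {
        // Assumed no-argument constructor falling back to DEFAULT_KEY_FIELD / DEFAULT_VALUE_FIELD.
        SimpleKeyValueDeserializationSchema schema = new SimpleKeyValueDeserializationSchema();
        byte[] key = "order-1".getBytes(StandardCharsets.UTF_8);
        byte[] body = "{\"amount\":10}".getBytes(StandardCharsets.UTF_8);
        // deserializeKeyAndValue is assumed to be the interface method this class implements.
        Map<String, String> record = schema.deserializeKeyAndValue(key, body);
        // record now holds {key=order-1, value={"amount":10}}
    }
}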
RocketMQSource.java
@@ -29,7 +29,6 @@
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplitSerializer;

-import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -123,12 +122,13 @@ public Boundedness getBoundedness() {
    }

    @Override
-    public SourceReader<OUT, RocketMQPartitionSplit> createReader(
-            SourceReaderContext readerContext) {
+    public SourceReader<OUT, RocketMQPartitionSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<OUT, Long, Long>>> elementsQueue =
                new FutureCompletingBlockingQueue<>();
        deserializationSchema.open(
-                new DeserializationSchema.InitializationContext() {
+                new org.apache.flink.api.common.serialization.DeserializationSchema
+                        .InitializationContext() {
                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup();
RocketMQDeserializationSchema.java
@@ -42,7 +42,7 @@ public interface RocketMQDeserializationSchema<T>
     */
    @Override
    @PublicEvolving
-    default void open(InitializationContext context) {}
+    default void open(InitializationContext context) throws Exception {}

    /**
     * Deserializes the byte message.
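
With open(InitializationContext) now declaring throws Exception (matching the throws Exception added to RocketMQSource#createReader above), implementations can propagate checked setup failures directly. Below is a hypothetical implementation for illustration only; it assumes deserialize(List<MessageExt>, Collector<T>) and getProducedType are the interface's remaining abstract methods, as suggested elsewhere in this PR.

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.rocketmq.common.message.MessageExt;

// Import of RocketMQDeserializationSchema omitted; its package is not shown in this diff.
public class StringBodyDeserializationSchema implements RocketMQDeserializationSchema<String> {

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        // Checked setup failures (e.g. loading an external schema definition) may now
        // propagate directly instead of being wrapped in a RuntimeException.
        context.getMetricGroup(); // available because createReader passes the reader context
    }

    @Override
    public void deserialize(List<MessageExt> input, Collector<String> collector) {
        for (MessageExt message : input) {
            collector.collect(new String(message.getBody(), StandardCharsets.UTF_8));
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}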
RocketMQRowDeserializationSchema.java
@@ -28,12 +28,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
- * A row data wrapper class that wraps a {@link RocketMQDeserializationSchema} to deserialize {@link
+ * A row data wrapper class that wraps a {@link DeserializationSchema} to deserialize {@link
 * MessageExt}.
 */
public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> {
@@ -46,6 +47,10 @@ public class RocketMQRowDeserializationSchema implements RocketMQDeserializationSchema<RowData> {

    public RocketMQRowDeserializationSchema(
            TableSchema tableSchema,
+            org.apache.flink.api.common.serialization.DeserializationSchema<RowData>
+                    keyDeserialization,
+            org.apache.flink.api.common.serialization.DeserializationSchema<RowData>
+                    valueDeserialization,
            Map<String, String> properties,
            boolean hasMetadata,
            MetadataConverter[] metadataConverters) {
@@ -55,17 +60,20 @@ public RocketMQRowDeserializationSchema(
                        .setTableSchema(tableSchema)
                        .setHasMetadata(hasMetadata)
                        .setMetadataConverters(metadataConverters)
+                        .setKeyDeserialization(keyDeserialization)
+                        .setValueDeserialization(valueDeserialization)
                        .build();
    }

    @Override
-    public void open(InitializationContext context) {
+    public void open(InitializationContext context) throws Exception {
        deserializationSchema.open(context);
        bytesMessages = new ArrayList<>();
    }

    @Override
-    public void deserialize(List<MessageExt> input, Collector<RowData> collector) {
+    public void deserialize(List<MessageExt> input, Collector<RowData> collector)
+            throws IOException {
        extractMessages(input);
        deserializationSchema.deserialize(bytesMessages, collector);
    }
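
To show how the two new constructor parameters are wired, here is a hedged construction sketch. jsonRowDecoder() is a hypothetical stand-in for the DeserializationSchema<RowData> instances that a table factory would discover via 'key.format' / 'value.format', and the package location of MetadataConverter is assumed.

import java.util.Collections;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;

// Imports of RocketMQRowDeserializationSchema and MetadataConverter omitted;
// their packages are not shown in this diff.
public class RowDeserializationWiringSketch {

    public static void main(String[] args) {
        TableSchema tableSchema =
                TableSchema.builder()
                        .field("id", DataTypes.STRING())
                        .field("amount", DataTypes.DOUBLE())
                        .build();

        RocketMQRowDeserializationSchema schema =
                new RocketMQRowDeserializationSchema(
                        tableSchema,
                        jsonRowDecoder(), // hypothetical key decoder ('key.format')
                        jsonRowDecoder(), // hypothetical value decoder ('value.format')
                        Collections.emptyMap(), // properties
                        false, // hasMetadata
                        new MetadataConverter[0]);
    }

    // Hypothetical stand-in for format-factory discovery (e.g. a JSON
    // DeserializationFormatFactory from flink-json); real code would derive
    // this from the configured format.
    private static DeserializationSchema<RowData> jsonRowDecoder() {
        throw new UnsupportedOperationException("illustrative stub");
    }
}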