Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flink-formats/flink-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ under the License.
<configuration>
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
<testOutputDirectory>${project.basedir}/target/generated-test-sources/</testOutputDirectory>
<enableDecimalLogicalType>true</enableDecimalLogicalType>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -186,11 +185,9 @@ public User map(Tuple3<String, Integer, String> value) {
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
return user;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.mock.Whitebox;

Expand Down Expand Up @@ -200,11 +199,9 @@ public int getAttemptNumber() {
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));

// 20.00
user.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

outputFormat.writeRecord(user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
Expand Down Expand Up @@ -140,11 +139,9 @@ public static void writeTestFile(File testFile) throws IOException {
user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user1.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user1.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

// Construct via builder
User user2 =
Expand Down Expand Up @@ -180,13 +177,9 @@ public static void writeTestFile(File testFile) throws IOException {
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// 20.00
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
// 20.00
.setTypeDecimalFixed(
new Fixed2(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;

import org.apache.avro.file.DataFileWriter;
Expand Down Expand Up @@ -120,11 +119,9 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user1.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user1.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

// Construct via builder
User user2 =
Expand Down Expand Up @@ -160,13 +157,9 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// 20.00
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
// 20.00
.setTypeDecimalFixed(
new Fixed2(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<>(userDatumWriter);
Expand Down Expand Up @@ -199,11 +192,9 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
// 20.00
user.setTypeDecimalBytes(
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
user.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));

dataFileWriter.append(user);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ void testGeneratedObjectWithNullableFields() {
LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
Instant.parse("2014-03-01T12:12:12.321Z"),
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
BigDecimal.valueOf(2000, 2), // 20.00
new Fixed2(
BigDecimal.valueOf(2000, 2)
.unscaledValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.Timestamps;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest;
Expand Down Expand Up @@ -109,15 +108,8 @@ public final class AvroTestUtils {
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
// byte array must contain the two's-complement representation of the
// unscaled integer value in big-endian byte order
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
// array of length n can store at most
// Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1))
// base-10 digits of precision
.setTypeDecimalFixed(
new Fixed2(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();

final Row rowUser = new Row(23);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static User generateRandomUser(Random rnd) {
LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
Instant.parse("2014-03-01T12:12:12.321Z"),
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
BigDecimal.valueOf(2000, 2),
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,8 @@ class AvroTypesITCase extends AbstractTestBase {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();

private static final User USER_2 =
Expand All @@ -114,11 +111,8 @@ class AvroTypesITCase extends AbstractTestBase {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
.setTypeDecimalBytes(
ByteBuffer.wrap(
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalFixed(
new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();

private static final User USER_3 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-io:commons-io:2.15.1
- io.confluent:common-utils:7.5.3
- io.confluent:kafka-schema-registry-client:7.5.3
- org.apache.avro:avro:1.11.4
- org.apache.avro:avro:1.12.1
- org.apache.commons:commons-compress:1.26.0
- org.apache.commons:commons-lang3:3.18.0
- org.apache.kafka:kafka-clients:7.5.3-ccs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.avro:avro:1.11.4
- org.apache.avro:avro:1.12.1
- com.fasterxml.jackson.core:jackson-core:2.20.1
- com.fasterxml.jackson.core:jackson-databind:2.20.1
- com.fasterxml.jackson.core:jackson-annotations:2.20
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ under the License.
<!-- Project `flink-benchmarks` uses zk testing server in `curator-test` for performance
benchmark, please confirm it will not affect the benchmarks when the version is bumped. -->
<curator.version>5.4.0</curator.version>
<avro.version>1.11.4</avro.version>
<avro.version>1.12.1</avro.version>
<!-- Version for transitive Jackson dependencies that are not used within Flink itself.-->
<jackson.mapper.asl.version>1.9.14.jdk17-redhat-00001</jackson.mapper.asl.version>
<jackson-bom.version>2.20.1</jackson-bom.version>
Expand Down