Skip to content
This repository was archived by the owner on Aug 3, 2020. It is now read-only.

Commit d0fb040

Browse files
Roman Khachatryan (rkhachatryan)
authored and committed
[FLINK-16057] Add ContinuousFileReaderOperator IO Benchmark
1 parent cc4ae2f commit d0fb040

1,101 files changed

Lines changed: 200073 additions & 0 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 73 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.benchmark;
19+
20+
import org.apache.flink.api.common.io.FileInputFormat;
21+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
22+
import org.apache.flink.api.java.io.TextInputFormat;
23+
import org.apache.flink.core.fs.Path;
24+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25+
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
26+
import org.apache.flink.util.Preconditions;
27+
import org.openjdk.jmh.annotations.Benchmark;
28+
import org.openjdk.jmh.annotations.OperationsPerInvocation;
29+
import org.openjdk.jmh.annotations.Param;
30+
import org.openjdk.jmh.annotations.Setup;
31+
32+
import java.io.File;
33+
import java.nio.file.Paths;
34+
35+
@OperationsPerInvocation(value = ContinuousFileReaderOperatorIoBenchmark.RECORDS_PER_INVOCATION)
36+
public class ContinuousFileReaderOperatorIoBenchmark extends BenchmarkBase {
37+
38+
public static final int RECORDS_PER_INVOCATION = 1000_000;
39+
private static final long CHECKPOINT_INTERVAL_MS = 100;
40+
41+
private FileInputFormat<?> fileReader;
42+
private File path;
43+
44+
/**
45+
* Name of the folder where the text files are located. Can be an arbitrary string but the total number of characters
46+
* should match {@link #RECORDS_PER_INVOCATION}. Here, the following pattern is used: txt-nr_files-nr_lines-nr_symbols.
47+
*/
48+
@Param({"txt-100-1000-10", "txt-1000-100-10"})
49+
public String folder;
50+
51+
@Setup
52+
public void setUp() {
53+
java.nio.file.Path p = folder.startsWith("/") ? Paths.get(folder) : Paths.get("src/main/resources/", folder);
54+
path = p.toAbsolutePath().toFile();
55+
Preconditions.checkArgument(path.exists() && path.isDirectory() && path.canExecute(), "%s doesn't exist, is not a directory, or isn't readable", path);
56+
fileReader = new TextInputFormat(new Path(path.toString()));
57+
}
58+
59+
@Benchmark
60+
public void readFiles(FlinkEnvironmentContext context) throws Exception {
61+
StreamExecutionEnvironment env = context.env;
62+
63+
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS)
64+
.setParallelism(1)
65+
.readFile(fileReader, path.toString())
66+
.addSink(new DiscardingSink<>());
67+
68+
env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
69+
70+
env.execute();
71+
}
72+
73+
}

0 commit comments

Comments
 (0)