-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathWordCountExample.java
More file actions
159 lines (132 loc) · 7.04 KB
/
WordCountExample.java
File metadata and controls
159 lines (132 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package batching;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeColumn;
import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema;
import org.apache.beam.sdk.io.snowflake.data.text.SnowflakeString;
import org.apache.beam.sdk.io.snowflake.enums.CreateDisposition;
import org.apache.beam.sdk.io.snowflake.enums.WriteDisposition;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import util.AwsOptionsParser;
/**
 * An example that contains batch writing to and reading from Snowflake. Inspired by the Apache
 * Beam WordCount example
 * (https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java).
 *
 * <p>Check the main README for more information.
 */
public class WordCountExample {

  /**
   * Entry point: parses and validates pipeline options, formats the AWS-specific options, then
   * runs the write pipeline followed by the read pipeline.
   */
  public static void main(String[] args) {
    SnowflakeWordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);
    AwsOptionsParser.format(options);
    runWritingToSnowflake(options);
    runReadingFromSnowflake(options);
  }

  /** Reads the input text files, counts words, and writes the counts to a Snowflake table. */
  private static void runWritingToSnowflake(SnowflakeWordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("Reading files", TextIO.read().from(options.getInputFile()))
        .apply("Counting words", new CountWords())
        .apply("Writing counts to Snowflake", createSnowflakeWriteTransform(options));
    p.run().waitUntilFinish();
  }

  /** Reads the word counts back from Snowflake, formats them as text, and writes the output. */
  private static void runReadingFromSnowflake(SnowflakeWordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("Reading from Snowflake", createSnowflakeReadTransform(options))
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("Writing counts to GCP", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
  }

  /**
   * Builds the SnowflakeIO write transform: truncates the target table (creating it if needed)
   * and maps each {@link WordCountRow} to a (word, count) column pair.
   */
  private static PTransform<PCollection<WordCountRow>, PDone> createSnowflakeWriteTransform(
      SnowflakeWordCountOptions options) {
    SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
        createSnowflakeConfiguration(options);
    // No cast needed: the declared variable type is the lambda's target type.
    SnowflakeIO.UserDataMapper<WordCountRow> userDataMapper =
        (WordCountRow row) -> new Object[] {row.getWord(), row.getCount()};
    return SnowflakeIO.<WordCountRow>write()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .withWriteDisposition(WriteDisposition.TRUNCATE)
        .withUserDataMapper(userDataMapper)
        .toTable(options.getTable())
        .withStorageIntegrationName(options.getStorageIntegrationName())
        .withStagingBucketName(options.getStagingBucketName())
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        // NOTE(review): "count" is declared as a Snowflake string column even though the value
        // is a Long; the read side parses it back with Long.parseLong. Confirm this is intended.
        .withTableSchema(
            SnowflakeTableSchema.of(
                SnowflakeColumn.of("word", SnowflakeString.of()),
                SnowflakeColumn.of("count", SnowflakeString.of())));
  }

  /**
   * Builds the SnowflakeIO read transform: reads (word, count) CSV records from the table and
   * maps each one to a {@link WordCountRow}.
   */
  private static PTransform<PBegin, PCollection<WordCountRow>> createSnowflakeReadTransform(
      SnowflakeWordCountOptions options) {
    SnowflakeIO.DataSourceConfiguration dataSourceConfiguration =
        createSnowflakeConfiguration(options);
    // parseLong avoids the intermediate boxed Long that Long.valueOf would create.
    SnowflakeIO.CsvMapper<WordCountRow> csvMapper =
        parts -> new WordCountRow(parts[0], Long.parseLong(parts[1]));
    return SnowflakeIO.<WordCountRow>read()
        .withDataSourceConfiguration(dataSourceConfiguration)
        .fromTable(options.getTable())
        .withStagingBucketName(options.getStagingBucketName())
        .withStorageIntegrationName(options.getStorageIntegrationName())
        .withCsvMapper(csvMapper)
        .withCoder(SerializableCoder.of(WordCountRow.class));
  }

  /**
   * Creates the shared Snowflake connection configuration.
   *
   * <p>NOTE(review): all three auth styles (username/password, OAuth, raw key pair) are chained
   * unconditionally; presumably only the options the user actually supplied are non-null and
   * SnowflakeIO selects the populated one — confirm against SnowflakeIO's validation rules.
   */
  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(
      SnowflakeWordCountOptions options) {
    return SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(options.getUsername(), options.getPassword())
        .withOAuth(options.getOauthToken())
        .withKeyPairRawAuth(
            options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase())
        .withDatabase(options.getDatabase())
        .withServerName(options.getServerName())
        .withSchema(options.getSchema());
  }

  /** Formats a {@link WordCountRow} as "word: count" for the text output. */
  public static class FormatAsTextFn extends SimpleFunction<WordCountRow, String> {
    @Override
    public String apply(WordCountRow wordCountRow) {
      return wordCountRow.getWord() + ": " + wordCountRow.getCount();
    }
  }

  /** Tokenizes lines into words and counts the occurrences of each distinct word. */
  private static class CountWords
      extends PTransform<PCollection<String>, PCollection<WordCountRow>> {
    @Override
    public PCollection<WordCountRow> expand(PCollection<String> lines) {
      return lines
          .apply(ParDo.of(new ExtractWordsFn()))
          .apply(Count.perElement())
          .apply(
              MapElements.via(
                  new SimpleFunction<KV<String, Long>, WordCountRow>() {
                    @Override
                    public WordCountRow apply(KV<String, Long> line) {
                      return new WordCountRow(line.getKey(), line.getValue());
                    }
                  }));
    }
  }

  /**
   * Splits each line into words, dropping empty tokens; records a line-length distribution and an
   * empty-line counter.
   */
  private static class ExtractWordsFn extends DoFn<String, String> {
    // Compiled once (static) instead of re-parsing the regex on every String.split call;
    // matches runs of non-letter characters. Static fields are also excluded from DoFn
    // serialization, so the non-Serializable Pattern is safe here.
    private static final Pattern TOKENIZER_PATTERN = Pattern.compile("[^\\p{L}]+");
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }
      // Limit -1 keeps trailing empty tokens, matching the original String.split(regex, -1)
      // behavior; empty tokens are filtered out below anyway.
      for (String word : TOKENIZER_PATTERN.split(element, -1)) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }
}