diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index a9bc7c893..178f611e2 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -50,6 +50,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -117,6 +118,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction sourceContext) throws Exception { + metricGroup = getRuntimeContext().getMetricGroup(); + final Map flinkCounters = new HashMap<>(); + nativeMetric = new MetricNode(new ArrayList<>()) { @Override public void add(String name, long value) { - // TODO Integration with Flink metrics - LOG.info("Metric Auron Source: {} = {}", name, value); + // Integration with Flink metrics + Counter counter = flinkCounters.get(name); + if (counter == null) { + counter = metricGroup.counter(name); + flinkCounters.put(name, counter); + } + counter.inc(value); + LOG.debug("Metric Auron Source: {} = {}", name, value); } }; List fieldList = new LinkedList<>();