diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java index 9f87486a..94b38200 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java @@ -66,7 +66,7 @@ public class RocketMQSink extends RichSinkFunction implements Checkpoin private List batchList; private Meter sinkInTps; - private Meter outTps; + private Meter numRecordsOutPerSecond; private Meter outBps; private MetricUtils.LatencyGauge latencyGauge; @@ -101,7 +101,7 @@ public void open(Configuration parameters) throws Exception { throw new RuntimeException(e); } sinkInTps = MetricUtils.registerSinkInTps(getRuntimeContext()); - outTps = MetricUtils.registerOutTps(getRuntimeContext()); + numRecordsOutPerSecond = MetricUtils.registerNumRecordsOutPerSecond(getRuntimeContext()); outBps = MetricUtils.registerOutBps(getRuntimeContext()); latencyGauge = MetricUtils.registerOutLatency(getRuntimeContext()); } @@ -128,7 +128,7 @@ public void onSuccess(SendResult sendResult) { LOG.debug("Async send message success! result: {}", sendResult); long end = System.currentTimeMillis(); latencyGauge.report(end - timeStartWriting, 1); - outTps.markEvent(); + numRecordsOutPerSecond.markEvent(); outBps.markEvent(input.getBody().length); } @@ -169,7 +169,7 @@ public void onException(Throwable throwable) { } long end = System.currentTimeMillis(); latencyGauge.report(end - timeStartWriting, 1); - outTps.markEvent(); + numRecordsOutPerSecond.markEvent(); outBps.markEvent(input.getBody().length); } catch (Exception e) { LOG.error("Sync send message exception: ", e); diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index bedf97f8..31aee752 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -33,10 +33,7 @@ import org.apache.flink.connector.rocketmq.legacy.common.util.RocketMQUtils; import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkForAll; import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkPerQueue; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -115,7 +112,7 @@ public class RocketMQSourceFunction extends RichParallelSourceFunction private transient boolean enableCheckpoint; private volatile Object checkPointLock; - private Meter tpsMetric; + private Meter numRecordsInPerSecond; private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge(); private MetricUtils.TimestampGauge emitDelay = new MetricUtils.TimestampGauge(); @@ -213,14 +210,7 @@ public void open(Configuration parameters) throws Exception { consumer.setInstanceName(instanceName); consumer.start(); - Counter outputCounter = - getRuntimeContext() - .getMetricGroup() - .counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter()); - tpsMetric = - getRuntimeContext() - .getMetricGroup() - .meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60)); + numRecordsInPerSecond = MetricUtils.registerNumRecordsInPerSecond(getRuntimeContext()); getRuntimeContext() .getMetricGroup() @@ -323,7 +313,7 @@ public void run(SourceContext context) throws Exception { // waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp()); waterMarkForAll.extractTimestamp( msg.getBornTimestamp()); - tpsMetric.markEvent(); + numRecordsInPerSecond.markEvent(); long eventTime = msg.getStoreTimestamp(); fetchDelay.report( @@ -537,7 +527,8 @@ public void close() throws Exception { } } - public void initOffsetTableFromRestoredOffsets(List messageQueues) throws MQClientException { + public void initOffsetTableFromRestoredOffsets(List messageQueues) + throws MQClientException { Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null"); restoredOffsets.forEach( (mq, offset) -> { diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/MetricUtils.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/MetricUtils.java index d98e3c34..4b4b3d95 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/MetricUtils.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/MetricUtils.java @@ -32,12 +32,16 @@ public class MetricUtils { // https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics public static final String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag"; public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag"; - + private static final String METRIC_GROUP_SOURCE = "source"; private static final String METRIC_GROUP_SINK = "sink"; private static final String METRICS_SINK_IN_TPS = "inTps"; - private static final String METRICS_SINK_OUT_TPS = "outTps"; private static final String METRICS_SINK_OUT_BPS = "outBps"; private static final String METRICS_SINK_OUT_LATENCY = "outLatency"; + public static final String SUFFIX_RATE = "PerSecond"; + public static final String IO_NUM_RECORDS_IN = "numRecordsIn"; + public static final String IO_NUM_RECORDS_OUT = "numRecordsOut"; + public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE; + public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE; public static Meter registerSinkInTps(RuntimeContext context) { Counter parserCounter = @@ -49,16 +53,6 @@ public static Meter registerSinkInTps(RuntimeContext context) { .meter(METRICS_SINK_IN_TPS, new MeterView(parserCounter, 60)); } - public static Meter registerOutTps(RuntimeContext context) { - Counter parserCounter = - context.getMetricGroup() - .addGroup(METRIC_GROUP_SINK) - .counter(METRICS_SINK_OUT_TPS + "_counter", new SimpleCounter()); - return context.getMetricGroup() - .addGroup(METRIC_GROUP_SINK) - .meter(METRICS_SINK_OUT_TPS, new MeterView(parserCounter, 60)); - } - public static Meter registerOutBps(RuntimeContext context) { Counter bpsCounter = context.getMetricGroup() @@ -75,6 +69,26 @@ public static LatencyGauge registerOutLatency(RuntimeContext context) { .gauge(METRICS_SINK_OUT_LATENCY, new LatencyGauge()); } + public static Meter registerNumRecordsInPerSecond(RuntimeContext context) { + Counter numRecordsIn = + context.getMetricGroup() + .addGroup(METRIC_GROUP_SOURCE) + .counter(IO_NUM_RECORDS_IN, new SimpleCounter()); + return context.getMetricGroup() + .addGroup(METRIC_GROUP_SOURCE) + .meter(IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60)); + } + + public static Meter registerNumRecordsOutPerSecond(RuntimeContext context) { + Counter numRecordsOut = + context.getMetricGroup() + .addGroup(METRIC_GROUP_SINK) + .counter(IO_NUM_RECORDS_OUT, new SimpleCounter()); + return context.getMetricGroup() + .addGroup(METRIC_GROUP_SINK) + .meter(IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60)); + } + public static class LatencyGauge implements Gauge { private double value;