Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class RocketMQSink extends RichSinkFunction<Message> implements Checkpoin
private List<Message> batchList;

private Meter sinkInTps;
private Meter outTps;
private Meter numRecordsOutPerSecond;
private Meter outBps;
private MetricUtils.LatencyGauge latencyGauge;

Expand Down Expand Up @@ -101,7 +101,7 @@ public void open(Configuration parameters) throws Exception {
throw new RuntimeException(e);
}
sinkInTps = MetricUtils.registerSinkInTps(getRuntimeContext());
outTps = MetricUtils.registerOutTps(getRuntimeContext());
numRecordsOutPerSecond = MetricUtils.registerNumRecordsOutPerSecond(getRuntimeContext());
outBps = MetricUtils.registerOutBps(getRuntimeContext());
latencyGauge = MetricUtils.registerOutLatency(getRuntimeContext());
}
Expand All @@ -128,7 +128,7 @@ public void onSuccess(SendResult sendResult) {
LOG.debug("Async send message success! result: {}", sendResult);
long end = System.currentTimeMillis();
latencyGauge.report(end - timeStartWriting, 1);
outTps.markEvent();
numRecordsOutPerSecond.markEvent();
outBps.markEvent(input.getBody().length);
}

Expand Down Expand Up @@ -169,7 +169,7 @@ public void onException(Throwable throwable) {
}
long end = System.currentTimeMillis();
latencyGauge.report(end - timeStartWriting, 1);
outTps.markEvent();
numRecordsOutPerSecond.markEvent();
outBps.markEvent(input.getBody().length);
} catch (Exception e) {
LOG.error("Sync send message exception: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@
import org.apache.flink.connector.rocketmq.legacy.common.util.RocketMQUtils;
import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkForAll;
import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkPerQueue;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand Down Expand Up @@ -115,7 +112,7 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
private transient boolean enableCheckpoint;
private volatile Object checkPointLock;

private Meter tpsMetric;
private Meter numRecordsInPerSecond;
private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge();
private MetricUtils.TimestampGauge emitDelay = new MetricUtils.TimestampGauge();

Expand Down Expand Up @@ -213,14 +210,7 @@ public void open(Configuration parameters) throws Exception {
consumer.setInstanceName(instanceName);
consumer.start();

Counter outputCounter =
getRuntimeContext()
.getMetricGroup()
.counter(MetricUtils.METRICS_TPS + "_counter", new SimpleCounter());
tpsMetric =
getRuntimeContext()
.getMetricGroup()
.meter(MetricUtils.METRICS_TPS, new MeterView(outputCounter, 60));
numRecordsInPerSecond = MetricUtils.registerNumRecordsInPerSecond(getRuntimeContext());

getRuntimeContext()
.getMetricGroup()
Expand Down Expand Up @@ -323,7 +313,7 @@ public void run(SourceContext context) throws Exception {
// waterMarkPerQueue.extractTimestamp(mq, msg.getBornTimestamp());
waterMarkForAll.extractTimestamp(
msg.getBornTimestamp());
tpsMetric.markEvent();
numRecordsInPerSecond.markEvent();
long eventTime =
msg.getStoreTimestamp();
fetchDelay.report(
Expand Down Expand Up @@ -537,7 +527,8 @@ public void close() throws Exception {
}
}

public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues)
throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ public class MetricUtils {
// https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
public static final String CURRENT_FETCH_EVENT_TIME_LAG = "currentFetchEventTimeLag";
public static final String CURRENT_EMIT_EVENT_TIME_LAG = "currentEmitEventTimeLag";

private static final String METRIC_GROUP_SOURCE = "source";
private static final String METRIC_GROUP_SINK = "sink";
private static final String METRICS_SINK_IN_TPS = "inTps";
private static final String METRICS_SINK_OUT_TPS = "outTps";
private static final String METRICS_SINK_OUT_BPS = "outBps";
private static final String METRICS_SINK_OUT_LATENCY = "outLatency";
public static final String SUFFIX_RATE = "PerSecond";
public static final String IO_NUM_RECORDS_IN = "numRecordsIn";
public static final String IO_NUM_RECORDS_OUT = "numRecordsOut";
public static final String IO_NUM_RECORDS_IN_RATE = IO_NUM_RECORDS_IN + SUFFIX_RATE;
public static final String IO_NUM_RECORDS_OUT_RATE = IO_NUM_RECORDS_OUT + SUFFIX_RATE;

public static Meter registerSinkInTps(RuntimeContext context) {
Counter parserCounter =
Expand All @@ -49,16 +53,6 @@ public static Meter registerSinkInTps(RuntimeContext context) {
.meter(METRICS_SINK_IN_TPS, new MeterView(parserCounter, 60));
}

public static Meter registerOutTps(RuntimeContext context) {
Counter parserCounter =
context.getMetricGroup()
.addGroup(METRIC_GROUP_SINK)
.counter(METRICS_SINK_OUT_TPS + "_counter", new SimpleCounter());
return context.getMetricGroup()
.addGroup(METRIC_GROUP_SINK)
.meter(METRICS_SINK_OUT_TPS, new MeterView(parserCounter, 60));
}

public static Meter registerOutBps(RuntimeContext context) {
Counter bpsCounter =
context.getMetricGroup()
Expand All @@ -75,6 +69,26 @@ public static LatencyGauge registerOutLatency(RuntimeContext context) {
.gauge(METRICS_SINK_OUT_LATENCY, new LatencyGauge());
}

public static Meter registerNumRecordsInPerSecond(RuntimeContext context) {
Counter numRecordsIn =
context.getMetricGroup()
.addGroup(METRIC_GROUP_SOURCE)
.counter(IO_NUM_RECORDS_IN, new SimpleCounter());
return context.getMetricGroup()
.addGroup(METRIC_GROUP_SOURCE)
.meter(IO_NUM_RECORDS_IN_RATE, new MeterView(numRecordsIn, 60));
}

public static Meter registerNumRecordsOutPerSecond(RuntimeContext context) {
Counter numRecordsOut =
context.getMetricGroup()
.addGroup(METRIC_GROUP_SINK)
.counter(IO_NUM_RECORDS_OUT, new SimpleCounter());
return context.getMetricGroup()
.addGroup(METRIC_GROUP_SINK)
.meter(IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut, 60));
}

public static class LatencyGauge implements Gauge<Double> {
private double value;

Expand Down