Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ibm-mq-metrics/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ val ibmClientJar: Configuration by configurations.creating {
}

dependencies {
api("com.google.auto.value:auto-value-annotations:1.11.1")
api("com.google.code.findbugs:jsr305:3.0.2")
api("io.swagger:swagger-annotations:1.6.16")
api("org.jetbrains:annotations:26.1.0")
Expand All @@ -33,6 +34,7 @@ dependencies {
implementation("org.slf4j:slf4j-simple:2.0.17")
testImplementation("com.google.guava:guava")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
annotationProcessor("com.google.auto.value:auto-value:1.11.1")
ibmClientJar("com.ibm.mq:com.ibm.mq.allclient:9.4.5.1") {
artifact {
name = "com.ibm.mq.allclient"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.WmqMonitor;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.opentelemetry.ConfigWrapper;
import java.util.List;
import java.util.Map;
Expand All @@ -26,12 +26,12 @@ class TestWMQMonitor {

private final ConfigWrapper config;
private final ExecutorService threadPool;
private final Meter meter;
private final MetricProducer producer;

TestWMQMonitor(ConfigWrapper config, Meter meter, ExecutorService service) {
TestWMQMonitor(ConfigWrapper config, MetricProducer producer, ExecutorService service) {
this.config = config;
this.threadPool = service;
this.meter = meter;
this.producer = producer;
}

/**
Expand All @@ -47,7 +47,7 @@ void runTest() {
assertThat(queueManagers).isNotNull();
ObjectMapper mapper = new ObjectMapper();

WmqMonitor wmqTask = new WmqMonitor(config, threadPool, meter);
WmqMonitor wmqTask = new WmqMonitor(config, threadPool, producer);

// we override this helper to pass in our opentelemetry helper instead.
for (Map<String, ?> queueManager : queueManagers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import com.ibm.mq.headers.pcf.PCFMessageAgent;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.opentelemetry.ConfigWrapper;
import io.opentelemetry.ibm.mq.opentelemetry.Main;
import io.opentelemetry.ibm.mq.util.WmqUtil;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.resources.Resource;
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
Expand All @@ -50,7 +51,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,9 +60,6 @@ class WMQMonitorIntegrationTest {

private static final Logger logger = LoggerFactory.getLogger(WMQMonitorIntegrationTest.class);

@RegisterExtension
static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create();

private static final ExecutorService service =
Executors.newFixedThreadPool(
4, /* one gets burned with our @BeforeAll message uzi, 4 is faster than 2 */
Expand Down Expand Up @@ -183,11 +180,13 @@ void test_monitor_with_full_config() throws Exception {
String configFile = getConfigFile("conf/test-config.yml");

ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");
TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());

TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());
Map<String, MetricData> metrics = new HashMap<>();
for (MetricData metricData : data) {
metrics.put(metricData.getName(), metricData);
Expand Down Expand Up @@ -244,9 +243,9 @@ void test_monitor_with_full_config() throws Exception {
void test_wmqmonitor() throws Exception {
String configFile = getConfigFile("conf/test-queuemgr-config.yml");
ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");

TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());
TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();
// TODO: Wait why are there no asserts here?
}
Expand All @@ -257,14 +256,17 @@ void test_otlphttp() throws Exception {
ConfigWrapper.parse(WMQMonitorIntegrationTest.getConfigFile("conf/test-config.yml"));
ScheduledExecutorService service =
Executors.newScheduledThreadPool(config.getNumberOfThreads());
Main.run(config, service, otelTesting.getOpenTelemetry());
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());

Main.run(config, service, producer);
CountDownLatch latch = new CountDownLatch(1);
Future<?> ignored = service.submit(latch::countDown);
Thread.sleep(5000); // TODO: This is fragile and time consuming and should be made better
service.shutdown();
assertTrue(service.awaitTermination(30, TimeUnit.SECONDS));

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());
Set<String> metricNames = new HashSet<>();
for (MetricData metricData : data) {
metricNames.add(metricData.getName());
Expand All @@ -290,11 +292,12 @@ void test_bad_connection() throws Exception {
String configFile = getConfigFile("conf/test-bad-config.yml");

ConfigWrapper config = ConfigWrapper.parse(configFile);
Meter meter = otelTesting.getOpenTelemetry().getMeter("opentelemetry.io/mq");
TestWMQMonitor monitor = new TestWMQMonitor(config, meter, service);
MetricProducer producer =
new MetricProducer(Resource.empty(), InstrumentationScopeInfo.empty());
TestWMQMonitor monitor = new TestWMQMonitor(config, producer, service);
monitor.runTest();

List<MetricData> data = otelTesting.getMetrics();
List<MetricData> data = producer.produce(Resource.empty());

assertThat(data).isNotEmpty();
assertThat(data).hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.headers.pcf.PCFMessageAgent;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongGauge;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.ibm.mq.config.QueueManager;
import io.opentelemetry.ibm.mq.metrics.Metrics;
import io.opentelemetry.ibm.mq.metrics.MetricProducer;
import io.opentelemetry.ibm.mq.metrics.MetricsConfig;
import io.opentelemetry.ibm.mq.metricscollector.ChannelMetricsCollector;
import io.opentelemetry.ibm.mq.metricscollector.InquireChannelCmdCollector;
Expand Down Expand Up @@ -49,15 +46,15 @@ public final class WmqMonitor {

private final List<QueueManager> queueManagers;
private final List<Consumer<MetricsCollectorContext>> jobs = new ArrayList<>();
private final LongCounter errorCodesCounter;
private final LongGauge heartbeatGauge;
private final ExecutorService threadPool;
private final MetricsConfig metricsConfig;
private final MetricProducer producer;

public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, Meter meter) {
public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, MetricProducer producer) {
List<Map<String, ?>> queueManagers = getQueueManagers(config);
ObjectMapper mapper = new ObjectMapper();

this.producer = producer;
this.queueManagers = new ArrayList<>();

for (Map<String, ?> queueManager : queueManagers) {
Expand All @@ -70,21 +67,18 @@ public WmqMonitor(ConfigWrapper config, ExecutorService threadPool, Meter meter)
}

this.metricsConfig = new MetricsConfig(config);

this.heartbeatGauge = Metrics.createIbmMqHeartbeat(meter);
this.errorCodesCounter = Metrics.createIbmMqConnectionErrors(meter);
this.threadPool = threadPool;

jobs.add(new QueueManagerMetricsCollector(meter));
jobs.add(new InquireQueueManagerCmdCollector(meter));
jobs.add(new ChannelMetricsCollector(meter));
jobs.add(new InquireChannelCmdCollector(meter));
jobs.add(new QueueMetricsCollector(meter, threadPool, config));
jobs.add(new ListenerMetricsCollector(meter));
jobs.add(new TopicMetricsCollector(meter));
jobs.add(new ReadConfigurationEventQueueCollector(meter));
jobs.add(new PerformanceEventQueueCollector(meter));
jobs.add(new QueueManagerEventCollector(meter));
jobs.add(new QueueManagerMetricsCollector(producer));
jobs.add(new InquireQueueManagerCmdCollector(producer));
jobs.add(new ChannelMetricsCollector(producer));
jobs.add(new InquireChannelCmdCollector(producer));
jobs.add(new QueueMetricsCollector(producer, threadPool, config));
jobs.add(new ListenerMetricsCollector(producer));
jobs.add(new TopicMetricsCollector(producer));
jobs.add(new ReadConfigurationEventQueueCollector(producer));
jobs.add(new PerformanceEventQueueCollector(producer));
jobs.add(new QueueManagerEventCollector(producer));
}

public void run() {
Expand Down Expand Up @@ -115,12 +109,12 @@ public void run(QueueManager queueManager) {
if (e.getCause() instanceof MQException) {
MQException mqe = (MQException) e.getCause();
String errorCode = String.valueOf(mqe.getReason());
errorCodesCounter.add(
producer.recordIbmMqConnectionErrors(
1, Attributes.of(IBM_MQ_QUEUE_MANAGER, queueManagerName, ERROR_CODE, errorCode));
}
} finally {
if (this.metricsConfig.isIbmMqHeartbeatEnabled()) {
heartbeatGauge.set(
producer.recordIbmMqHeartbeat(
heartBeatMetricValue, Attributes.of(IBM_MQ_QUEUE_MANAGER, queueManagerName));
}
cleanUp(ibmQueueManager, agent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.ibm.mq.metrics;

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.Data;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.resources.Resource;
import javax.annotation.concurrent.Immutable;

@Immutable
@AutoValue
abstract class MetricData implements io.opentelemetry.sdk.metrics.data.MetricData {

static MetricData createMetricData(
Resource resource,
InstrumentationScopeInfo instrumentationScopeInfo,
String name,
String description,
String unit,
MetricDataType type,
Data<?> data) {
return new AutoValue_MetricData(
resource, instrumentationScopeInfo, name, description, unit, type, data);
}
}
Loading
Loading