Skip to content

Commit 32079d6

Browse files
authored
Merge branch 'main' into spring_rabbit
2 parents be5d5a3 + eb4eb48 commit 32079d6

File tree

4 files changed

+353
-2
lines changed

4 files changed

+353
-2
lines changed

CHANGES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ Release Notes.
77

88
* Added support for Lettuce reactive Redis commands.
99
* Add Spring AI 1.x plugin and GenAI layer.
10-
* Add Spring RabbitMQ plugin.
10+
* Fix httpclient-5.x plugin injecting sw8 propagation headers into ClickHouse HTTP requests (port 8123), causing HTTP 400. Add `PROPAGATION_EXCLUDE_PORTS` config to skip tracing (including header injection) for specified ports in the classic client interceptor.
11+
* Add Spring RabbitMQ 2.x - 4.x plugin.
1112

1213
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1)
1314

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.httpclient.v5;
20+
21+
import org.apache.skywalking.apm.agent.core.boot.PluginConfig;
22+
23+
public class HttpClient5PluginConfig {
24+
public static class Plugin {
25+
@PluginConfig(root = HttpClient5PluginConfig.class)
26+
public static class HttpClient5 {
27+
/**
28+
* Comma-separated list of destination ports whose outbound HTTP requests
29+
* will be completely skipped by the classic client interceptor: no exit
30+
* span is created and no SkyWalking propagation headers are injected.
31+
*
32+
* <p>Some HTTP-based database protocols (e.g. ClickHouse on port 8123)
33+
* reject requests that contain unknown HTTP headers, returning HTTP 400.
34+
* Adding such ports here prevents the agent from creating exit spans
35+
* and from injecting the {@code sw8} tracing headers into those outbound
36+
* requests, meaning these requests are completely untraced by SkyWalking,
37+
* while leaving all other HTTP calls fully traced.
38+
*
39+
* <p>Default: {@code "8123"} (ClickHouse HTTP interface).
40+
*
41+
* <p>Example – also exclude port 9200 (Elasticsearch):
42+
* {@code plugin.httpclient5.PROPAGATION_EXCLUDE_PORTS=8123,9200}
43+
*/
44+
public static String PROPAGATION_EXCLUDE_PORTS = "8123";
45+
}
46+
}
47+
}

apm-sniffer/apm-sdk-plugin/httpclient-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/httpclient/v5/HttpClientDoExecuteInterceptor.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,23 @@
3737
import java.lang.reflect.Method;
3838
import java.net.MalformedURLException;
3939
import java.net.URL;
40+
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.Set;
43+
import java.util.stream.Collectors;
4044

4145
public abstract class HttpClientDoExecuteInterceptor implements InstanceMethodsAroundInterceptor {
4246
private static final String ERROR_URI = "/_blank";
4347

4448
private static final ILog LOGGER = LogManager.getLogger(HttpClientDoExecuteInterceptor.class);
4549

50+
/**
51+
* Lazily-resolved set of ports that must not receive SkyWalking
52+
* propagation headers. Built once from
53+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}.
54+
*/
55+
private volatile Set<Integer> excludePortsCache;
56+
4657
@Override
4758
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
4859
MethodInterceptResult result) throws Throwable {
@@ -77,7 +88,55 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
7788

7889
protected boolean skipIntercept(EnhancedInstance objInst, Method method, Object[] allArguments,
7990
Class<?>[] argumentsTypes) {
80-
return allArguments[1] == null || getHttpHost(objInst, method, allArguments, argumentsTypes) == null;
91+
if (allArguments[1] == null) {
92+
return true;
93+
}
94+
HttpHost host = getHttpHost(objInst, method, allArguments, argumentsTypes);
95+
if (host == null) {
96+
return true;
97+
}
98+
return isExcludedPort(port(host));
99+
}
100+
101+
/**
102+
* Returns {@code true} when {@code port} is listed in
103+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}.
104+
*
105+
* <p>The config value is parsed lazily and cached so that it is read after
106+
* the agent has fully initialised its configuration subsystem.
107+
*/
108+
private boolean isExcludedPort(int port) {
109+
if (port <= 0) {
110+
return false;
111+
}
112+
if (excludePortsCache == null) {
113+
synchronized (this) {
114+
if (excludePortsCache == null) {
115+
excludePortsCache = parseExcludePorts(
116+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS);
117+
}
118+
}
119+
}
120+
return excludePortsCache.contains(port);
121+
}
122+
123+
private static Set<Integer> parseExcludePorts(String raw) {
124+
if (raw == null || raw.trim().isEmpty()) {
125+
return Collections.emptySet();
126+
}
127+
return Arrays.stream(raw.split(","))
128+
.map(String::trim)
129+
.filter(s -> !s.isEmpty())
130+
.map(s -> {
131+
try {
132+
return Integer.parseInt(s);
133+
} catch (NumberFormatException e) {
134+
LOGGER.warn("Ignoring invalid port in PROPAGATION_EXCLUDE_PORTS: {}", s);
135+
return -1;
136+
}
137+
})
138+
.filter(p -> p > 0)
139+
.collect(Collectors.toSet());
81140
}
82141

83142
protected abstract HttpHost getHttpHost(EnhancedInstance objInst, Method method, Object[] allArguments,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.httpclient.v5;
20+
21+
import org.apache.hc.core5.http.ClassicHttpRequest;
22+
import org.apache.hc.core5.http.ClassicHttpResponse;
23+
import org.apache.hc.core5.http.HttpHost;
24+
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
25+
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
27+
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
28+
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
29+
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
30+
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
31+
import org.junit.Before;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.junit.runner.RunWith;
35+
import org.mockito.Mock;
36+
import org.mockito.junit.MockitoJUnit;
37+
import org.mockito.junit.MockitoRule;
38+
39+
import java.net.URI;
40+
import java.util.List;
41+
42+
import static org.hamcrest.CoreMatchers.is;
43+
import static org.hamcrest.MatcherAssert.assertThat;
44+
import static org.mockito.ArgumentMatchers.anyString;
45+
import static org.mockito.Mockito.never;
46+
import static org.mockito.Mockito.verify;
47+
import static org.mockito.Mockito.when;
48+
49+
/**
50+
* Verifies that requests to ports listed in
51+
* {@link HttpClient5PluginConfig.Plugin.HttpClient5#PROPAGATION_EXCLUDE_PORTS}
52+
* are silently skipped (no span created, no {@code sw8} header injected).
53+
*
54+
* <p>This regression-covers the ClickHouse HTTP-interface issue: ClickHouse
55+
* listens on port 8123 and rejects HTTP requests that carry unknown headers
56+
* (such as the SkyWalking {@code sw8} propagation header), responding with
57+
* HTTP 400 Bad Request. By excluding port 8123 the agent leaves those
58+
* requests untouched while continuing to trace all other HTTP calls.
59+
*/
60+
@RunWith(TracingSegmentRunner.class)
61+
public class HttpClientPropagationExcludePortTest {
62+
63+
@SegmentStoragePoint
64+
private SegmentStorage segmentStorage;
65+
66+
@Rule
67+
public AgentServiceRule agentServiceRule = new AgentServiceRule();
68+
@Rule
69+
public MockitoRule rule = MockitoJUnit.rule();
70+
71+
@Mock
72+
private HttpHost clickHouseHost; // port 8123 – should be excluded
73+
@Mock
74+
private HttpHost regularHost; // port 8080 – should be traced
75+
@Mock
76+
private ClassicHttpRequest request;
77+
@Mock
78+
private ClassicHttpResponse httpResponse;
79+
@Mock
80+
private EnhancedInstance enhancedInstance;
81+
82+
private HttpClientDoExecuteInterceptor internalInterceptor;
83+
private HttpClientDoExecuteInterceptor minimalInterceptor;
84+
85+
private Object[] clickHouseArgs;
86+
private Object[] regularArgs;
87+
private Class<?>[] argumentsType;
88+
89+
@Before
90+
public void setUp() throws Exception {
91+
ServiceManager.INSTANCE.boot();
92+
93+
// Set the exclusion list to the default (includes 8123)
94+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "8123";
95+
96+
internalInterceptor = new InternalClientDoExecuteInterceptor();
97+
minimalInterceptor = new MinimalClientDoExecuteInterceptor();
98+
99+
when(httpResponse.getCode()).thenReturn(200);
100+
101+
// ClickHouse-like host on port 8123
102+
when(clickHouseHost.getHostName()).thenReturn("clickhouse-server");
103+
when(clickHouseHost.getSchemeName()).thenReturn("http");
104+
when(clickHouseHost.getPort()).thenReturn(8123);
105+
106+
// Regular application host on port 8080
107+
when(regularHost.getHostName()).thenReturn("my-service");
108+
when(regularHost.getSchemeName()).thenReturn("http");
109+
when(regularHost.getPort()).thenReturn(8080);
110+
111+
when(request.getUri()).thenReturn(new URI("http://my-service:8080/api/ping"));
112+
when(request.getMethod()).thenReturn("GET");
113+
114+
clickHouseArgs = new Object[]{clickHouseHost, request};
115+
regularArgs = new Object[]{regularHost, request};
116+
argumentsType = new Class[]{HttpHost.class, ClassicHttpRequest.class};
117+
}
118+
119+
// -----------------------------------------------------------------------
120+
// InternalHttpClient path
121+
// -----------------------------------------------------------------------
122+
123+
/**
124+
* Requests to port 8123 via {@code InternalHttpClient} must not produce a
125+
* trace segment and must NOT set any propagation header on the request.
126+
*
127+
* <p>Before this fix the agent injected {@code sw8} (and two companion
128+
* headers) into every outbound request regardless of the destination port.
129+
* ClickHouse interprets unknown headers as malformed requests and returns
130+
* HTTP 400, making all JDBC queries fail while the SkyWalking agent is
131+
* attached.
132+
*/
133+
@Test
134+
public void internalClient_requestToExcludedPort_noSpanAndNoHeaderInjected() throws Throwable {
135+
internalInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
136+
internalInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
137+
138+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
139+
assertThat("No trace segment should be created for excluded port", segments.size(), is(0));
140+
verify(request, never()).setHeader(anyString(), anyString());
141+
}
142+
143+
/**
144+
* Requests to a non-excluded port via {@code InternalHttpClient} must still
145+
* be traced and have propagation headers injected.
146+
*/
147+
@Test
148+
public void internalClient_requestToRegularPort_spanCreatedAndHeadersInjected() throws Throwable {
149+
internalInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
150+
internalInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
151+
152+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
153+
assertThat("A trace segment must be created for a non-excluded port", segments.size(), is(1));
154+
// sw8, sw8-correlation, sw8-x – exactly 3 propagation headers, consistent with existing tests
155+
verify(request, org.mockito.Mockito.times(3)).setHeader(anyString(), anyString());
156+
}
157+
158+
// -----------------------------------------------------------------------
159+
// MinimalHttpClient path
160+
// -----------------------------------------------------------------------
161+
162+
/**
163+
* Same assertion for the {@code MinimalHttpClient} code path.
164+
*/
165+
@Test
166+
public void minimalClient_requestToExcludedPort_noSpanAndNoHeaderInjected() throws Throwable {
167+
minimalInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
168+
minimalInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
169+
170+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
171+
assertThat("No trace segment should be created for excluded port", segments.size(), is(0));
172+
verify(request, never()).setHeader(anyString(), anyString());
173+
}
174+
175+
/**
176+
* Normal (non-excluded) port via {@code MinimalHttpClient} must still be
177+
* traced.
178+
*/
179+
@Test
180+
public void minimalClient_requestToRegularPort_spanCreatedAndHeadersInjected() throws Throwable {
181+
minimalInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
182+
minimalInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
183+
184+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
185+
assertThat("A trace segment must be created for a non-excluded port", segments.size(), is(1));
186+
verify(request, org.mockito.Mockito.times(3)).setHeader(anyString(), anyString());
187+
}
188+
189+
// -----------------------------------------------------------------------
190+
// Configuration edge cases
191+
// -----------------------------------------------------------------------
192+
193+
/**
194+
* When {@code PROPAGATION_EXCLUDE_PORTS} is cleared (empty string), every
195+
* port – including 8123 – must be traced normally.
196+
*/
197+
@Test
198+
public void whenExcludePortsEmpty_allPortsAreTraced() throws Throwable {
199+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "";
200+
201+
// Use a fresh interceptor so the cache is not pre-populated
202+
HttpClientDoExecuteInterceptor freshInterceptor = new MinimalClientDoExecuteInterceptor();
203+
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
204+
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
205+
206+
List<TraceSegment> segments = segmentStorage.getTraceSegments();
207+
assertThat("Port 8123 should be traced when exclusion list is empty", segments.size(), is(1));
208+
}
209+
210+
/**
211+
* Multiple ports can be listed: verify that both excluded ports are silently
212+
* skipped while a non-excluded port is still traced under the same config.
213+
*/
214+
@Test
215+
public void multipleExcludedPorts_allSkippedAndNonExcludedStillTraced() throws Throwable {
216+
HttpClient5PluginConfig.Plugin.HttpClient5.PROPAGATION_EXCLUDE_PORTS = "8123,9200";
217+
218+
HttpClientDoExecuteInterceptor freshInterceptor = new MinimalClientDoExecuteInterceptor();
219+
220+
// 8123 – must be excluded
221+
freshInterceptor.beforeMethod(enhancedInstance, null, clickHouseArgs, argumentsType, null);
222+
freshInterceptor.afterMethod(enhancedInstance, null, clickHouseArgs, argumentsType, httpResponse);
223+
assertThat("Port 8123 should be excluded", segmentStorage.getTraceSegments().size(), is(0));
224+
225+
// 9200 (Elasticsearch) – must also be excluded
226+
HttpHost esHost = org.mockito.Mockito.mock(HttpHost.class);
227+
when(esHost.getHostName()).thenReturn("es-server");
228+
when(esHost.getSchemeName()).thenReturn("http");
229+
when(esHost.getPort()).thenReturn(9200);
230+
Object[] esArgs = new Object[]{esHost, request};
231+
232+
freshInterceptor = new MinimalClientDoExecuteInterceptor();
233+
freshInterceptor.beforeMethod(enhancedInstance, null, esArgs, argumentsType, null);
234+
freshInterceptor.afterMethod(enhancedInstance, null, esArgs, argumentsType, httpResponse);
235+
assertThat("Port 9200 should also be excluded", segmentStorage.getTraceSegments().size(), is(0));
236+
237+
// 8080 (regular service) – must still be traced under the same multi-port config
238+
freshInterceptor = new MinimalClientDoExecuteInterceptor();
239+
freshInterceptor.beforeMethod(enhancedInstance, null, regularArgs, argumentsType, null);
240+
freshInterceptor.afterMethod(enhancedInstance, null, regularArgs, argumentsType, httpResponse);
241+
assertThat("Non-excluded port 8080 must still produce a trace segment",
242+
segmentStorage.getTraceSegments().size(), is(1));
243+
}
244+
}

0 commit comments

Comments
 (0)