From 81097261aefe3f60e34fde5bbc503c3178581440 Mon Sep 17 00:00:00 2001 From: gomudayya Date: Thu, 9 Apr 2026 00:09:40 +0900 Subject: [PATCH] Improve types returned by the Connect REST API --- .../rest/resources/ConnectorsResource.java | 18 ++++++++---------- .../rest/resources/LoggingResource.java | 16 +++++++--------- .../rest/resources/ConnectorsResourceTest.java | 18 +++++++----------- .../rest/resources/LoggingResourceTest.java | 16 ++++------------ 4 files changed, 26 insertions(+), 42 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 148e96a4cee13..b9d01b0557d26 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -184,13 +184,13 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin @GET @Path("/{connector}/topics") @Operation(summary = "Get the list of topics actively used by the specified connector") - public Response getConnectorActiveTopics(final @PathParam("connector") String connector) { + public Map getConnectorActiveTopics(final @PathParam("connector") String connector) { if (isTopicTrackingDisabled) { throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled."); } ActiveTopicsInfo info = herder.connectorActiveTopics(connector); - return Response.ok(Map.of(info.connector(), info)).build(); + return Map.of(info.connector(), info); } @PUT @@ -234,7 +234,7 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto @PATCH @Path("/{connector}/config") - public Response patchConnectorConfig(final @PathParam("connector") String connector, + public ConnectorInfo patchConnectorConfig(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final Map connectorConfigPatch) throws Throwable { @@ -242,7 +242,7 @@ public Response patchConnectorConfig(final @PathParam("connector") String connec herder.patchConnectorConfig(connector, connectorConfigPatch, cb); Herder.Created createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "PATCH", headers, connectorConfigPatch, new TypeReference<>() { }, new CreatedConnectorInfoTranslator(), forward); - return Response.ok().entity(createdInfo.result()).build(); + return createdInfo.result(); } @POST @@ -359,7 +359,7 @@ public ConnectorOffsets getOffsets(final @PathParam("connector") String connecto @PATCH @Path("/{connector}/offsets") @Operation(summary = "Alter the offsets for the specified connector") - public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, + public Message alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, final @PathParam("connector") String connector, final ConnectorOffsets offsets) throws Throwable { if (offsets.offsets() == null || offsets.offsets().isEmpty()) { @@ -368,21 +368,19 @@ public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryPara FutureCallback cb = new FutureCallback<>(); herder.alterConnectorOffsets(connector, offsets.toMap(), cb); - Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets, + return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets, new TypeReference<>() { }, new IdentityTranslator<>(), forward); - return Response.ok().entity(msg).build(); } @DELETE @Path("/{connector}/offsets") @Operation(summary = "Reset the offsets for the specified connector") - public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, + public Message resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable { FutureCallback cb = new FutureCallback<>(); herder.resetConnectorOffsets(connector, cb); - Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null, + return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null, new TypeReference<>() { }, new IdentityTranslator<>(), forward); - return Response.ok().entity(msg).build(); } // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index c5bad9fd78a57..499916d63f0ac 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -40,7 +40,6 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; /** * A set of endpoints to adjust the log levels of runtime loggers. @@ -69,8 +68,8 @@ public LoggingResource(Herder herder) { */ @GET @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") - public Response listLoggers() { - return Response.ok(herder.allLoggerLevels()).build(); + public Map listLoggers() { + return herder.allLoggerLevels(); } /** @@ -82,14 +81,14 @@ public Response listLoggers() { @GET @Path("/{logger}") @Operation(summary = "Get the log level for the specified logger") - public Response getLogger(final @PathParam("logger") String namedLogger) { + public LoggerLevel getLogger(final @PathParam("logger") String namedLogger) { Objects.requireNonNull(namedLogger, "require non-null name"); LoggerLevel loggerLevel = herder.loggerLevel(namedLogger); if (loggerLevel == null) throw new NotFoundException("Logger " + namedLogger + " not found."); - return Response.ok(loggerLevel).build(); + return loggerLevel; } /** @@ -104,7 +103,7 @@ public Response getLogger(final @PathParam("logger") String namedLogger) { @Path("/{logger}") @Operation(summary = "Set the log level for the specified logger") @SuppressWarnings("fallthrough") - public Response setLevel(final @PathParam("logger") String namespace, + public List setLevel(final @PathParam("logger") String namespace, final Map levelMap, @DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) { if (scope == null) { @@ -126,11 +125,10 @@ public Response setLevel(final @PathParam("logger") String namespace, default: log.warn("Received invalid scope '{}' in request to adjust logging level; will default to {}", scope, WORKER_SCOPE); case WORKER_SCOPE: - List affectedLoggers = herder.setWorkerLoggerLevel(namespace, levelString); - return Response.ok(affectedLoggers).build(); + return herder.setWorkerLoggerLevel(namespace, levelString); case CLUSTER_SCOPE: herder.setClusterLoggerLevel(namespace, levelString); - return Response.noContent().build(); + return null; } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 26a4665824802..eed8530aa8ef0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -816,12 +816,10 @@ public void testConnectorActiveTopics() { .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); - Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - Map> body = (Map>) response.getEntity(); - assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector()); + Map body = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); + assertEquals(CONNECTOR_NAME, body.get(CONNECTOR_NAME).connector()); assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS), - ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics()); + body.get(CONNECTOR_NAME).topics()); } @Test @@ -913,9 +911,8 @@ public void testAlterOffsets() throws Throwable { cb.getValue().onCompletion(null, msg); return null; }).when(herder).alterConnectorOffsets(eq(CONNECTOR_NAME), eq(body.toMap()), cb.capture()); - Response response = connectorsResource.alterConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME, body); - assertEquals(200, response.getStatus()); - assertEquals(msg, response.getEntity()); + Message result = connectorsResource.alterConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME, body); + assertEquals(msg, result); } @Test @@ -945,9 +942,8 @@ public void testResetOffsets() throws Throwable { cb.getValue().onCompletion(null, msg); return null; }).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture()); - Response response = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME); - assertEquals(200, response.getStatus()); - assertEquals(msg, response.getEntity()); + Message result = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME); + assertEquals(msg, result); } private Stubber expectAndCallbackResult(final ArgumentCaptor> cb, final T value) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java index 67ccb519d05ef..8b3300ff987c0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java @@ -35,8 +35,6 @@ import java.util.List; import java.util.Map; -import jakarta.ws.rs.core.Response; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -73,9 +71,7 @@ public void testGetLevel() { final LoggerLevel expectedLevel = new LoggerLevel(Level.WARN.toString(), 976L); when(herder.loggerLevel(logger)).thenReturn(expectedLevel); - Response response = loggingResource.getLogger(logger); - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - LoggerLevel actualLevel = (LoggerLevel) response.getEntity(); + LoggerLevel actualLevel = loggingResource.getLogger(logger); assertEquals( expectedLevel, @@ -126,7 +122,6 @@ public void testSetLevelWorkerScope() { testSetLevelWorkerScope("worker", false); } - @SuppressWarnings("unchecked") private void testSetLevelWorkerScope(String scope, boolean expectWarning) { final String logger = "org.apache.kafka.connect"; final String level = "TRACE"; @@ -138,9 +133,7 @@ private void testSetLevelWorkerScope(String scope, boolean expectWarning) { List actualLoggers; try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(LoggingResource.class)) { - Response response = loggingResource.setLevel(logger, Map.of("level", level), scope); - assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - actualLoggers = (List) response.getEntity(); + actualLoggers = loggingResource.setLevel(logger, Map.of("level", level), scope); long warningMessages = logCaptureAppender.getEvents().stream() .filter(e -> "WARN".equals(e.getLevel())) .count(); @@ -159,10 +152,9 @@ public void testSetLevelClusterScope() { final String logger = "org.apache.kafka.connect"; final String level = "TRACE"; - Response response = loggingResource.setLevel(logger, Map.of("level", level), "cluster"); + List result = loggingResource.setLevel(logger, Map.of("level", level), "cluster"); - assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus()); - assertNull(response.getEntity()); + assertNull(result); verify(herder).setClusterLoggerLevel(logger, level); }