Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") Strin
@GET
@Path("/{connector}/topics")
@Operation(summary = "Get the list of topics actively used by the specified connector")
public Response getConnectorActiveTopics(final @PathParam("connector") String connector) {
public Map<String, ActiveTopicsInfo> getConnectorActiveTopics(final @PathParam("connector") String connector) {
if (isTopicTrackingDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
"Topic tracking is disabled.");
}
ActiveTopicsInfo info = herder.connectorActiveTopics(connector);
return Response.ok(Map.of(info.connector(), info)).build();
return Map.of(info.connector(), info);
}

@PUT
Expand Down Expand Up @@ -234,15 +234,15 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto

@PATCH
@Path("/{connector}/config")
public Response patchConnectorConfig(final @PathParam("connector") String connector,
public ConnectorInfo patchConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final Map<String, String> connectorConfigPatch) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
"PATCH", headers, connectorConfigPatch, new TypeReference<>() { }, new CreatedConnectorInfoTranslator(), forward);
return Response.ok().entity(createdInfo.result()).build();
return createdInfo.result();
}

@POST
Expand Down Expand Up @@ -359,7 +359,7 @@ public ConnectorOffsets getOffsets(final @PathParam("connector") String connecto
@PATCH
@Path("/{connector}/offsets")
@Operation(summary = "Alter the offsets for the specified connector")
public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
public Message alterConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers, final @PathParam("connector") String connector,
final ConnectorOffsets offsets) throws Throwable {
if (offsets.offsets() == null || offsets.offsets().isEmpty()) {
Expand All @@ -368,21 +368,19 @@ public Response alterConnectorOffsets(final @Parameter(hidden = true) @QueryPara

FutureCallback<Message> cb = new FutureCallback<>();
herder.alterConnectorOffsets(connector, offsets.toMap(), cb);
Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets,
return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "PATCH", headers, offsets,
new TypeReference<>() { }, new IdentityTranslator<>(), forward);
return Response.ok().entity(msg).build();
}

@DELETE
@Path("/{connector}/offsets")
@Operation(summary = "Reset the offsets for the specified connector")
public Response resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
public Message resetConnectorOffsets(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers, final @PathParam("connector") String connector) throws Throwable {
FutureCallback<Message> cb = new FutureCallback<>();
herder.resetConnectorOffsets(connector, cb);
Message msg = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null,
return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/offsets", "DELETE", headers, null,
new TypeReference<>() { }, new IdentityTranslator<>(), forward);
return Response.ok().entity(msg).build();
}

// Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

/**
* A set of endpoints to adjust the log levels of runtime loggers.
Expand Down Expand Up @@ -69,8 +68,8 @@ public LoggingResource(Herder herder) {
*/
@GET
@Operation(summary = "List the current loggers that have their levels explicitly set and their log levels")
public Response listLoggers() {
return Response.ok(herder.allLoggerLevels()).build();
public Map<String, LoggerLevel> listLoggers() {
return herder.allLoggerLevels();
}

/**
Expand All @@ -82,14 +81,14 @@ public Response listLoggers() {
@GET
@Path("/{logger}")
@Operation(summary = "Get the log level for the specified logger")
public Response getLogger(final @PathParam("logger") String namedLogger) {
public LoggerLevel getLogger(final @PathParam("logger") String namedLogger) {
Objects.requireNonNull(namedLogger, "require non-null name");

LoggerLevel loggerLevel = herder.loggerLevel(namedLogger);
if (loggerLevel == null)
throw new NotFoundException("Logger " + namedLogger + " not found.");

return Response.ok(loggerLevel).build();
return loggerLevel;
}

/**
Expand All @@ -104,7 +103,7 @@ public Response getLogger(final @PathParam("logger") String namedLogger) {
@Path("/{logger}")
@Operation(summary = "Set the log level for the specified logger")
@SuppressWarnings("fallthrough")
public Response setLevel(final @PathParam("logger") String namespace,
public List<String> setLevel(final @PathParam("logger") String namespace,
final Map<String, String> levelMap,
@DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) {
if (scope == null) {
Expand All @@ -126,11 +125,10 @@ public Response setLevel(final @PathParam("logger") String namespace,
default:
log.warn("Received invalid scope '{}' in request to adjust logging level; will default to {}", scope, WORKER_SCOPE);
case WORKER_SCOPE:
List<String> affectedLoggers = herder.setWorkerLoggerLevel(namespace, levelString);
return Response.ok(affectedLoggers).build();
return herder.setWorkerLoggerLevel(namespace, levelString);
case CLUSTER_SCOPE:
herder.setClusterLoggerLevel(namespace, levelString);
return Response.noContent().build();
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,12 +816,10 @@ public void testConnectorActiveTopics() {
.thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT);

Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
Map<String, Map<String, Object>> body = (Map<String, Map<String, Object>>) response.getEntity();
assertEquals(CONNECTOR_NAME, ((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).connector());
Map<String, ActiveTopicsInfo> body = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(CONNECTOR_NAME, body.get(CONNECTOR_NAME).connector());
assertEquals(new HashSet<>(CONNECTOR_ACTIVE_TOPICS),
((ActiveTopicsInfo) body.get(CONNECTOR_NAME)).topics());
body.get(CONNECTOR_NAME).topics());
}

@Test
Expand Down Expand Up @@ -913,9 +911,8 @@ public void testAlterOffsets() throws Throwable {
cb.getValue().onCompletion(null, msg);
return null;
}).when(herder).alterConnectorOffsets(eq(CONNECTOR_NAME), eq(body.toMap()), cb.capture());
Response response = connectorsResource.alterConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME, body);
assertEquals(200, response.getStatus());
assertEquals(msg, response.getEntity());
Message result = connectorsResource.alterConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME, body);
assertEquals(msg, result);
}

@Test
Expand Down Expand Up @@ -945,9 +942,8 @@ public void testResetOffsets() throws Throwable {
cb.getValue().onCompletion(null, msg);
return null;
}).when(herder).resetConnectorOffsets(eq(CONNECTOR_NAME), cb.capture());
Response response = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME);
assertEquals(200, response.getStatus());
assertEquals(msg, response.getEntity());
Message result = connectorsResource.resetConnectorOffsets(null, NULL_HEADERS, CONNECTOR_NAME);
assertEquals(msg, result);
}

private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.List;
import java.util.Map;

import jakarta.ws.rs.core.Response;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -73,9 +71,7 @@ public void testGetLevel() {
final LoggerLevel expectedLevel = new LoggerLevel(Level.WARN.toString(), 976L);
when(herder.loggerLevel(logger)).thenReturn(expectedLevel);

Response response = loggingResource.getLogger(logger);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
LoggerLevel actualLevel = (LoggerLevel) response.getEntity();
LoggerLevel actualLevel = loggingResource.getLogger(logger);

assertEquals(
expectedLevel,
Expand Down Expand Up @@ -126,7 +122,6 @@ public void testSetLevelWorkerScope() {
testSetLevelWorkerScope("worker", false);
}

@SuppressWarnings("unchecked")
private void testSetLevelWorkerScope(String scope, boolean expectWarning) {
final String logger = "org.apache.kafka.connect";
final String level = "TRACE";
Expand All @@ -138,9 +133,7 @@ private void testSetLevelWorkerScope(String scope, boolean expectWarning) {

List<String> actualLoggers;
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(LoggingResource.class)) {
Response response = loggingResource.setLevel(logger, Map.of("level", level), scope);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
actualLoggers = (List<String>) response.getEntity();
actualLoggers = loggingResource.setLevel(logger, Map.of("level", level), scope);
long warningMessages = logCaptureAppender.getEvents().stream()
.filter(e -> "WARN".equals(e.getLevel()))
.count();
Expand All @@ -159,10 +152,9 @@ public void testSetLevelClusterScope() {
final String logger = "org.apache.kafka.connect";
final String level = "TRACE";

Response response = loggingResource.setLevel(logger, Map.of("level", level), "cluster");
List<String> result = loggingResource.setLevel(logger, Map.of("level", level), "cluster");

assertEquals(Response.Status.NO_CONTENT.getStatusCode(), response.getStatus());
assertNull(response.getEntity());
assertNull(result);

verify(herder).setClusterLoggerLevel(logger, level);
}
Expand Down
Loading