Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
0.2.0
0.3.0
-----
* File descriptor leak after file streamed in Sidecar Client (CASSANALYTICS-103)
* Add TimeRangeFilter to filter out SSTables outside given time window (CASSANALYTICS-102)

0.2.0
-----
* Generated distribution artifacts fix (CASSANALYTICS-105)
* Fix SSTable descriptor mismatch preventing newly produced SSTables from being uploaded (CASSANALYTICS-98)
* Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
Expand Down
2 changes: 1 addition & 1 deletion analytics-sidecar-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sourceCompatibility = 1.8
test {
useJUnitPlatform()
testLogging {
events "passed", "skipped", "failed"
events "started", "passed", "skipped", "failed"
}
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
reports {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.cassandra.sidecar.common.request.Request;
import org.apache.cassandra.sidecar.common.request.UploadableRequest;

import static java.lang.String.valueOf;
import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
import static org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty;

Expand Down Expand Up @@ -165,11 +166,7 @@ protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInsta
Promise<HttpResponse> promise = Promise.promise();
// open the local file
openFileForRead(vertx.fileSystem(), filename)
.compose(pair -> vertxRequest.ssl(config.ssl())
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
String.valueOf(pair.getKey()))
.sendStream(pair.getValue()
.setReadBufferSize(config.sendReadBufferSize())))
.compose(pair -> sendFileStream(vertxRequest, pair, filename))
.onFailure(promise::fail)
.onSuccess(response -> {
byte[] raw = response.body() != null ? response.body().getBytes() : null;
Expand All @@ -184,6 +181,43 @@ protected CompletableFuture<HttpResponse> executeUploadFileInternal(SidecarInsta
return promise.future().toCompletionStage().toCompletableFuture();
}

/**
* Sends the file stream via HTTP request.
*
* @param vertxRequest the HTTP request to send the file stream with
* @param pair a pair containing file size and the AsyncFile handle
* @param filename the name of the file being uploaded (for logging purposes)
* @return a Future that completes when the file has been sent
*/
protected Future<io.vertx.ext.web.client.HttpResponse<Buffer>> sendFileStream(
HttpRequest<Buffer> vertxRequest,
AbstractMap.SimpleEntry<Long, AsyncFile> pair,
String filename)
{
AsyncFile asyncFile = pair.getValue();
return vertxRequest.ssl(config.ssl())
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
valueOf(pair.getKey()))
.sendStream(pair.getValue()
.setReadBufferSize(config.sendReadBufferSize()))
.eventually(() -> {
// vertx.setTimer(1000, timerId -> {
// Defer file closing for 1 second
try
{
return asyncFile.close().onFailure(err ->
LOGGER.warn("Failed to close file after upload: filename='{}'", filename, err));
}
catch (Exception ex)
{
LOGGER.warn("Failed due to exception for filename='{}'", filename, ex);
return Future.failedFuture(ex);
}

//});
});
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,36 @@

package org.apache.cassandra.sidecar.client;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.cassandra.sidecar.client.request.RequestExecutorTest;

import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static java.nio.file.Files.copy;
import static org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -37,17 +57,35 @@
public class VertxHttpClientTest
{
private static Vertx vertx;
private MockWebServer mockServer;
private HttpClientConfig config;
private SidecarInstanceImpl sidecarInstance;

@BeforeAll
public static void setUp()
@BeforeEach
public void setUp() throws IOException
{
vertx = Vertx.vertx();
mockServer = new MockWebServer();
mockServer.start();

config = new HttpClientConfig.Builder<>()
.ssl(false)
.timeoutMillis(30000)
.build();
sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer);
}

@AfterAll
public static void tearDown()
@AfterEach
public void tearDown() throws IOException
{
vertx.close();
if (mockServer != null)
{
mockServer.shutdown();
}
if (vertx != null)
{
vertx.close();
}
}

@Test
Expand All @@ -74,4 +112,116 @@ private HttpClientConfig.Builder<?> httpClientConfigBuilder()
.timeoutMillis(100)
.idleTimeoutMillis(100);
}

@Test
void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws Exception
{
runTestScenario(tempDirectory,
new MockResponse().setResponseCode(OK.code()),
new ExposeAsyncFileVertxHttpClient(vertx, config));
}

@Test
void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory) throws Exception
{
runTestScenario(tempDirectory,
new MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()),
new ExposeAsyncFileVertxHttpClient(vertx, config));
}

@Test
void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws Exception
{
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));

Path fileToUpload = prepareFile(tempDirectory);

ExposeAsyncFileVertxHttpClient httpClient = new ExposeAsyncFileVertxHttpClient(vertx, config);

// Upload the same file 3 times (simulating multiple file uploads)
for (int i = 0; i < 3; i++)
{
HttpRequest<Buffer> vertxRequest = httpClient.webClient.put(mockServer.getPort(),
mockServer.getHostName(),
"/upload/test" + i);
httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString())
.get(30, TimeUnit.SECONDS);
}

assertThat(mockServer.getRequestCount()).isEqualTo(3);
assertThat(httpClient.capturedFiles).hasSize(3);

// Give async file close operations time to complete
Thread.sleep(100);

// Verify all the files are closed by attempting to call .end() which should throw IllegalStateException
for (AsyncFile file : httpClient.capturedFiles)
{
assertThatThrownBy(file::end)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("File handle is closed");
}
}

private void runTestScenario(Path tempDirectory,
MockResponse mockResponse,
ExposeAsyncFileVertxHttpClient httpClient) throws Exception
{
mockServer.enqueue(mockResponse);

Path fileToUpload = prepareFile(tempDirectory);
HttpRequest<Buffer> vertxRequest = httpClient.webClient.put(mockServer.getPort(),
mockServer.getHostName(),
"/upload/test");

httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, fileToUpload.toString())
.get(30, TimeUnit.SECONDS);

assertThat(mockServer.getRequestCount()).isEqualTo(1);

// Give async file close operation time to complete
Thread.sleep(100);

// Verify file is closed by attempting to call .end() which should throw IllegalStateException
assertThat(httpClient.capturedFiles).hasSize(1);
assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end())
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("File handle is closed");
}

/**
* Class that extends from {@link VertxHttpClient} for testing purposes and holds a reference to the
* {@link AsyncFile} to ensure that the file has been closed.
*/
static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient
{
List<AsyncFile> capturedFiles = new ArrayList<>();

ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config)
{
super(vertx, config);
}

@Override
protected Future<HttpResponse<Buffer>> sendFileStream(HttpRequest<Buffer> vertxRequest,
SimpleEntry<Long, AsyncFile> pair,
String filename)
{
capturedFiles.add(pair.getValue());
return super.sendFileStream(vertxRequest, pair, filename);
}
}

private Path prepareFile(Path tempDirectory) throws IOException
{
Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt");
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt"))
{
assertThat(inputStream).isNotNull();
copy(inputStream, fileToUpload, StandardCopyOption.REPLACE_EXISTING);
}
return fileToUpload;
}
}
3 changes: 2 additions & 1 deletion cassandra-analytics-integration-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ test {
systemProperty "SKIP_STARTUP_VALIDATIONS", "true"
systemProperty "logback.configurationFile", "src/test/resources/logback-test.xml"
systemProperty "cassandra.integration.sidecar.test.enable_mtls", integrationEnableMtls
systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"
minHeapSize = '1g'
maxHeapSize = integrationMaxHeapSize
maxParallelForks = integrationMaxParallelForks
Expand All @@ -102,7 +103,7 @@ test {
}

testLogging {
events "passed", "skipped", "failed"
events "started", "passed", "skipped", "failed"

showExceptions true
exceptionFormat "full"
Expand Down