Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/tools/BulkLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public static void load(LoaderOptions options) throws BulkLoadException
DatabaseDescriptor.setInterDCStreamThroughputOutboundBytesPerSec(options.interDcThrottleBytes);
DatabaseDescriptor.setEntireSSTableStreamThroughputOutboundMebibytesPerSec(options.entireSSTableThrottleMebibytes);
DatabaseDescriptor.setEntireSSTableInterDCStreamThroughputOutboundMebibytesPerSec(options.entireSSTableInterDcThrottleMebibytes);
if (options.disableZeroCopyStreaming)
{
DatabaseDescriptor.setStreamEntireSSTables(false);
}
StreamResultFuture future;

ProgressIndicator indicator = new ProgressIndicator();
Expand Down
20 changes: 19 additions & 1 deletion src/java/org/apache/cassandra/tools/LoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class LoaderOptions
public static final String TOOL_NAME = "sstableloader";
public static final String TARGET_KEYSPACE = "target-keyspace";
public static final String TARGET_TABLE = "target-table";
public static final String DISABLE_ZERO_COPY_STREAMING = "disable-zero-copy-streaming";

/* client encryption options */
public static final String SSL_TRUSTSTORE = "truststore";
Expand Down Expand Up @@ -127,6 +128,7 @@ public class LoaderOptions
public final Set<InetAddressAndPort> ignores;
public final String targetKeyspace;
public final String targetTable;
public final boolean disableZeroCopyStreaming;

LoaderOptions(Builder builder)
{
Expand All @@ -151,6 +153,7 @@ public class LoaderOptions
ignores = builder.ignores;
targetKeyspace = builder.targetKeyspace;
targetTable = builder.targetTable;
disableZeroCopyStreaming = builder.disableZeroCopyStreaming;
}

static class Builder
Expand Down Expand Up @@ -180,6 +183,7 @@ static class Builder
Set<InetAddressAndPort> ignores = new HashSet<>();
String targetKeyspace;
String targetTable;
boolean disableZeroCopyStreaming = false;

Builder()
{
Expand Down Expand Up @@ -412,6 +416,12 @@ public Builder targetTable(String table)
return this;
}

public Builder disableZeroCopyStreaming(boolean disableZeroCopyStreaming)
{
this.disableZeroCopyStreaming = disableZeroCopyStreaming;
return this;
}

public Builder parseArgs(String cmdArgs[])
{
CommandLineParser parser = new GnuParser();
Expand Down Expand Up @@ -690,6 +700,11 @@ public Builder parseArgs(String cmdArgs[])
errorMsg("Empty table is not supported.", options);
}

if (cmd.hasOption(DISABLE_ZERO_COPY_STREAMING))
{
disableZeroCopyStreaming = true;
}

return this;
}
catch (ParseException | ConfigurationException | MalformedURLException e)
Expand Down Expand Up @@ -801,6 +816,7 @@ private static CmdLineOptions getCmdLineOptions()
options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL.");
options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name");
options.addOption("tb", TARGET_TABLE, "target table name", "target table name");
options.addOption(null, DISABLE_ZERO_COPY_STREAMING, "disable zero-copy streaming (required for loading legacy 3.x sstables)");
return options;
}

Expand All @@ -815,7 +831,9 @@ public static void printUsage(Options options)
String footer = System.lineSeparator() +
"You can provide cassandra.yaml file with -f command line option to set up streaming throughput, client and server encryption options. " +
"Only stream_throughput_outbound, server_encryption_options and client_encryption_options are read from yaml. " +
"You can override options read from cassandra.yaml with corresponding command line options.";
"You can override options read from cassandra.yaml with corresponding command line options." +
System.lineSeparator() +
"Note: Use --disable-zero-copy-streaming when loading SSTables from Cassandra 3.x.";
new HelpFormatter().printHelp(usage, header, options, footer);
}
}
237 changes: 237 additions & 0 deletions test/unit/org/apache/cassandra/io/sstable/SSTableLoaderLegacyTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.io.sstable;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import com.google.common.io.Files;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.OutputHandler;

import static org.junit.Assert.assertTrue;

/**
* Tests SSTableLoader with legacy sstables from Cassandra 3.x
*/
public class SSTableLoaderLegacyTest
{
public static final String KEYSPACE1 = "sstableloaderlegacytest";
public static final String LEGACY_VERSION = "me"; // Cassandra 3.11
public static final String LEGACY_TABLE = "legacy_me_simple";

private static java.io.File LEGACY_SSTABLE_ROOT;
private java.io.File tmpdir;

@BeforeClass
public static void defineSchema()
{
String scp = CassandraRelevantProperties.TEST_LEGACY_SSTABLE_ROOT.getString();
if (scp == null || scp.isEmpty())
{
throw new RuntimeException("System property for legacy sstable root is not set.");
}
LEGACY_SSTABLE_ROOT = new java.io.File(scp).getAbsoluteFile();

SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));

// Create table matching the legacy sstable schema
// legacy_me_simple has schema: pk text PRIMARY KEY, val text
QueryProcessor.executeInternal(String.format(
"CREATE TABLE %s.%s (pk text PRIMARY KEY, val text)",
KEYSPACE1, LEGACY_TABLE));

StorageService.instance.initServer();
}

@Before
public void setup()
{
tmpdir = Files.createTempDir();
}

@After
public void cleanup()
{
FileUtils.deleteRecursive(new File(tmpdir));
}

private static final class TestClient extends SSTableLoader.Client
{
private String keyspace;

public void init(String keyspace)
{
this.keyspace = keyspace;
for (Replica replica : StorageService.instance.getLocalReplicas(KEYSPACE1))
addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort());
}

public TableMetadataRef getTableMetadata(String tableName)
{
return Schema.instance.getTableMetadataRef(keyspace, tableName);
}
}

@Test(expected = ExecutionException.class)
public void testLoadLegacy311SSTableWithZeroCopyEnabled() throws Exception
{
assertTrue("Zero-copy streaming should be enabled by default",
DatabaseDescriptor.streamEntireSSTables());

java.io.File dataDir = setupLegacySSTableDirectory();
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(LEGACY_TABLE);

final CountDownLatch latch = new CountDownLatch(1);
SSTableLoader loader = new SSTableLoader(new File(dataDir), new TestClient(),
new OutputHandler.SystemOutput(false, false));

loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
latch.await();
}

@Test
public void testLoadLegacy311SSTableWithZeroCopyDisabled() throws Exception
{
DatabaseDescriptor.setStreamEntireSSTables(false);

java.io.File dataDir = setupLegacySSTableDirectory();
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(LEGACY_TABLE);

final CountDownLatch latch = new CountDownLatch(1);
SSTableLoader loader = new SSTableLoader(new File(dataDir), new TestClient(),
new OutputHandler.SystemOutput(false, false));

try
{
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
latch.await();

assertTrue("Data should be loaded from legacy sstable",
!Util.getAll(Util.cmd(cfs).build()).isEmpty());
}
finally
{
DatabaseDescriptor.setStreamEntireSSTables(true);
}
}

/**
* Sets up a directory with legacy 3.11 sstables copied from test data.
*/
private java.io.File setupLegacySSTableDirectory() throws IOException
{
java.io.File dataDir = new java.io.File(tmpdir, KEYSPACE1 + java.io.File.separator + LEGACY_TABLE);
if (!dataDir.exists())
dataDir.mkdirs();

java.io.File legacyTableDir = new java.io.File(LEGACY_SSTABLE_ROOT,
String.format("%s/legacy_tables/%s", LEGACY_VERSION, LEGACY_TABLE));

if (!legacyTableDir.isDirectory())
{
throw new RuntimeException("Legacy sstable directory not found: " + legacyTableDir);
}

// Copy all sstable components to the test directory
java.io.File[] sourceFiles = legacyTableDir.listFiles();
if (sourceFiles != null)
{
for (java.io.File sourceFile : sourceFiles)
{
copyFile(sourceFile, new java.io.File(dataDir, sourceFile.getName()));
}
}

System.out.println("Copied legacy sstables from: " + legacyTableDir);
System.out.println("To: " + dataDir);
java.io.File[] copiedFiles = dataDir.listFiles();
System.out.println("File count: " + (copiedFiles != null ? copiedFiles.length : 0));

return dataDir;
}

/**
* Copies a file from source to target.
*/
private static void copyFile(java.io.File sourceFile, java.io.File targetFile) throws IOException
{
byte[] buf = new byte[65536];
if (sourceFile.isFile())
{
int rd;
try (FileInputStream is = new FileInputStream(sourceFile);
FileOutputStream os = new FileOutputStream(targetFile))
{
while ((rd = is.read(buf)) >= 0)
os.write(buf, 0, rd);
}
}
}

/**
* Creates a stream completion listener.
*/
private StreamEventHandler completionStreamListener(final CountDownLatch latch)
{
return new StreamEventHandler()
{
public void onFailure(Throwable arg0)
{
latch.countDown();
}

public void onSuccess(StreamState arg0)
{
latch.countDown();
}

public void handleStreamEvent(StreamEvent event)
{
}
};
}
}