Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ private SystemKeyspace()
+ "keyspace_name text,"
+ "rows_merged map<int, bigint>,"
+ "compaction_properties frozen<map<text, text>>,"
+ "compaction_type text,"
+ "PRIMARY KEY ((id)))")
.defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7))
.build();
Expand Down Expand Up @@ -718,12 +719,13 @@ public static void updateCompactionHistory(TimeUUID taskId,
long bytesIn,
long bytesOut,
Map<Integer, Long> rowsMerged,
Map<String, String> compactionProperties)
Map<String, String> compactionProperties,
String compactionType)
{
// don't write anything when the history table itself is compacted, since that would in turn cause new compactions
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
return;
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged, compaction_properties, compaction_type) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
executeInternal(format(req, COMPACTION_HISTORY),
taskId,
ksname,
Expand All @@ -732,7 +734,8 @@ public static void updateCompactionHistory(TimeUUID taskId,
bytesIn,
bytesOut,
rowsMerged,
compactionProperties);
compactionProperties,
compactionType);
}

public static TabularData getCompactionHistory() throws OpenDataException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
public class CompactionHistoryTabularData
{
private static final String[] ITEM_NAMES = new String[]{ "id", "keyspace_name", "columnfamily_name", "compacted_at",
"bytes_in", "bytes_out", "rows_merged", "compaction_properties" };
"bytes_in", "bytes_out", "rows_merged", "compaction_properties",
"compaction_type" }; // Added

private static final String[] ITEM_DESCS = new String[]{ "time uuid", "keyspace name",
"column family name", "compaction finished at",
"total bytes in", "total bytes out", "total rows merged", "compaction properties" };
"total bytes in", "total bytes out", "total rows merged",
"compaction properties", "compaction type" }; // Added

private static final String TYPE_NAME = "CompactionHistory";

Expand All @@ -45,15 +47,16 @@ public class CompactionHistoryTabularData
private static final CompositeType COMPOSITE_TYPE;

private static final TabularType TABULAR_TYPE;

public static final String COMPACTION_TYPE_PROPERTY = "compaction_type";
static

static
{
try
{
ITEM_TYPES = new OpenType[]{ SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG,
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING };
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING, SimpleType.STRING,
SimpleType.STRING };

COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);

Expand All @@ -78,10 +81,14 @@ public static TabularData from(UntypedResultSet resultSet) throws OpenDataExcept
long bytesOut = row.getLong(ITEM_NAMES[5]);
Map<Integer, Long> rowMerged = row.getMap(ITEM_NAMES[6], Int32Type.instance, LongType.instance);
Map<String, String> compactionProperties = row.getMap(ITEM_NAMES[7], UTF8Type.instance, UTF8Type.instance);

String compactionType = row.has(ITEM_NAMES[8]) ? row.getString(ITEM_NAMES[8]) : "UNKNOWN";

result.put(new CompositeDataSupport(COMPOSITE_TYPE, ITEM_NAMES,
new Object[]{ id.toString(), ksName, cfName, compactedAt, bytesIn, bytesOut,
'{' + FBUtilities.toString(rowMerged) + '}',
'{' + FBUtilities.toString(compactionProperties) + '}' }));
'{' + FBUtilities.toString(compactionProperties) + '}',
compactionType }));
}
return result;
}
Expand Down
13 changes: 10 additions & 3 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,15 @@ public boolean apply(SSTableReader sstable)
for (int i = 0; i < mergedRowCounts.length; i++)
totalSourceRows += mergedRowCounts[i] * (i + 1);

Map<String, String> props = new HashMap<>();
props.put("strategy", cfs.getCompactionStrategyManager().getCompactionParams().klass().getSimpleName());

if (getLevel() > 0)
props.put("level", Integer.toString(getLevel()));

String mergeSummary = updateCompactionHistory(taskId, cfs.getKeyspaceName(), cfs.getTableName(), mergedRowCounts, startsize, endsize,
ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type));
props,
compactionType.type);

logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}. Time spent writing keys = %,dms",
transaction.opIdString(),
Expand Down Expand Up @@ -353,7 +360,7 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, keepOriginals, getLevel());
}

public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map<String, String> compactionProperties)
public static String updateCompactionHistory(TimeUUID taskId, String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize, Map<String, String> compactionProperties, String compactionType)
{
StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10);
Map<Integer, Long> mergedRows = new HashMap<>();
Expand All @@ -367,7 +374,7 @@ public static String updateCompactionHistory(TimeUUID taskId, String keyspaceNam
mergeSummary.append(String.format("%d:%d, ", rows, count));
mergedRows.put(rows, count);
}
SystemKeyspace.updateCompactionHistory(taskId, keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties);
SystemKeyspace.updateCompactionHistory(taskId, keyspaceName, columnFamilyName, currentTimeMillis(), startSize, endSize, mergedRows, compactionProperties, compactionType);
return mergeSummary.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ public class CompactionHistory extends AbstractCommand
description = "Output format (json, yaml)")
private String outputFormat = "";

@Option(paramLabel = "human_readable",
names = { "-H", "--human-readable" },
description = "Display bytes in human readable form, i.e. KiB, MiB, GiB, TiB")
private boolean humanReadable = false;
@Option(paramLabel = "no_human_readable",
names = { "-n", "--no-human-readable" },
description = "Display raw bytes (disable default human readable format)")
private boolean noHumanReadable = false;

@Override
public void execute(NodeProbe probe)
Expand All @@ -45,7 +45,7 @@ public void execute(NodeProbe probe)
{
throw new IllegalArgumentException("arguments for -F are json,yaml only.");
}
StatsHolder data = new CompactionHistoryHolder(probe, humanReadable);
StatsHolder data = new CompactionHistoryHolder(probe, !noHumanReadable);
StatsPrinter printer = CompactionHistoryPrinter.from(outputFormat);
printer.print(data, probe.output().out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ private static class CompactionHistoryRow implements Comparable<CompactionHistor
private final long bytesOut;
private final String rowMerged;
private final String compactionProperties;
private final String compactionType;

CompactionHistoryRow(String id, String ksName, String cfName, long compactedAt, long bytesIn, long bytesOut, String rowMerged, String compactionProperties)
CompactionHistoryRow(String id, String ksName, String cfName, long compactedAt, long bytesIn, long bytesOut, String rowMerged, String compactionProperties, String compactionType)
{
this.id = id;
this.ksName = ksName;
Expand All @@ -69,6 +70,7 @@ private static class CompactionHistoryRow implements Comparable<CompactionHistor
this.bytesOut = bytesOut;
this.rowMerged = rowMerged;
this.compactionProperties = compactionProperties;
this.compactionType = compactionType;
}

public int compareTo(CompactionHistoryHolder.CompactionHistoryRow chr)
Expand All @@ -89,6 +91,7 @@ private HashMap<String, Object> getAllAsMap(boolean humanReadable)
compaction.put("bytes_out", FileUtils.stringifyFileSize(this.bytesOut, humanReadable));
compaction.put("rows_merged", this.rowMerged);
compaction.put("compaction_properties", this.compactionProperties);
compaction.put("compaction_type", this.compactionType);
return compaction;
}
}
Expand Down Expand Up @@ -117,7 +120,8 @@ public Map<String, Object> convert2Map()
(Long)value.get(4),
(Long)value.get(5),
(String)value.get(6),
(String)value.get(7)
(String)value.get(7),
value.size() > 8 ? (String)value.get(8) : "UNKNOWN"
);
chrList.add(chr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void print(CompactionHistoryHolder data, PrintStream out)
for (Object chr : compactionHistories)
{
Map value = chr instanceof Map<?, ?> ? (Map)chr : Collections.emptyMap();
String[] obj = new String[8];
String[] obj = new String[9];
obj[0] = (String)value.get("id");
obj[1] = (String)value.get("keyspace_name");
obj[2] = (String)value.get("columnfamily_name");
Expand All @@ -74,6 +74,7 @@ public void print(CompactionHistoryHolder data, PrintStream out)
obj[5] = value.get("bytes_out").toString();
obj[6] = (String)value.get("rows_merged");
obj[7] = (String)value.get("compaction_properties");
obj[8] = (String)value.get("compaction_type");
table.add(obj);
}
table.printTo(out);
Expand Down
28 changes: 28 additions & 0 deletions test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,34 @@ public void testPersistLocalMetadata()
assertEquals(DatabaseDescriptor.getStoragePort(), row.getInt("listen_port"));
}

@Test
public void testCompactionHistory()
{
String ks = "test_ks";
String cf = "test_cf";
long now = System.currentTimeMillis();
Map<Integer, Long> rowsMerged = Collections.singletonMap(1, 100L);
Map<String, String> props = Collections.singletonMap("strategy", "STCS");
String compactionType = "TestMajor";

SystemKeyspace.updateCompactionHistory(
org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID(),
ks,
cf,
now,
1000,
500,
rowsMerged,
props,
compactionType
);

UntypedResultSet result = executeInternal("SELECT compaction_type FROM system.compaction_history WHERE keyspace_name=? AND columnfamily_name=? ALLOW FILTERING", ks, cf);

assertNotNull(result);
assertEquals(compactionType, result.one().getString("compaction_type"));
}

private String getOlderVersionString()
{
String version = FBUtilities.getReleaseVersionString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setUp() throws Exception
cfs.truncateBlocking();
}

@Test
@Test
public void testTaskIdIsPersistedInCompactionHistory()
{
QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, v) VALUES (1, 1);");
Expand All @@ -92,7 +92,7 @@ public void testTaskIdIsPersistedInCompactionHistory()
task.execute(CompactionManager.instance.active);
}

UntypedResultSet rows = QueryProcessor.executeInternal(format("SELECT id FROM system.%s where id = %s",
UntypedResultSet rows = QueryProcessor.executeInternal(format("SELECT id, compaction_type, compaction_properties FROM system.%s where id = %s",
SystemKeyspace.COMPACTION_HISTORY,
id.toString()));

Expand All @@ -103,6 +103,12 @@ public void testTaskIdIsPersistedInCompactionHistory()
TimeUUID persistedId = one.getTimeUUID("id");

Assert.assertEquals(id, persistedId);

String type = one.getString("compaction_type");
Assert.assertEquals("Compaction", type);

java.util.Map<String, String> properties = one.getMap("compaction_properties", org.apache.cassandra.db.marshal.UTF8Type.instance, org.apache.cassandra.db.marshal.UTF8Type.instance);
Assert.assertTrue("Strategy missing in properties", properties.containsKey("strategy"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,40 @@ public void testCompactionProperties() throws Throwable

ImmutableList.Builder<String> builder = ImmutableList.builder();
List<String> cmds = builder.addAll(cmd).add(keyspace()).add(currentTable()).build();
compactionHistoryResultVerify(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType), cmds);

String cql = "select keyspace_name,columnfamily_name,compaction_properties from system." + SystemKeyspace.COMPACTION_HISTORY +
Map<String, String> expectedProperties = ImmutableMap.of("strategy", "SizeTieredCompactionStrategy");

compactionHistoryResultVerify(keyspace(), currentTable(), expectedProperties, compactionType, cmds);

String cql = "select keyspace_name,columnfamily_name,compaction_properties,compaction_type from system." + SystemKeyspace.COMPACTION_HISTORY +
" where keyspace_name = '" + keyspace() + "' AND columnfamily_name = '" + currentTable() + "' ALLOW FILTERING";

Object[][] objects = new Object[systemTableRecord][];
for (int i = 0; i != systemTableRecord; ++i)
{
objects[i] = row(keyspace(), currentTable(), ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType));
objects[i] = row(keyspace(), currentTable(), expectedProperties, compactionType);
}
assertRows(execute(cql), objects);
}

private void compactionHistoryResultVerify(String keyspace, String table, Map<String, String> properties, List<String> cmds)
private void compactionHistoryResultVerify(String keyspace, String table, Map<String, String> properties, String compType, List<String> cmds)
{
ToolResult toolCompact = invokeNodetool(cmds);
toolCompact.assertOnCleanExit();

ToolResult toolHistory = invokeNodetool("compactionhistory");
toolHistory.assertOnCleanExit();
assertCompactionHistoryOutPut(toolHistory, keyspace, table, properties);
assertCompactionHistoryOutPut(toolHistory, keyspace, table, properties, compType);
}

public static void assertCompactionHistoryOutPut(ToolResult toolHistory, String keyspace, String table, Map<String, String> properties)
public static void assertCompactionHistoryOutPut(ToolResult toolHistory, String keyspace, String table, Map<String, String> properties, String compType)
{
String stdout = toolHistory.getStdout();
String[] resultArray = stdout.split(System.lineSeparator());
assertTrue(Arrays.stream(resultArray)
.anyMatch(result -> result.contains('{' + FBUtilities.toString(properties) + '}')
&& result.contains(keyspace)
&& result.contains(table)));
&& result.contains(table)
&& result.contains(compType)));
}
}