Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import java.util.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class CassandraRing implements Serializable
private String keyspace;
private ReplicationFactor replicationFactor;
private List<CassandraInstance> instances;
private Map<Range<BigInteger>, List<String>> rangeToReplicas;

private transient RangeMap<BigInteger, List<CassandraInstance>> replicas;
private transient Multimap<CassandraInstance, Range<BigInteger>> tokenRangeMap;
Expand Down Expand Up @@ -106,6 +109,49 @@ private static void addReplica(CassandraInstance replica,
replicaMap.putAll(mappingsToAdd);
}

/**
* Find the CassandraInstance which owns a given token from a list of
* __sorted__ CassandraInstances. If the token is greater than or equal to
* the partitioner max token then the CassandraInstance with the smallest
* token value is returned, assuming token ring wraparound.
*/
private static int getTokenOwnerIdx(List<CassandraInstance> instances, Partitioner partitioner, BigInteger token)
{
OptionalInt maybeTokenOwnerIdx = IntStream.range(0, instances.size())
.filter(idx -> new BigInteger(instances.get(idx).token()).equals(token))
.findFirst();

if (maybeTokenOwnerIdx.isPresent())
{
return maybeTokenOwnerIdx.getAsInt();
}
else if (token.compareTo(partitioner.maxToken()) >= 0)
{
return 0;
}
else
{
throw new RuntimeException(String.format("Could not find instance for token: %s", token));
}
}

public CassandraRing(Partitioner partitioner,
String keyspace,
ReplicationFactor replicationFactor,
Collection<CassandraInstance> instances,
Map<Range<BigInteger>, List<String>> rangeToReplicas)
{
this.partitioner = partitioner;
this.keyspace = keyspace;
this.replicationFactor = replicationFactor;
this.instances = instances.stream()
.sorted(Comparator.comparing(instance -> new BigInteger(instance.token())))
.collect(Collectors.toCollection(ArrayList::new));
this.rangeToReplicas = rangeToReplicas;

this.initWithRangeToReplicas();
}

public CassandraRing(Partitioner partitioner,
String keyspace,
ReplicationFactor replicationFactor,
Expand All @@ -120,6 +166,32 @@ public CassandraRing(Partitioner partitioner,
this.init();
}

private void initWithRangeToReplicas()
{
// replicas is a mapping of token ranges to CassandraInstances
// which are replicas for that range
replicas = TreeRangeMap.create();

// tokenRangeMap is mapping of CassandraInstances to token ranges
// that they own or are a replica for
tokenRangeMap = ArrayListMultimap.create();

rangeToReplicas.forEach((range, rangeReplicas) -> {
int tokenOwnerIdx = getTokenOwnerIdx(this.instances, partitioner, range.upperEndpoint());

// Find following closest CassandraInstance for each replica and update tokenRangeMap
rangeReplicas.forEach(replica ->
IntStream.range(0, this.instances.size())
.map(idx -> (idx + tokenOwnerIdx) % this.instances.size())
.filter(idx -> this.instances.get(idx).nodeName().equals(replica))
.findFirst()
.ifPresent(idx -> tokenRangeMap.get(this.instances.get(idx)).add(range)));
});

replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList());
tokenRangeMap.asMap().forEach((instance, ranges) -> ranges.forEach(range -> addReplica(instance, range, replicas)));
}

private void init()
{
// Setup token range map
Expand Down Expand Up @@ -278,7 +350,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
{
this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF()));
}
this.init();

this.rangeToReplicas = new HashMap<>();
int numKeys = in.readShort();
for (int key = 0; key < numKeys; key++)
{
BigInteger rangeStart = new BigInteger(in.readUTF());
BigInteger rangeEnd = new BigInteger(in.readUTF());
List<String> replicas = new ArrayList<>();

int numReplicas = in.readShort();
for (int replica = 0; replica < numReplicas; replica++)
{
replicas.add(in.readUTF());
}

this.rangeToReplicas.put(Range.openClosed(rangeStart, rangeEnd), replicas);
}

this.initWithRangeToReplicas();
}

private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException
Expand All @@ -303,6 +393,19 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou
out.writeUTF(instance.nodeName());
out.writeUTF(instance.dataCenter());
}

out.writeShort(this.rangeToReplicas.size());
for (Map.Entry<Range<BigInteger>, List<String>> entry : rangeToReplicas.entrySet())
{
out.writeUTF(entry.getKey().lowerEndpoint().toString());
out.writeUTF(entry.getKey().upperEndpoint().toString());

out.writeShort(entry.getValue().size());
for (String replica : entry.getValue())
{
out.writeUTF(replica);
}
}
}

public static class Serializer extends com.esotericsoftware.kryo.Serializer<CassandraRing>
Expand All @@ -314,6 +417,7 @@ public void write(Kryo kryo, Output out, CassandraRing ring)
out.writeString(ring.keyspace);
kryo.writeObject(out, ring.replicationFactor);
kryo.writeObject(out, ring.instances);
kryo.writeObject(out, ring.rangeToReplicas);
}

@Override
Expand All @@ -324,7 +428,8 @@ public CassandraRing read(Kryo kryo, Input in, Class<CassandraRing> type)
: Partitioner.Murmur3Partitioner,
in.readString(),
kryo.readObject(in, ReplicationFactor.class),
kryo.readObject(in, ArrayList.class));
kryo.readObject(in, ArrayList.class),
kryo.readObject(in, Map.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
Expand Down Expand Up @@ -67,6 +68,12 @@ private void validateRanges(Collection<Range<BigInteger>> ranges,
excludedTokens.forEach(token -> assertThat(rangeSet.contains(token)).isFalse());
}

private Map.Entry<Range<BigInteger>, List<String>> replicaEntry(String startToken, String endToken, List<String> replicas)
{
Range<BigInteger> range = Range.openClosed(new BigInteger(startToken), new BigInteger(endToken));
return Map.entry(range, replicas);
}

@Test
public void testSimpleStrategyRF3()
{
Expand Down Expand Up @@ -447,4 +454,164 @@ public void testNetworkStrategyRF22()
Partitioner.Murmur3Partitioner.minToken(),
Partitioner.Murmur3Partitioner.maxToken()));
}

@Test
public void testNetworkStrategyRF3Tokens1WithRangeToReplicas()
{
List<CassandraInstance> instances = Arrays.asList(
new CassandraInstance("0", "local0-i1", "DC1"),
new CassandraInstance("100", "local0-i2", "DC1"),
new CassandraInstance(Partitioner.Murmur3Partitioner.maxToken().toString(), "local0-i3", "DC1"));

List<String> replicas = List.of("local0-i1", "local0-i2", "local0-i3");
Map<Range<BigInteger>, List<String>> rangeToReplicas = Map.ofEntries(
replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "0", replicas),
replicaEntry("0", "100", replicas),
replicaEntry("100", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas)
);

CassandraRing ring = new CassandraRing(
Partitioner.Murmur3Partitioner,
"test",
new ReplicationFactor(
ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")),
instances,
rangeToReplicas);

assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList(
BigInteger.valueOf(0L),
BigInteger.valueOf(100L),
Partitioner.Murmur3Partitioner.maxToken()).toArray());

Multimap<CassandraInstance, Range<BigInteger>> tokenRanges = ring.tokenRanges();
for (CassandraInstance instance : instances)
{
assertThat(mergeRanges(tokenRanges.get(instance)))
.isEqualTo(Range.openClosed(Partitioner.Murmur3Partitioner.minToken(),
Partitioner.Murmur3Partitioner.maxToken()));
}
}

@Test
public void testNetworkStrategyRF3Tokens4WithRangeToReplicas()
{
List<CassandraInstance> instances = Arrays.asList(
new CassandraInstance("-8000", "local0-i1", "DC1"),
new CassandraInstance("-2000", "local0-i1", "DC1"),
new CassandraInstance("2000", "local0-i1", "DC1"),
new CassandraInstance("8000", "local0-i1", "DC1"),
new CassandraInstance("-6000", "local0-i2", "DC1"),
new CassandraInstance("-1000", "local0-i2", "DC1"),
new CassandraInstance("4000", "local0-i2", "DC1"),
new CassandraInstance("9000", "local0-i2", "DC1"),
new CassandraInstance("-4000", "local0-i3", "DC1"),
new CassandraInstance("-5", "local0-i3", "DC1"),
new CassandraInstance("3050", "local0-i3", "DC1"),
new CassandraInstance("10000", "local0-i3", "DC1"));

List<String> replicas = List.of("local0-i1", "local0-i2", "local0-i3");
Map<Range<BigInteger>, List<String>> rangeToReplicas = Map.ofEntries(
replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "-8000", replicas),
replicaEntry("-8000", "-6000", replicas),
replicaEntry("-6000", "-4000", replicas),
replicaEntry("-4000", "-2000", replicas),
replicaEntry("-2000", "-1000", replicas),
replicaEntry("-1000", "-5", replicas),
replicaEntry("-5", "2000", replicas),
replicaEntry("2000", "3050", replicas),
replicaEntry("3050", "4000", replicas),
replicaEntry("4000", "8000", replicas),
replicaEntry("8000", "9000", replicas),
replicaEntry("9000", "10000", replicas),
replicaEntry("10000", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas)
);

CassandraRing ring = new CassandraRing(
Partitioner.Murmur3Partitioner,
"test",
new ReplicationFactor(
ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")),
instances,
rangeToReplicas);

assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList(
BigInteger.valueOf(-8000L), BigInteger.valueOf(-6000L), BigInteger.valueOf(-4000L),
BigInteger.valueOf(-2000L), BigInteger.valueOf(-1000L), BigInteger.valueOf(-5L),
BigInteger.valueOf(2000L), BigInteger.valueOf(3050L), BigInteger.valueOf(4000L),
BigInteger.valueOf(8000L), BigInteger.valueOf(9000L), BigInteger.valueOf(10000L)).toArray());

Multimap<CassandraInstance, Range<BigInteger>> tokenRanges = ring.tokenRanges();

// token(0) (-8000) => (MIN -> -8000], (8000 -> 9000], (9000 -> 10000], (10000 -> MAX]
// => (MIN -> -8000], (8000 -> MAX]
validateRanges(tokenRanges.get(instances.get(0)),
Arrays.asList(BigInteger.valueOf(-8000), Partitioner.Murmur3Partitioner.maxToken()),
Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(8000)));

// token(2) (-2000) => (-8000 -> -6000], (-6000 -> -4000], (-4000 -> -2000]
// => (-8000 -> -2000]
validateRanges(tokenRanges.get(instances.get(1)),
Arrays.asList(BigInteger.valueOf(-2000)),
Arrays.asList(BigInteger.valueOf(-8000)));

// token(3) (2000) => (-2000 -> -1000], (-1000 -> -5], (-5 -> 2000]
// => (-2000 -> 2000]
validateRanges(tokenRanges.get(instances.get(2)),
Arrays.asList(BigInteger.valueOf(2000)),
Arrays.asList(BigInteger.valueOf(-2000)));

// token(4) (8000) => (2000 -> 3050], (3050 -> 4000], (4000 -> 8000]
// => (2000 -> 8000]
validateRanges(tokenRanges.get(instances.get(3)),
Arrays.asList(BigInteger.valueOf(8000)),
Arrays.asList(BigInteger.valueOf(2000)));

// token(5) (-6000) => (MIN -> -8000], (-8000 -> -6000], (9000 -> 10000], (10000 -> MAX]
// => (MIN -> -6000], (9000 -> MAX)
validateRanges(tokenRanges.get(instances.get(4)),
Arrays.asList(BigInteger.valueOf(-6000), Partitioner.Murmur3Partitioner.maxToken()),
Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(9000)));

// token(6) (-1000) => (-6000 -> -4000], (-4000 -> -2000], (-2000 -> -1000]
// => (-6000 -> -1000]
validateRanges(tokenRanges.get(instances.get(5)),
Arrays.asList(BigInteger.valueOf(-1000)),
Arrays.asList(BigInteger.valueOf(-6000)));

// token(7) (4000) => (-1000 -> -5], (-5 -> 2000], (2000 -> 3050], (3050 -> 4000]
// => (-1000 -> 4000]
validateRanges(tokenRanges.get(instances.get(6)),
Arrays.asList(BigInteger.valueOf(4000)),
Arrays.asList(BigInteger.valueOf(-1000)));

// token(8) (9000) => (4000 -> 8000], (8000 -> 9000]
// => (4000 -> 9000]
validateRanges(tokenRanges.get(instances.get(7)),
Arrays.asList(BigInteger.valueOf(9000)),
Arrays.asList(BigInteger.valueOf(4000)));

// token(9) (-4000) => (MIN -> -8000], (-8000 -> -6000], (-6000 -> -4000], (10000 -> MAX]
// => (MIN -> -4000], (10000 -> -MAX]
validateRanges(tokenRanges.get(instances.get(8)),
Arrays.asList(BigInteger.valueOf(-4000), Partitioner.Murmur3Partitioner.maxToken()),
Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(10000)));

// token(10) (-5) => (-4000 -> -2000], (-2000 -> -1000], (-1000 -> -5]
// => (-4000 -> -5]
validateRanges(tokenRanges.get(instances.get(9)),
Arrays.asList(BigInteger.valueOf(-5)),
Arrays.asList(BigInteger.valueOf(-4000)));

// token(11) (3050) => (-5 -> 2000], (2000 -> 3050]
// => (-5 -> 3050]
validateRanges(tokenRanges.get(instances.get(10)),
Arrays.asList(BigInteger.valueOf(3050)),
Arrays.asList(BigInteger.valueOf(-5)));

// token(12) (10000) => (3050 -> 4000], (4000 -> 8000], (8000 -> 9000], (9000 -> 10000]
// => (3050 -> 10000]
validateRanges(tokenRanges.get(instances.get(11)),
Arrays.asList(BigInteger.valueOf(10000)),
Arrays.asList(BigInteger.valueOf(3050)));
}
}
Loading