From a64d0aa22dcac9904d3f09414385f29aa440b5fc Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 14:10:03 +0200 Subject: [PATCH 01/11] CASSANALYTICS-79: Use tokenRangeReplicas endpoint for determining read replicas Initial implementation for using tokenRangeReplicas endpoint rather than calculating token ranges and replicas within the library. There's a fair amount of code which can be removed afterwards, but wanted to keep the PR small. --- .../spark/data/partitioner/CassandraRing.java | 15 ++++ .../spark/data/CassandraDataLayer.java | 77 ++++++++++++++++++- 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 3d142f8df..2b504a0e4 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -106,6 +106,21 @@ private static void addReplica(CassandraInstance replica, replicaMap.putAll(mappingsToAdd); } + public CassandraRing(Partitioner partitioner, + String keyspace, + ReplicationFactor replicationFactor, + List instances, + RangeMap> replicas, + Multimap> tokenRangeMap) + { + this.partitioner = partitioner; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.instances = instances; + this.replicas = replicas; + this.tokenRangeMap = tokenRangeMap; + } + public CassandraRing(Partitioner partitioner, String keyspace, ReplicationFactor replicationFactor, diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 557aaf2cb..f689af1e9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -50,7 +51,11 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +67,7 @@ import o.a.c.sidecar.client.shaded.common.response.NodeSettings; import o.a.c.sidecar.client.shaded.common.response.RingResponse; import o.a.c.sidecar.client.shaded.common.response.SchemaResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.analytics.stats.Stats; import org.apache.cassandra.bridge.BigNumberConfig; import org.apache.cassandra.bridge.BigNumberConfigImpl; @@ -300,7 +306,10 @@ private int initBulkReader(@NotNull ClientConfig options) throws ExecutionExcept udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt)); cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount, false); - CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get()); + + CompletableFuture tokenRangeReplicasFuture = sidecar.tokenRangeReplicas( + new ArrayList<>(clusterConfig), maybeQuotedKeyspace); + CassandraRing ring = createCassandraRingFromTokenRangeReplicas(partitioner, replicationFactor, ringFuture.get(), tokenRangeReplicasFuture.get()); int effectiveNumberOfCores = sizingFuture.get(); tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism(), effectiveNumberOfCores); @@ -654,6 +663,7 @@ public BigNumberConfig bigNumberConfig(CqlField field) /* Internal Cassandra SSTable */ + // TODO(andrew.johnson): Remove @VisibleForTesting public CassandraRing createCassandraRingFromRing(Partitioner partitioner, ReplicationFactor replicationFactor, @@ -664,9 +674,74 @@ public CassandraRing createCassandraRingFromRing(Partitioner partitioner, .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) .collect(Collectors.toList()); + return new CassandraRing(partitioner, keyspace, replicationFactor, instances); } + @VisibleForTesting + public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner partitioner, + ReplicationFactor replicationFactor, + RingResponse ring, + TokenRangeReplicasResponse tokenRangeReplicas) + { + List instances = new ArrayList<>(); + Map addressAndPortToInstance = new HashMap<>(); + + // Map of Token Ranges to Read Replicas + RangeMap> replicas = TreeRangeMap.create(); + + // Map of Instances -> Owned Token Ranges + Multimap> tokenRangeMap = ArrayListMultimap.create(); + + ring.stream() + .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) + .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .forEach(ringEntry -> + { + CassandraInstance instance = new CassandraInstance(ringEntry.token(), ringEntry.fqdn(), ringEntry.datacenter()); + instances.add(instance); + addressAndPortToInstance.put(String.format("%s:%d", ringEntry.address(), ringEntry.port()), instance); + }); + + tokenRangeReplicas.readReplicas() + .forEach(replicaInfo -> + { + BigInteger rangeStart = new BigInteger(replicaInfo.start()); + BigInteger rangeEnd = new BigInteger(replicaInfo.end()); + Range range = Range.openClosed(rangeStart, rangeEnd); + + List instancesForRange = replicaInfo.replicasByDatacenter() + .get(datacenter) + .stream() + .map(addressAndPortToInstance::get) + .collect(Collectors.toList()); + + replicas.put(range, instancesForRange); + + // If the range is up-to MAX_TOKEN and there's no instance which owns MAX_TOKEN + // in the ring, then assign the owner of this range to the instance with the smallest + // MAX_TOKEN to account for wraparound. + CassandraInstance tokenOwner; + if (rangeEnd.equals(partitioner.maxToken()) && instances.stream().noneMatch(x -> new BigInteger(x.token()).equals(partitioner.maxToken()))) + { + tokenOwner = instances.stream() + .min(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .get(); + } + else + { + tokenOwner = instancesForRange.stream() + .filter(instance -> new BigInteger(instance.token()).equals(rangeEnd)) + .findFirst() + .get(); + } + + tokenRangeMap.put(tokenOwner, range); + }); + + return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicas, tokenRangeMap); + } + // Startup Validation @Override From 504325bae6545e509b9d0ad44385c2e38f081468 Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 14:27:44 +0200 Subject: [PATCH 02/11] Fix checkstyle --- .../org/apache/cassandra/spark/data/CassandraDataLayer.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index f689af1e9..e0de84c39 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -696,16 +696,14 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti ring.stream() .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) - .forEach(ringEntry -> - { + .forEach(ringEntry -> { CassandraInstance instance = new CassandraInstance(ringEntry.token(), ringEntry.fqdn(), ringEntry.datacenter()); instances.add(instance); addressAndPortToInstance.put(String.format("%s:%d", ringEntry.address(), ringEntry.port()), instance); }); tokenRangeReplicas.readReplicas() - .forEach(replicaInfo -> - { + .forEach(replicaInfo -> { BigInteger rangeStart = new BigInteger(replicaInfo.start()); BigInteger rangeEnd = new BigInteger(replicaInfo.end()); Range range = Range.openClosed(rangeStart, rangeEnd); From 24fe08414aadde7b53031869aa7049ea9d37198e Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 14:50:45 +0200 Subject: [PATCH 03/11] Tidy up --- .../cassandra/spark/data/CassandraDataLayer.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index e0de84c39..b2942039e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -687,10 +687,10 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti List instances = new ArrayList<>(); Map addressAndPortToInstance = new HashMap<>(); - // Map of Token Ranges to Read Replicas + // Token Range -> Read replicas for range RangeMap> replicas = TreeRangeMap.create(); - // Map of Instances -> Owned Token Ranges + // Instance -> Ranges owned by instance Multimap> tokenRangeMap = ArrayListMultimap.create(); ring.stream() @@ -716,15 +716,13 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti replicas.put(range, instancesForRange); - // If the range is up-to MAX_TOKEN and there's no instance which owns MAX_TOKEN - // in the ring, then assign the owner of this range to the instance with the smallest - // MAX_TOKEN to account for wraparound. + // If the range end is equal to MAX_TOKEN and the token of the last instance is + // not equal MAX_TOKEN, then the owner of this range should be the instance with + // the smallest token. Otherwise, find the owner based on token equality. CassandraInstance tokenOwner; - if (rangeEnd.equals(partitioner.maxToken()) && instances.stream().noneMatch(x -> new BigInteger(x.token()).equals(partitioner.maxToken()))) + if (rangeEnd.equals(partitioner.maxToken()) && !new BigInteger(instances.get(instances.size() - 1).token()).equals(partitioner.maxToken())) { - tokenOwner = instances.stream() - .min(Comparator.comparing(instance -> new BigInteger(instance.token()))) - .get(); + tokenOwner = instances.get(0); } else { From dfcbaffcffb749253a6d6021d97602486c91cb18 Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 16:53:37 +0200 Subject: [PATCH 04/11] Change replica logic --- .../spark/data/CassandraDataLayer.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index b2942039e..85b0ef81f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -24,18 +24,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -685,7 +674,7 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti TokenRangeReplicasResponse tokenRangeReplicas) { List instances = new ArrayList<>(); - Map addressAndPortToInstance = new HashMap<>(); + Map> addressAndPortToInstances = new HashMap<>(); // Token Range -> Read replicas for range RangeMap> replicas = TreeRangeMap.create(); @@ -698,8 +687,11 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) .forEach(ringEntry -> { CassandraInstance instance = new CassandraInstance(ringEntry.token(), ringEntry.fqdn(), ringEntry.datacenter()); + String addressAndPort = String.format("%s:%d", ringEntry.address(), ringEntry.port()); + instances.add(instance); - addressAndPortToInstance.put(String.format("%s:%d", ringEntry.address(), ringEntry.port()), instance); + addressAndPortToInstances.putIfAbsent(addressAndPort, new ArrayList<>()); + addressAndPortToInstances.get(addressAndPort).add(instance); }); tokenRangeReplicas.readReplicas() @@ -707,14 +699,14 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti BigInteger rangeStart = new BigInteger(replicaInfo.start()); BigInteger rangeEnd = new BigInteger(replicaInfo.end()); Range range = Range.openClosed(rangeStart, rangeEnd); + List replicasForRange = replicaInfo.replicasByDatacenter().get(datacenter); - List instancesForRange = replicaInfo.replicasByDatacenter() - .get(datacenter) - .stream() - .map(addressAndPortToInstance::get) - .collect(Collectors.toList()); - - replicas.put(range, instancesForRange); + // This isn't correct. Because we have multiple instances per Cassandra node when using + // num_tokens > 1, it means that replicas either maps to all instances or a single one. + // For now taking the first, but this needs to be changed. + replicas.put(range, replicasForRange.stream() + .map(replica -> addressAndPortToInstances.get(replica).get(0)) + .collect(Collectors.toList())); // If the range end is equal to MAX_TOKEN and the token of the last instance is // not equal MAX_TOKEN, then the owner of this range should be the instance with @@ -726,7 +718,9 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti } else { - tokenOwner = instancesForRange.stream() + tokenOwner = replicasForRange.stream() + .map(addressAndPortToInstances::get) + .flatMap(Collection::stream) .filter(instance -> new BigInteger(instance.token()).equals(rangeEnd)) .findFirst() .get(); From ff81528dccd8d7e335c4d06971764c48be5f68e4 Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 17:17:45 +0200 Subject: [PATCH 05/11] Copy logic from CassandraRing --- .../spark/data/CassandraDataLayer.java | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 85b0ef81f..b609f9456 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -701,13 +701,6 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti Range range = Range.openClosed(rangeStart, rangeEnd); List replicasForRange = replicaInfo.replicasByDatacenter().get(datacenter); - // This isn't correct. Because we have multiple instances per Cassandra node when using - // num_tokens > 1, it means that replicas either maps to all instances or a single one. - // For now taking the first, but this needs to be changed. - replicas.put(range, replicasForRange.stream() - .map(replica -> addressAndPortToInstances.get(replica).get(0)) - .collect(Collectors.toList())); - // If the range end is equal to MAX_TOKEN and the token of the last instance is // not equal MAX_TOKEN, then the owner of this range should be the instance with // the smallest token. Otherwise, find the owner based on token equality. @@ -729,6 +722,27 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti tokenRangeMap.put(tokenOwner, range); }); + // This has been extracted from CassandraRing.addReplica, however I'm not + // sure if it's applicable or correct when using vnodes. Rather than mapping + // Ranges to CassandraInstance which includes token details, the mapping should + // be Ranges to CassandraNode with either all tokens or none. + replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); + tokenRangeMap.asMap().forEach((instance, ranges) -> { + ranges.forEach(range -> { + // addReplica(instance, range, replicas) + RangeMap> replicaRanges = replicas.subRangeMap(range); + RangeMap> mappingsToAdd = TreeRangeMap.create(); + + replicaRanges.asMapOfRanges().forEach((key, value) -> { + List replicaInstances = new ArrayList<>(value); + replicaInstances.add(instance); + mappingsToAdd.put(key, replicaInstances); + }); + + replicas.putAll(mappingsToAdd); + }); + }); + return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicas, tokenRangeMap); } From c53e29492995a2aa3d4db691099f38a324cf9884 Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 18:04:40 +0200 Subject: [PATCH 06/11] Move into CassandraRing --- .../spark/data/partitioner/CassandraRing.java | 36 +++++-- .../spark/data/CassandraDataLayer.java | 95 +++++-------------- 2 files changed, 52 insertions(+), 79 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 2b504a0e4..2a0721143 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -109,16 +109,40 @@ private static void addReplica(CassandraInstance replica, public CassandraRing(Partitioner partitioner, String keyspace, ReplicationFactor replicationFactor, - List instances, - RangeMap> replicas, - Multimap> tokenRangeMap) + Collection instances, + Map, List> rangeToReplicas) { this.partitioner = partitioner; this.keyspace = keyspace; this.replicationFactor = replicationFactor; - this.instances = instances; - this.replicas = replicas; - this.tokenRangeMap = tokenRangeMap; + this.instances = instances.stream() + .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .collect(Collectors.toCollection(ArrayList::new)); + + replicas = TreeRangeMap.create(); + tokenRangeMap = ArrayListMultimap.create(); + + rangeToReplicas.forEach((range, rangeReplicas) -> { + // Find the owner of this range + CassandraInstance tokenOwner; + + if (range.upperEndpoint().equals(partitioner.maxToken()) + && !new BigInteger(this.instances.get(this.instances.size() - 1).token()).equals(partitioner.maxToken())) + { + tokenOwner = this.instances.get(0); + } + else + { + tokenOwner = this.instances.stream() + .filter(instance -> new BigInteger(instance.token()).equals(range.upperEndpoint())) + .findFirst() + .get(); + } + tokenRangeMap.put(tokenOwner, range); + }); + + replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); + tokenRangeMap.asMap().forEach((instance, ranges) -> ranges.forEach(range -> addReplica(instance, range, replicas))); } public CassandraRing(Partitioner partitioner, diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index b609f9456..eb8fcebe8 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -24,7 +24,17 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.math.BigInteger; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -40,11 +50,7 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; import com.google.common.collect.Range; -import com.google.common.collect.RangeMap; -import com.google.common.collect.TreeRangeMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -673,77 +679,20 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti RingResponse ring, TokenRangeReplicasResponse tokenRangeReplicas) { - List instances = new ArrayList<>(); - Map> addressAndPortToInstances = new HashMap<>(); + Map, List> replicas = tokenRangeReplicas.readReplicas() + .stream() + .map(replicaInfo -> Map.entry( + Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end())), + replicaInfo.replicasByDatacenter().get(datacenter))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - // Token Range -> Read replicas for range - RangeMap> replicas = TreeRangeMap.create(); - - // Instance -> Ranges owned by instance - Multimap> tokenRangeMap = ArrayListMultimap.create(); - - ring.stream() + Collection instances = ring + .stream() .filter(status -> datacenter == null || datacenter.equalsIgnoreCase(status.datacenter())) - .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) - .forEach(ringEntry -> { - CassandraInstance instance = new CassandraInstance(ringEntry.token(), ringEntry.fqdn(), ringEntry.datacenter()); - String addressAndPort = String.format("%s:%d", ringEntry.address(), ringEntry.port()); - - instances.add(instance); - addressAndPortToInstances.putIfAbsent(addressAndPort, new ArrayList<>()); - addressAndPortToInstances.get(addressAndPort).add(instance); - }); - - tokenRangeReplicas.readReplicas() - .forEach(replicaInfo -> { - BigInteger rangeStart = new BigInteger(replicaInfo.start()); - BigInteger rangeEnd = new BigInteger(replicaInfo.end()); - Range range = Range.openClosed(rangeStart, rangeEnd); - List replicasForRange = replicaInfo.replicasByDatacenter().get(datacenter); - - // If the range end is equal to MAX_TOKEN and the token of the last instance is - // not equal MAX_TOKEN, then the owner of this range should be the instance with - // the smallest token. Otherwise, find the owner based on token equality. - CassandraInstance tokenOwner; - if (rangeEnd.equals(partitioner.maxToken()) && !new BigInteger(instances.get(instances.size() - 1).token()).equals(partitioner.maxToken())) - { - tokenOwner = instances.get(0); - } - else - { - tokenOwner = replicasForRange.stream() - .map(addressAndPortToInstances::get) - .flatMap(Collection::stream) - .filter(instance -> new BigInteger(instance.token()).equals(rangeEnd)) - .findFirst() - .get(); - } - - tokenRangeMap.put(tokenOwner, range); - }); - - // This has been extracted from CassandraRing.addReplica, however I'm not - // sure if it's applicable or correct when using vnodes. Rather than mapping - // Ranges to CassandraInstance which includes token details, the mapping should - // be Ranges to CassandraNode with either all tokens or none. - replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); - tokenRangeMap.asMap().forEach((instance, ranges) -> { - ranges.forEach(range -> { - // addReplica(instance, range, replicas) - RangeMap> replicaRanges = replicas.subRangeMap(range); - RangeMap> mappingsToAdd = TreeRangeMap.create(); - - replicaRanges.asMapOfRanges().forEach((key, value) -> { - List replicaInstances = new ArrayList<>(value); - replicaInstances.add(instance); - mappingsToAdd.put(key, replicaInstances); - }); - - replicas.putAll(mappingsToAdd); - }); - }); + .map(status -> new CassandraInstance(status.token(), status.fqdn(), status.datacenter())) + .collect(Collectors.toList()); - return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicas, tokenRangeMap); + return new CassandraRing(partitioner, keyspace, replicationFactor, instances, replicas); } // Startup Validation From c9f68f686d9fdfd68d5ae7ec8e0f4c9209838b6d Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 19:18:29 +0200 Subject: [PATCH 07/11] Account for RandomPartitioner --- .../cassandra/spark/data/partitioner/CassandraRing.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 2a0721143..c7654200e 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -31,9 +31,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; -import java.util.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; @@ -126,7 +126,7 @@ public CassandraRing(Partitioner partitioner, // Find the owner of this range CassandraInstance tokenOwner; - if (range.upperEndpoint().equals(partitioner.maxToken()) + if ((range.upperEndpoint().compareTo(partitioner.maxToken()) > 0) && !new BigInteger(this.instances.get(this.instances.size() - 1).token()).equals(partitioner.maxToken())) { tokenOwner = this.instances.get(0); From 33642143c6c6b0365c0a86d09423f600964e9d8f Mon Sep 17 00:00:00 2001 From: anderoo Date: Fri, 26 Sep 2025 19:46:19 +0200 Subject: [PATCH 08/11] Fix comparison --- .../apache/cassandra/spark/data/partitioner/CassandraRing.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index c7654200e..dee828894 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -126,7 +126,7 @@ public CassandraRing(Partitioner partitioner, // Find the owner of this range CassandraInstance tokenOwner; - if ((range.upperEndpoint().compareTo(partitioner.maxToken()) > 0) + if ((range.upperEndpoint().compareTo(partitioner.maxToken()) >= 0) && !new BigInteger(this.instances.get(this.instances.size() - 1).token()).equals(partitioner.maxToken())) { tokenOwner = this.instances.get(0); From 693ea49d047e3009d88e4e95364a89cf42998eaa Mon Sep 17 00:00:00 2001 From: anderoo Date: Mon, 29 Sep 2025 11:13:46 +0200 Subject: [PATCH 09/11] Rework + tests + style --- .../spark/data/partitioner/CassandraRing.java | 56 ++++-- .../data/partitioner/CassandraRingTests.java | 167 ++++++++++++++++++ .../spark/data/CassandraDataLayer.java | 35 +++- .../spark/data/CassandraDataLayerTests.java | 38 ++++ .../CassandraAnalyticsSimpleTest.java | 2 +- 5 files changed, 276 insertions(+), 22 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index dee828894..63d53b715 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -32,7 +32,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; import java.util.stream.Collectors; +import java.util.stream.IntStream; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -106,6 +108,32 @@ private static void addReplica(CassandraInstance replica, replicaMap.putAll(mappingsToAdd); } + /** + * Find the CassandraInstance which owns a given token from a list of + * __sorted__ CassandraInstances. If the token is greater than or equal to + * the partitioner max token then the CassandraInstance with the smallest + * token value is returned, assuming token ring wraparound. + */ + private static int getTokenOwnerIdx(List instances, Partitioner partitioner, BigInteger token) + { + OptionalInt maybeTokenOwnerIdx = IntStream.range(0, instances.size()) + .filter(idx -> new BigInteger(instances.get(idx).token()).equals(token)) + .findFirst(); + + if (maybeTokenOwnerIdx.isPresent()) + { + return maybeTokenOwnerIdx.getAsInt(); + } + else if (token.compareTo(partitioner.maxToken()) >= 0) + { + return 0; + } + else + { + throw new RuntimeException(String.format("Could not find instance for token: %s", token)); + } + } + public CassandraRing(Partitioner partitioner, String keyspace, ReplicationFactor replicationFactor, @@ -119,26 +147,24 @@ public CassandraRing(Partitioner partitioner, .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) .collect(Collectors.toCollection(ArrayList::new)); + // replicas is a mapping of token ranges to CassandraInstances + // which are replicas for that range replicas = TreeRangeMap.create(); + + // tokenRangeMap is mapping of CassandraInstances to token ranges + // that they own or are a replica for tokenRangeMap = ArrayListMultimap.create(); rangeToReplicas.forEach((range, rangeReplicas) -> { - // Find the owner of this range - CassandraInstance tokenOwner; - - if ((range.upperEndpoint().compareTo(partitioner.maxToken()) >= 0) - && !new BigInteger(this.instances.get(this.instances.size() - 1).token()).equals(partitioner.maxToken())) - { - tokenOwner = this.instances.get(0); - } - else - { - tokenOwner = this.instances.stream() - .filter(instance -> new BigInteger(instance.token()).equals(range.upperEndpoint())) + int tokenOwnerIdx = getTokenOwnerIdx(this.instances, partitioner, range.upperEndpoint()); + + // Find following closest CassandraInstance for each replica and update tokenRangeMap + rangeReplicas.forEach(replica -> + IntStream.range(0, this.instances.size()) + .map(idx -> (idx + tokenOwnerIdx) % this.instances.size()) + .filter(idx -> this.instances.get(idx).nodeName().equals(replica)) .findFirst() - .get(); - } - tokenRangeMap.put(tokenOwner, range); + .ifPresent(idx -> tokenRangeMap.get(this.instances.get(idx)).add(range))); }); replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); diff --git a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java index c66290f0f..10da8a5d5 100644 --- a/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java +++ b/cassandra-analytics-common/src/test/java/org/apache/cassandra/spark/data/partitioner/CassandraRingTests.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; @@ -67,6 +68,12 @@ private void validateRanges(Collection> ranges, excludedTokens.forEach(token -> assertThat(rangeSet.contains(token)).isFalse()); } + private Map.Entry, List> replicaEntry(String startToken, String endToken, List replicas) + { + Range range = Range.openClosed(new BigInteger(startToken), new BigInteger(endToken)); + return Map.entry(range, replicas); + } + @Test public void testSimpleStrategyRF3() { @@ -447,4 +454,164 @@ public void testNetworkStrategyRF22() Partitioner.Murmur3Partitioner.minToken(), Partitioner.Murmur3Partitioner.maxToken())); } + + @Test + public void testNetworkStrategyRF3Tokens1WithRangeToReplicas() + { + List instances = Arrays.asList( + new CassandraInstance("0", "local0-i1", "DC1"), + new CassandraInstance("100", "local0-i2", "DC1"), + new CassandraInstance(Partitioner.Murmur3Partitioner.maxToken().toString(), "local0-i3", "DC1")); + + List replicas = List.of("local0-i1", "local0-i2", "local0-i3"); + Map, List> rangeToReplicas = Map.ofEntries( + replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "0", replicas), + replicaEntry("0", "100", replicas), + replicaEntry("100", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas) + ); + + CassandraRing ring = new CassandraRing( + Partitioner.Murmur3Partitioner, + "test", + new ReplicationFactor( + ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")), + instances, + rangeToReplicas); + + assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList( + BigInteger.valueOf(0L), + BigInteger.valueOf(100L), + Partitioner.Murmur3Partitioner.maxToken()).toArray()); + + Multimap> tokenRanges = ring.tokenRanges(); + for (CassandraInstance instance : instances) + { + assertThat(mergeRanges(tokenRanges.get(instance))) + .isEqualTo(Range.openClosed(Partitioner.Murmur3Partitioner.minToken(), + Partitioner.Murmur3Partitioner.maxToken())); + } + } + + @Test + public void testNetworkStrategyRF3Tokens4WithRangeToReplicas() + { + List instances = Arrays.asList( + new CassandraInstance("-8000", "local0-i1", "DC1"), + new CassandraInstance("-2000", "local0-i1", "DC1"), + new CassandraInstance("2000", "local0-i1", "DC1"), + new CassandraInstance("8000", "local0-i1", "DC1"), + new CassandraInstance("-6000", "local0-i2", "DC1"), + new CassandraInstance("-1000", "local0-i2", "DC1"), + new CassandraInstance("4000", "local0-i2", "DC1"), + new CassandraInstance("9000", "local0-i2", "DC1"), + new CassandraInstance("-4000", "local0-i3", "DC1"), + new CassandraInstance("-5", "local0-i3", "DC1"), + new CassandraInstance("3050", "local0-i3", "DC1"), + new CassandraInstance("10000", "local0-i3", "DC1")); + + List replicas = List.of("local0-i1", "local0-i2", "local0-i3"); + Map, List> rangeToReplicas = Map.ofEntries( + replicaEntry(Partitioner.Murmur3Partitioner.minToken().toString(), "-8000", replicas), + replicaEntry("-8000", "-6000", replicas), + replicaEntry("-6000", "-4000", replicas), + replicaEntry("-4000", "-2000", replicas), + replicaEntry("-2000", "-1000", replicas), + replicaEntry("-1000", "-5", replicas), + replicaEntry("-5", "2000", replicas), + replicaEntry("2000", "3050", replicas), + replicaEntry("3050", "4000", replicas), + replicaEntry("4000", "8000", replicas), + replicaEntry("8000", "9000", replicas), + replicaEntry("9000", "10000", replicas), + replicaEntry("10000", Partitioner.Murmur3Partitioner.maxToken().toString(), replicas) + ); + + CassandraRing ring = new CassandraRing( + Partitioner.Murmur3Partitioner, + "test", + new ReplicationFactor( + ImmutableMap.of("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "DC1", "3")), + instances, + rangeToReplicas); + + assertThat(ring.tokens().toArray()).isEqualTo(Arrays.asList( + BigInteger.valueOf(-8000L), BigInteger.valueOf(-6000L), BigInteger.valueOf(-4000L), + BigInteger.valueOf(-2000L), BigInteger.valueOf(-1000L), BigInteger.valueOf(-5L), + BigInteger.valueOf(2000L), BigInteger.valueOf(3050L), BigInteger.valueOf(4000L), + BigInteger.valueOf(8000L), BigInteger.valueOf(9000L), BigInteger.valueOf(10000L)).toArray()); + + Multimap> tokenRanges = ring.tokenRanges(); + + // token(0) (-8000) => (MIN -> -8000], (8000 -> 9000], (9000 -> 10000], (10000 -> MAX] + // => (MIN -> -8000], (8000 -> MAX] + validateRanges(tokenRanges.get(instances.get(0)), + Arrays.asList(BigInteger.valueOf(-8000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(8000))); + + // token(2) (-2000) => (-8000 -> -6000], (-6000 -> -4000], (-4000 -> -2000] + // => (-8000 -> -2000] + validateRanges(tokenRanges.get(instances.get(1)), + Arrays.asList(BigInteger.valueOf(-2000)), + Arrays.asList(BigInteger.valueOf(-8000))); + + // token(3) (2000) => (-2000 -> -1000], (-1000 -> -5], (-5 -> 2000] + // => (-2000 -> 2000] + validateRanges(tokenRanges.get(instances.get(2)), + Arrays.asList(BigInteger.valueOf(2000)), + Arrays.asList(BigInteger.valueOf(-2000))); + + // token(4) (8000) => (2000 -> 3050], (3050 -> 4000], (4000 -> 8000] + // => (2000 -> 8000] + validateRanges(tokenRanges.get(instances.get(3)), + Arrays.asList(BigInteger.valueOf(8000)), + Arrays.asList(BigInteger.valueOf(2000))); + + // token(5) (-6000) => (MIN -> -8000], (-8000 -> -6000], (9000 -> 10000], (10000 -> MAX] + // => (MIN -> -6000], (9000 -> MAX) + validateRanges(tokenRanges.get(instances.get(4)), + Arrays.asList(BigInteger.valueOf(-6000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(9000))); + + // token(6) (-1000) => (-6000 -> -4000], (-4000 -> -2000], (-2000 -> -1000] + // => (-6000 -> -1000] + validateRanges(tokenRanges.get(instances.get(5)), + Arrays.asList(BigInteger.valueOf(-1000)), + Arrays.asList(BigInteger.valueOf(-6000))); + + // token(7) (4000) => (-1000 -> -5], (-5 -> 2000], (2000 -> 3050], (3050 -> 4000] + // => (-1000 -> 4000] + validateRanges(tokenRanges.get(instances.get(6)), + Arrays.asList(BigInteger.valueOf(4000)), + Arrays.asList(BigInteger.valueOf(-1000))); + + // token(8) (9000) => (4000 -> 8000], (8000 -> 9000] + // => (4000 -> 9000] + validateRanges(tokenRanges.get(instances.get(7)), + Arrays.asList(BigInteger.valueOf(9000)), + Arrays.asList(BigInteger.valueOf(4000))); + + // token(9) (-4000) => (MIN -> -8000], (-8000 -> -6000], (-6000 -> -4000], (10000 -> MAX] + // => (MIN -> -4000], (10000 -> -MAX] + validateRanges(tokenRanges.get(instances.get(8)), + Arrays.asList(BigInteger.valueOf(-4000), Partitioner.Murmur3Partitioner.maxToken()), + Arrays.asList(Partitioner.Murmur3Partitioner.minToken(), BigInteger.valueOf(10000))); + + // token(10) (-5) => (-4000 -> -2000], (-2000 -> -1000], (-1000 -> -5] + // => (-4000 -> -5] + validateRanges(tokenRanges.get(instances.get(9)), + Arrays.asList(BigInteger.valueOf(-5)), + Arrays.asList(BigInteger.valueOf(-4000))); + + // token(11) (3050) => (-5 -> 2000], (2000 -> 3050] + // => (-5 -> 3050] + validateRanges(tokenRanges.get(instances.get(10)), + Arrays.asList(BigInteger.valueOf(3050)), + Arrays.asList(BigInteger.valueOf(-5))); + + // token(12) (10000) => (3050 -> 4000], (4000 -> 8000], (8000 -> 9000], (9000 -> 10000] + // => (3050 -> 10000] + validateRanges(tokenRanges.get(instances.get(11)), + Arrays.asList(BigInteger.valueOf(10000)), + Arrays.asList(BigInteger.valueOf(3050))); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index eb8fcebe8..b20537be9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -679,12 +679,7 @@ public CassandraRing createCassandraRingFromTokenRangeReplicas(Partitioner parti RingResponse ring, TokenRangeReplicasResponse tokenRangeReplicas) { - Map, List> replicas = tokenRangeReplicas.readReplicas() - .stream() - .map(replicaInfo -> Map.entry( - Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end())), - replicaInfo.replicasByDatacenter().get(datacenter))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map, List> replicas = dcReplicasByRange(tokenRangeReplicas, datacenter); Collection instances = ring .stream() @@ -1089,4 +1084,32 @@ public boolean fieldNullable() requestedFeatures.set(index, featureAlias); } } + + @VisibleForTesting + static Map, List> dcReplicasByRange(TokenRangeReplicasResponse tokenRangeReplicas, String datacenter) + { + Map fqdnByAddressAndPort = new HashMap<>(); + Map, List> replicas = new HashMap<>(); + + tokenRangeReplicas.replicaMetadata().forEach((addressAndPort, metadata) -> { + fqdnByAddressAndPort.putIfAbsent(addressAndPort, metadata.fqdn()); + }); + + tokenRangeReplicas.readReplicas().forEach(replicaInfo -> { + Range range = Range.openClosed(new BigInteger(replicaInfo.start()), new BigInteger(replicaInfo.end())); + List dcReplicas = replicaInfo.replicasByDatacenter() + .entrySet() + .stream() + .filter(entry -> datacenter.equalsIgnoreCase(entry.getKey())) + .findFirst() + .get() + .getValue() + .stream().map(fqdnByAddressAndPort::get) + .collect(Collectors.toList()); + + replicas.put(range, dcReplicas); + }); + + return replicas; + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java index 088de9070..b5bc014f5 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/CassandraDataLayerTests.java @@ -19,14 +19,20 @@ package org.apache.cassandra.spark.data; +import java.math.BigInteger; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import static org.apache.cassandra.spark.data.CassandraDataLayer.dcReplicasByRange; import static org.assertj.core.api.Assertions.assertThat; class CassandraDataLayerTests @@ -64,4 +70,36 @@ void testClearSnapshotOptionSupport(Boolean clearSnapshot, String expectedClearS assertThat(clearSnapshotStrategy.hasTTL()).isEqualTo(expectedClearSnapshotStrategy.hasTTL()); assertThat(clearSnapshotStrategy.ttl()).isEqualTo(expectedClearSnapshotStrategy.ttl()); } + + @Test + void testDcReplicasByRangeMultiDC() + { + List readReplicas = List.of( + new TokenRangeReplicasResponse.ReplicaInfo("-5000", "5000", + Map.of( + "dc1", List.of("localhost1:9000", "localhost2:9001", "localhost3:9002"), + "dc2", List.of("localhost4:9003")))); + + Map replicaMetadata = Map.of( + "localhost1:9000", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-1", "localhost1", 9000, "dc1"), + "localhost2:9001", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-2", "localhost2", 9001, "dc1"), + "localhost3:9002", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica1-3", "localhost3", 9002, "dc1"), + "localhost4:9003", new TokenRangeReplicasResponse.ReplicaMetadata("Normal", "Up", "replica2-1", "localhost4", 9003, "dc2") + ); + + TokenRangeReplicasResponse response = + new TokenRangeReplicasResponse(Collections.EMPTY_LIST, readReplicas, replicaMetadata); + + Map, List> expectedDc1 = Map.of( + Range.openClosed(new BigInteger("-5000"), new BigInteger("5000")), List.of("replica1-1", "replica1-2", "replica1-3")); + Map, List> actualDc1 = dcReplicasByRange(response, "dc1"); + + assertThat(actualDc1).isEqualTo(expectedDc1); + + Map, List> expectedDc2 = Map.of( + Range.openClosed(new BigInteger("-5000"), new BigInteger("5000")), List.of("replica2-1")); + Map, List> actualDc2 = dcReplicasByRange(response, "dc2"); + + assertThat(actualDc2).isEqualTo(expectedDc2); + } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java index 264d8375c..46a82de03 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java @@ -56,7 +56,7 @@ class CassandraAnalyticsSimpleTest extends SharedClusterSparkIntegrationTestBase @ParameterizedTest @MethodSource("options") - @Timeout(value = 30) // 30 seconds + @Timeout(value = 90) // 30 seconds void runSampleJob(Integer ttl, Long timestamp, QualifiedName tableName) { Map writerOptions = new HashMap<>(); From 4e652a1fb2d211cd40ccc2df6ae36e344b59689a Mon Sep 17 00:00:00 2001 From: anderoo Date: Mon, 29 Sep 2025 11:15:49 +0200 Subject: [PATCH 10/11] Set back to 30s --- .../cassandra/analytics/CassandraAnalyticsSimpleTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java index 46a82de03..264d8375c 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleTest.java @@ -56,7 +56,7 @@ class CassandraAnalyticsSimpleTest extends SharedClusterSparkIntegrationTestBase @ParameterizedTest @MethodSource("options") - @Timeout(value = 90) // 30 seconds + @Timeout(value = 30) // 30 seconds void runSampleJob(Integer ttl, Long timestamp, QualifiedName tableName) { Map writerOptions = new HashMap<>(); From 13a3a97f0c41fedc3401150537df4eda1e2e0d0a Mon Sep 17 00:00:00 2001 From: anderoo Date: Mon, 29 Sep 2025 18:34:59 +0200 Subject: [PATCH 11/11] Add serialisation --- .../spark/data/partitioner/CassandraRing.java | 80 ++++++++++++++----- 1 file changed, 60 insertions(+), 20 deletions(-) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java index 63d53b715..f1b1e0d64 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/partitioner/CassandraRing.java @@ -75,6 +75,7 @@ public class CassandraRing implements Serializable private String keyspace; private ReplicationFactor replicationFactor; private List instances; + private Map, List> rangeToReplicas; private transient RangeMap> replicas; private transient Multimap> tokenRangeMap; @@ -146,7 +147,27 @@ public CassandraRing(Partitioner partitioner, this.instances = instances.stream() .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) .collect(Collectors.toCollection(ArrayList::new)); + this.rangeToReplicas = rangeToReplicas; + this.initWithRangeToReplicas(); + } + + public CassandraRing(Partitioner partitioner, + String keyspace, + ReplicationFactor replicationFactor, + Collection instances) + { + this.partitioner = partitioner; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.instances = instances.stream() + .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) + .collect(Collectors.toCollection(ArrayList::new)); + this.init(); + } + + private void initWithRangeToReplicas() + { // replicas is a mapping of token ranges to CassandraInstances // which are replicas for that range replicas = TreeRangeMap.create(); @@ -161,30 +182,16 @@ public CassandraRing(Partitioner partitioner, // Find following closest CassandraInstance for each replica and update tokenRangeMap rangeReplicas.forEach(replica -> IntStream.range(0, this.instances.size()) - .map(idx -> (idx + tokenOwnerIdx) % this.instances.size()) - .filter(idx -> this.instances.get(idx).nodeName().equals(replica)) - .findFirst() - .ifPresent(idx -> tokenRangeMap.get(this.instances.get(idx)).add(range))); + .map(idx -> (idx + tokenOwnerIdx) % this.instances.size()) + .filter(idx -> this.instances.get(idx).nodeName().equals(replica)) + .findFirst() + .ifPresent(idx -> tokenRangeMap.get(this.instances.get(idx)).add(range))); }); replicas.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), Collections.emptyList()); tokenRangeMap.asMap().forEach((instance, ranges) -> ranges.forEach(range -> addReplica(instance, range, replicas))); } - public CassandraRing(Partitioner partitioner, - String keyspace, - ReplicationFactor replicationFactor, - Collection instances) - { - this.partitioner = partitioner; - this.keyspace = keyspace; - this.replicationFactor = replicationFactor; - this.instances = instances.stream() - .sorted(Comparator.comparing(instance -> new BigInteger(instance.token()))) - .collect(Collectors.toCollection(ArrayList::new)); - this.init(); - } - private void init() { // Setup token range map @@ -343,7 +350,25 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE { this.instances.add(new CassandraInstance(in.readUTF(), in.readUTF(), in.readUTF())); } - this.init(); + + this.rangeToReplicas = new HashMap<>(); + int numKeys = in.readShort(); + for (int key = 0; key < numKeys; key++) + { + BigInteger rangeStart = new BigInteger(in.readUTF()); + BigInteger rangeEnd = new BigInteger(in.readUTF()); + List replicas = new ArrayList<>(); + + int numReplicas = in.readShort(); + for (int replica = 0; replica < numReplicas; replica++) + { + replicas.add(in.readUTF()); + } + + this.rangeToReplicas.put(Range.openClosed(rangeStart, rangeEnd), replicas); + } + + this.initWithRangeToReplicas(); } private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException @@ -368,6 +393,19 @@ private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFou out.writeUTF(instance.nodeName()); out.writeUTF(instance.dataCenter()); } + + out.writeShort(this.rangeToReplicas.size()); + for (Map.Entry, List> entry : rangeToReplicas.entrySet()) + { + out.writeUTF(entry.getKey().lowerEndpoint().toString()); + out.writeUTF(entry.getKey().upperEndpoint().toString()); + + out.writeShort(entry.getValue().size()); + for (String replica : entry.getValue()) + { + out.writeUTF(replica); + } + } } public static class Serializer extends com.esotericsoftware.kryo.Serializer @@ -379,6 +417,7 @@ public void write(Kryo kryo, Output out, CassandraRing ring) out.writeString(ring.keyspace); kryo.writeObject(out, ring.replicationFactor); kryo.writeObject(out, ring.instances); + kryo.writeObject(out, ring.rangeToReplicas); } @Override @@ -389,7 +428,8 @@ public CassandraRing read(Kryo kryo, Input in, Class type) : Partitioner.Murmur3Partitioner, in.readString(), kryo.readObject(in, ReplicationFactor.class), - kryo.readObject(in, ArrayList.class)); + kryo.readObject(in, ArrayList.class), + kryo.readObject(in, Map.class)); } } }