diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index 5d6c604533be..a4c30f1c4561 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -160,6 +160,21 @@ public ConsistencyLevel consistency() * @return the applied resulting Mutation */ public Mutation applyCounterMutation() throws WriteTimeoutException + { + return applyCounterMutation(null); + } + + /** + * Applies the counter mutation with an optional mutation ID for tracked keyspaces. + * + * For tracked keyspaces, the mutation ID is assigned to the concrete result BEFORE + * it is applied, ensuring the concrete counter values (not the operation) are + * journaled and tracked with the ID. + * + * @param mutationId the mutation ID to assign to the concrete result, or null for non-tracked + * @return the applied resulting Mutation (with ID if provided) + */ + public Mutation applyCounterMutation(MutationId mutationId) throws WriteTimeoutException { Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(id(), getKeyspaceName(), key()); Keyspace keyspace = Keyspace.open(getKeyspaceName()); @@ -173,6 +188,12 @@ public Mutation applyCounterMutation() throws WriteTimeoutException resultBuilder.add(processModifications(upd)); Mutation result = resultBuilder.build(); + + // For tracked keyspaces, assign the mutation ID to the result before + // calling result.apply() since applyInternalTracked() requires an ID + if (mutationId != null && !mutationId.isNone()) + result = result.withMutationId(mutationId); + result.apply(); return result; } diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java index d0897afdd5af..d0a67e4163e3 100644 --- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java @@ -24,6 +24,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.transport.Dispatcher; @@ -38,6 +39,15 @@ protected void applyMutation(final Message message, InetAddress final CounterMutation cm = message.payload; logger.trace("Applying forwarded {}", cm); + Keyspace keyspace = Keyspace.open(cm.getKeyspaceName()); + + if (keyspace.getMetadata().useMutationTracking()) + { + logger.trace("Applying tracked forwarded counter mutation {}", cm); + ForwardedWrite.applyForwardedCounterMutation(cm, message, respondToAddress); + return; + } + String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter; // We should not wait for the result of the write in this thread, // otherwise we could have a distributed deadlock between replicas diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java index e5e2bf8d8b02..dec9412ca0e8 100644 --- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java +++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java @@ -49,6 +49,7 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.AbstractWriteResponseHandler; +import org.apache.cassandra.service.TrackedWriteResponseHandler; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.transport.Dispatcher; @@ -241,6 +242,119 @@ public static AbstractWriteResponseHandler forwardMutation(Mutation muta return handler; } + /** + * Forward a tracked counter mutation to a replica leader for processing. + * The leader will apply the counter mutation, assign a mutation ID, and replicate to other replicas. + */ + public static AbstractWriteResponseHandler forwardCounterMutation(CounterMutation counterMutation, + ReplicaPlan.ForWrite plan, + AbstractReplicationStrategy strategy, + Dispatcher.RequestTime requestTime) + { + Preconditions.checkArgument(counterMutation.id().isNone(), "CounterMutation should not have an ID when forwarding"); + + ClusterMetadata cm = ClusterMetadata.current(); + String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter; + + // Find the leader replica - prefer local DC replicas for counters + Replica leader; + try + { + leader = ReplicaPlans.findCounterLeaderReplica(cm, counterMutation.getKeyspaceName(), + counterMutation.key(), localDataCenter, + counterMutation.consistency()); + } + catch (Exception e) + { + logger.error("Failed to find counter leader replica for tracked write", e); + throw e; + } + + Preconditions.checkState(!leader.isSelf(), "Leader should not be self when forwarding counter mutation"); + logger.trace("Forwarding tracked counter mutation to leader replica {}", leader); + + // Create response handler for all replicas + AbstractWriteResponseHandler handler = strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime); + + // Add callbacks for all live replicas to respond directly to coordinator + Message forwardMessage = Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation, requestTime); + + for (Replica replica : plan.contacts()) + { + if (plan.isAlive(replica)) + { + logger.trace("Adding forwarding callback for tracked counter response from {} id {}", replica, forwardMessage.id()); + MessagingService.instance().callbacks.addWithExpiration(handler, forwardMessage, replica); + } + else + { + handler.expired(); + } + } + + // Send the counter mutation to the leader + MessagingService.instance().send(forwardMessage, leader.endpoint()); + + return handler; + } + + /** + * Apply a forwarded tracked counter mutation on the leader replica. + * Called by CounterMutationVerbHandler when receiving a forwarded counter write. + * + * This method: + * 1. Creates CoordinatorAckInfo from the incoming message + * 2. Applies counter mutation locally with generated mutation ID + * 3. Forwards result (Mutation not CounterMutation) to other replicas with CoordinatorAckInfo + * 4. Sends leader's response back to coordinator + * + * @param counterMutation the counter mutation to apply + * @param message the original message (contains coordinator address and message ID) + * @param respondToAddress the address to send the response to (coordinator) + */ + public static void applyForwardedCounterMutation(CounterMutation counterMutation, + Message message, + InetAddressAndPort respondToAddress) + { + try + { + CoordinatorAckInfo coordinatorAckInfo = CoordinatorAckInfo.toCoordinator(message.from(), message.id()); + + String keyspaceName = counterMutation.getKeyspaceName(); + Token token = counterMutation.key().getToken(); + Keyspace ks = Keyspace.open(keyspaceName); + ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(ks, counterMutation.consistency(), token, ReplicaPlans.writeAll); + AbstractReplicationStrategy rs = plan.replicationStrategy(); + + MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + + if (logger.isTraceEnabled()) + logger.trace("Forwarded counter mutation {}: applying locally with ID and forwarding to other replicas", id); + + TrackedWriteResponseHandler handler = + TrackedWriteResponseHandler.wrap( + rs.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, Dispatcher.RequestTime.forImmediateExecution()), + id + ); + + // Apply counter mutation with ID to get result + Mutation result = counterMutation.applyCounterMutation(id); + + // Send result to other replicas with CoordinatorAckInfo + // They will respond to the coordinator, not to this leader + TrackedWriteRequest.sendToReplicasOnly(result, plan, handler, coordinatorAckInfo); + + // Send this leader's response back to coordinator + MessagingService.instance().send(message.emptyResponse(), respondToAddress); + + logger.trace("Tracked counter mutation {} processed, response sent to {}", id, respondToAddress); + } + catch (Exception e) + { + logger.error("Error applying forwarded tracked counter mutation {}", counterMutation, e); + } + } + public static final IVersionedSerializer serializer = new IVersionedSerializer<>() { @Override diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index b45eae9f74dc..f51bad2cd429 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; @@ -78,7 +79,7 @@ public class TrackedWriteRequest * @param requestTime object holding times when request got enqueued and started execution */ public static AbstractWriteResponseHandler perform( - Mutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) + IMutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime) { Tracing.trace("Determining replicas for mutation"); @@ -95,23 +96,45 @@ public static AbstractWriteResponseHandler perform( if (logger.isTraceEnabled()) logger.trace("Remote tracked request {} {}", mutation, plan); writeMetrics.remoteRequests.mark(); - return ForwardedWrite.forwardMutation(mutation, plan, rs, requestTime); + + if (mutation instanceof CounterMutation) + return ForwardedWrite.forwardCounterMutation((CounterMutation) mutation, plan, rs, requestTime); + else + return ForwardedWrite.forwardMutation((Mutation) mutation, plan, rs, requestTime); } if (logger.isTraceEnabled()) logger.trace("Local tracked request {} {}", mutation, plan); writeMetrics.localRequests.mark(); + MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); - mutation = mutation.withMutationId(id); - if (logger.isTraceEnabled()) - logger.trace("Write replication plan for mutation {}: live={}, pending={}, all={}", - id, plan.live(), plan.pending(), plan.contacts()); + if (mutation instanceof CounterMutation) + { + if (logger.isTraceEnabled()) + logger.trace("Write replication plan for counter mutation {}: live={}, pending={}, all={}", + id, plan.live(), plan.pending(), plan.contacts()); + + TrackedWriteResponseHandler handler = + TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime), id); + + Mutation result = ((CounterMutation) mutation).applyCounterMutation(id); + sendToReplicasOnly(result, plan, handler, null); + return handler; + } + else + { + mutation = mutation.withMutationId(id); + + if (logger.isTraceEnabled()) + logger.trace("Write replication plan for mutation {}: live={}, pending={}, all={}", + id, plan.live(), plan.pending(), plan.contacts()); - TrackedWriteResponseHandler handler = + TrackedWriteResponseHandler handler = TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime), id); - applyLocallyAndSendToReplicas(mutation, plan, handler); - return handler; + applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler); + return handler; + } } public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler) @@ -221,6 +244,121 @@ public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan. } } + /** + * Send a mutation to remote replicas only, without applying it locally. + * This is used for counter mutations where the mutation has already been applied locally + * by applyCounterMutation() before assigning the mutation ID. + * + * @param mutation the mutation with assigned ID to send to replicas + * @param plan the replica plan + * @param handler the response handler + * @param coordinatorAckInfo optional coordinator info for forwarded writes (null for local coordinator) + */ + public static void sendToReplicasOnly(Mutation mutation, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler, ForwardedWrite.CoordinatorAckInfo coordinatorAckInfo) + { + String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter; + List localDCReplicas = null; + Map> remoteDCReplicas = null; + + // create a Message for non-local writes + Message message = null; + + // Serialize this mutation now so when we send it to multiple replicas concurrently, + // they all use the cached serialized bytes instead of re-serializing it multiple times. + Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version); + + boolean foundSelf = false; + for (Replica destination : plan.contacts()) + { + if (!plan.isAlive(destination)) + { + if (logger.isTraceEnabled()) + logger.trace("Skipping dead replica {} for mutation {}", destination, mutation.id()); + handler.expired(); // immediately mark the response as expired since the request will not be sent + continue; + } + + if (destination.isSelf()) + { + foundSelf = true; // Mutation was already applied locally + continue; + } + + if (message == null) + { + Message.Builder builder = Message.builder(MUTATION_REQ, mutation) + .withRequestTime(handler.getRequestTime()) + .withFlag(MessageFlag.CALL_BACK_ON_FAILURE); + + // If this is a forwarded write, include coordinator ack info so replicas + // know to respond to the original coordinator, not this leader + if (coordinatorAckInfo != null) + builder.withParam(ParamType.COORDINATOR_ACK_INFO, coordinatorAckInfo); + + message = builder.build(); + } + + String dc = DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter; + + if (localDataCenter.equals(dc)) + { + if (localDCReplicas == null) + localDCReplicas = new ArrayList<>(plan.contacts().size()); + localDCReplicas.add(destination); + } + else + { + if (remoteDCReplicas == null) + remoteDCReplicas = new HashMap<>(); + + List replicas = remoteDCReplicas.get(dc); + if (replicas == null) + replicas = remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)); // most DCs will have <= 3 replicas + replicas.add(destination); + } + } + + Preconditions.checkState(foundSelf, "Coordinator must be a replica for tracked counter mutations"); + + // Notify handler that local write succeeded (mutation was already applied before calling this method) + handler.onResponse(null); + + IntHashSet remoteReplicas = null; + if (localDCReplicas != null || remoteDCReplicas != null) + remoteReplicas = new IntHashSet(); + + if (localDCReplicas != null) + { + for (Replica replica : localDCReplicas) + { + if (logger.isTraceEnabled()) + logger.trace("Sending mutation {} to local replica {}", mutation.id(), replica); + MessagingService.instance().sendWriteWithCallback(message, replica, handler); + remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id()); + } + } + + if (remoteDCReplicas != null) + { + // for each datacenter, send the message to one node to relay the write to other replicas + for (List dcReplicas : remoteDCReplicas.values()) + { + if (logger.isTraceEnabled()) + logger.trace("Sending mutation {} to remote dc replicas {}", mutation.id(), dcReplicas); + sendMessagesToRemoteDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, coordinatorAckInfo); + for (Replica replica : dcReplicas) + remoteReplicas.add(ClusterMetadata.current().directory.peerId(replica.endpoint()).id()); + } + } + + if (remoteReplicas != null) + { + if (logger.isTraceEnabled()) + logger.trace("Sending mutation {} to remote replicas {}", mutation.id(), remoteReplicas); + MutationTrackingService.instance.sentWriteRequest(mutation, remoteReplicas); + } + } + static void applyMutationLocally(Mutation mutation, RequestCallback handler) { Preconditions.checkArgument(handler instanceof TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 355d20d19abb..875013d06b28 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1323,8 +1323,6 @@ public static void mutateWithoutConditions(List mutations, boolean isTracked = Schema.instance.getKeyspaceMetadata(mutations.get(0).getKeyspaceName()).params.replicationType.isTracked(); if (isTracked) { - if (mutations.stream().anyMatch(m -> m instanceof CounterMutation)) - throw new InvalidRequestException("Mutation tracking is currently unsupported with counters"); if (augmented != null) throw new InvalidRequestException("Mutation tracking is currently unsupported with triggers"); if (mutateAtomically) @@ -1367,7 +1365,10 @@ public static void dispatchMutationsWithRetryOnDifferentSystem(List c.with(GOSSIP, NATIVE_PROTOCOL)) + .start()) + { + cluster.schemaChange("CREATE KEYSPACE k WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} AND replication_type='tracked'"); + + cluster.schemaChange("CREATE TABLE k.counters (pk int PRIMARY KEY, count counter)"); + + ConsistencyLevel cl = ConsistencyLevel.QUORUM; + + // Test counter writes from all nodes + // Some will be local (coordinator is replica), some will be forwarded (coordinator not replica) + for (int coordinatorNode = 1; coordinatorNode <= 6; coordinatorNode++) + { + ICoordinator coordinator = cluster.coordinator(coordinatorNode); + int pk = coordinatorNode * 100; + + // Increment + coordinator.execute("UPDATE k.counters SET count = count + 5 WHERE pk = ?", cl, pk); + assertRows(coordinator.execute("SELECT count FROM k.counters WHERE pk = ?", cl, pk), row(5L)); + + // Increment again + coordinator.execute("UPDATE k.counters SET count = count + 3 WHERE pk = ?", cl, pk); + assertRows(coordinator.execute("SELECT count FROM k.counters WHERE pk = ?", cl, pk), row(8L)); + } + + // Verify all nodes can read all counters + for (int node = 1; node <= 6; node++) + { + ICoordinator coordinator = cluster.coordinator(node); + for (int pk = 100; pk <= 600; pk += 100) + { + assertRows(coordinator.execute("SELECT count FROM k.counters WHERE pk = ?", ConsistencyLevel.ONE, pk), + row(8L)); + } + } + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java new file mode 100644 index 000000000000..4b49d52fc14f --- /dev/null +++ b/test/unit/org/apache/cassandra/replication/TrackedCounterWriteTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.replication; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.ReplicationType; +import org.apache.cassandra.service.AbstractWriteResponseHandler; +import org.apache.cassandra.transport.Dispatcher; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +/** + * Unit tests for Token-Aware (Normal Path) Counter Mutations for Mutation Tracking + * In these tests, the coordinator IS a replica. + */ +public class TrackedCounterWriteTest +{ + private static final Logger logger = LoggerFactory.getLogger(TrackedCounterWriteTest.class); + + private static final String KEYSPACE_TRACKED = "TrackedCounterTest"; + private static final String CF_COUNTER = "Counter1"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + + SchemaLoader.createKeyspace(KEYSPACE_TRACKED, + KeyspaceParams.simple(3, ReplicationType.tracked), + SchemaLoader.counterCFMD(KEYSPACE_TRACKED, CF_COUNTER)); + } + + /** + * Tests that counter writes are properly tracked when the coordinator is a replica. + * Verifies that: + * 1. CounterMutation initially has no ID before perform() + * 2. TrackedWriteRequest.perform() successfully processes the mutation + * 3. The counter value is correctly incremented + */ + @Test + public void testTrackedCounterWrite_CoordinatorIsReplica() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER); + cfs.truncateBlocking(); + + ColumnMetadata cDef = cfs.metadata().getColumn(ByteBufferUtil.bytes("val")); + + Mutation m = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), "testkey1") + .clustering("cc") + .add("val", 5L) + .build(); + + CounterMutation counterMutation = new CounterMutation(m, ConsistencyLevel.ONE); + + assertTrue("CounterMutation should not have an ID before perform()", + counterMutation.id().isNone()); + + AbstractWriteResponseHandler handler = TrackedWriteRequest.perform( + counterMutation, + ConsistencyLevel.ONE, + Dispatcher.RequestTime.forImmediateExecution() + ); + assertNotNull("Handler should not be null", handler); + + Thread.sleep(100); // Waiting for async operations + + Row row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build()); + long counterValue = CounterContext.instance().total(row.getCell(cDef)); + assertEquals("Counter should be incremented to 5", 5L, counterValue); + } + + /** + * Tests that multiple counter updates on the same key accumulate correctly with tracking. + * Verifies that: + * 1. Multiple sequential increments/decrements are properly tracked + * 2. Counter values accumulate correctly across updates + */ + @Test + public void testTrackedCounterWrite_MultipleIncrements() throws Exception + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE_TRACKED).getColumnFamilyStore(CF_COUNTER); + cfs.truncateBlocking(); + + ColumnMetadata cDef = cfs.metadata().getColumn(ByteBufferUtil.bytes("val")); + + performCounterUpdate(cfs, "testkey2", 3L); // First increment (+3) + + Row row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build()); + long value = CounterContext.instance().total(row.getCell(cDef)); + assertEquals("Counter should be 3", 3L, value); + + performCounterUpdate(cfs, "testkey2", 7L); // Second increment (+7) + + row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build()); + value = CounterContext.instance().total(row.getCell(cDef)); + assertEquals("Counter should be 10", 10L, value); + + performCounterUpdate(cfs, "testkey2", -4L); // Decrement (-4) + + row = Util.getOnlyRow(Util.cmd(cfs).includeRow("cc").columns("val").build()); + value = CounterContext.instance().total(row.getCell(cDef)); + assertEquals("Counter should be 6", 6L, value); + } + + /** + * Performs a counter update and waits for async replication to complete + */ + private void performCounterUpdate(ColumnFamilyStore cfs, String key, long delta) throws Exception + { + Mutation m = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), key) + .clustering("cc") + .add("val", delta) + .build(); + + CounterMutation counterMutation = new CounterMutation(m, ConsistencyLevel.ONE); + + TrackedWriteRequest.perform( + counterMutation, + ConsistencyLevel.ONE, + Dispatcher.RequestTime.forImmediateExecution() + ); + + Thread.sleep(50); + } +} \ No newline at end of file