Conversation

@arjunashok (Contributor) commented Jun 26, 2025

…and options

Update 08/28:

  • Updated the repair API to return synchronously, without relying on background polling threads that run until repair completion or the max timeout. The repair invocation's commandId is stored in the job state, which is leveraged by subsequent polling API invocations to determine the job's status.
  • Added configuration for the retry attempts and interval the API uses to retrieve the parent repair status before responding, to ensure that a repair process has been triggered and to return an accurate status of the process.
  • Addresses PR comments
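The synchronous flow described above (persist the JMX commandId in the job state, then retry a bounded number of times when retrieving the parent status) could look roughly like this minimal sketch. All names here are hypothetical stand-ins, not the actual Sidecar API:

```java
import java.util.Optional;

// Hypothetical sketch of the retry-bounded status poll described above.
public class RepairStatusPoller
{
    // Stand-in for the JMX-backed status lookup keyed by the stored commandId
    interface StatusSource { Optional<String> parentStatus(int commandId); }

    private final StatusSource source;
    private final int maxAttempts;
    private final long retryIntervalMillis;

    RepairStatusPoller(StatusSource source, int maxAttempts, long retryIntervalMillis)
    {
        this.source = source;
        this.maxAttempts = maxAttempts;
        this.retryIntervalMillis = retryIntervalMillis;
    }

    // Returns the parent repair status, retrying while the status is not yet
    // visible, so the API can respond with an accurate state of the process.
    String pollStatus(int commandId) throws InterruptedException
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            Optional<String> status = source.parentStatus(commandId);
            if (status.isPresent())
            {
                return status.get();
            }
            Thread.sleep(retryIntervalMillis);
        }
        return "UNKNOWN"; // exhausted retries without observing a status
    }
}
```

The bounded retry keeps the handler from blocking indefinitely while still giving the just-triggered repair a chance to become visible.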

Changes

  • Adds a Sidecar endpoint to support per-keyspace repair operation. API Spec: https://issues.apache.org/jira/browse/CASSSIDECAR-268
  • Triggers repair asynchronously as an OperationalJob, similar to node decommission, leveraging the job-management framework.
  • Since the repairAsync JMX operation is asynchronous, the repair job polls for the status of the repair operation to detect the completion/failure statuses reported by the job status APIs.
  • Adds related configs repairPollIntervalMillis and maxRepairRuntimeMillis. The latter is used to release Sidecar resources after the configured time, to recover from unusually long-running or wedged repairs.

Open Items (out of scope for this PR; can be tracked separately)

  • Publish metrics, e.g. repairRunTime and succeeded/failed repair counts.

@arjunashok arjunashok marked this pull request as ready for review July 1, 2025 16:30
@yifan-c (Contributor) left a comment

only scanned through the PR.

catch (OperationalJobConflictException oje)
{
    String reason = oje.getMessage();
    logger.error("Conflicting job encountered. reason={}", reason);
Contributor:

Should the log level be warning or even info? It is not a server error.

Contributor Author:

The reason this is an error is to highlight the cause for a job being returned as FAILED, which is the outcome of a conflict (consistent with what we're doing with decommission).

I don't have a strong opinion on this and can switch to warn, but not info, as we would want to highlight this as an unexpected condition if it persists.

Contributor:

Can you log a bit more detail, like keyspace name, table names, tokens, DC, etc.?

Contributor Author:

Added some more details. I'd prefer to add these when we define a conflict policy for repair jobs, which is not currently defined for v1 (see RepairJob.hasConflict()).

Comment on lines 107 to 134
Promise<Boolean> maxWaitTimePromise = Promise.promise();
vertx.setTimer(repairConfiguration.maxRepairJobRuntimeMillis(), d -> {
    LOGGER.info("Timer Poll");
    maxWaitTimePromise.tryComplete(true);
});

// Promise for completion of repair operation
Promise<Void> promise = Promise.promise();
Future<Void> resultFut = promise.future();

// main event loop checks periodically (10s) for completion
vertx.setPeriodic(repairConfiguration.repairPollIntervalMillis(), id -> queryForCompletedRepair(promise, cmd));
resultFut.onComplete(res -> maxWaitTimePromise.tryComplete(false));
Future<Boolean> maxWaitTimeFut = maxWaitTimePromise.future();

Future<Void> compositeFut = Future.any(maxWaitTimeFut, resultFut)
// If the lambda below is evaluated, one of the futures has completed;
// in either case, the future corresponding to the job execution is returned
.compose(f -> {
    LOGGER.info("One of the futures ended waitStatus={} resultStatus={}",
                maxWaitTimeFut.isComplete(), resultFut.isComplete());
    boolean isTimeout = maxWaitTimeFut.succeeded() ? maxWaitTimeFut.result() : false;
    if (isTimeout)
    {
        LOGGER.error("Timer ran out before the repair job completed. Repair took too long");
        // TODO: Cancel repair? (Nice to have)
        // We free up the thread (job fails) and stop polling for completion
        return Future.failedFuture("Repair job taking too long");
Contributor:

Can you implement this in the asyncResult override?

Contributor Author:

So, the asyncResult implementation is meant for tracking the execution of an OperationalJob and its timeout from the handler's standpoint, while executeInternal is specific to the actual job's implementation, where we want to wait based on the job-specific configuration for the asynchronous command.

Also, the timeouts are handled differently - the main job handler succeeds on a job timeout, but the repair job's async repair command fails on the repair timeout.

There are similarities in the implementation, but I figured it is simpler to allow jobs to manage their own timeouts independently.
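As a rough illustration of the two timeout behaviors described in this reply (the handler's wait gives up without failing the job, while the job's own max runtime turns into a failure), here is a minimal sketch using plain CompletableFuture instead of Vert.x futures; the names are hypothetical:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch
{
    // Handler-level wait: on timeout, respond with a "still running" status
    // rather than failing the job (the job keeps executing).
    static String awaitSyncResponse(CompletableFuture<String> job, long waitMillis)
    {
        try
        {
            return job.get(waitMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e)
        {
            return "RUNNING"; // job continues; the client polls later
        }
        catch (Exception e)
        {
            return "FAILED";
        }
    }

    // Job-level max runtime: on timeout, the job itself fails.
    static CompletableFuture<String> withMaxRuntime(CompletableFuture<String> job, long maxMillis)
    {
        return job.orTimeout(maxMillis, TimeUnit.MILLISECONDS)
                  .exceptionally(t -> "FAILED: repair exceeded max runtime");
    }
}
```

The key difference is who owns each timeout: the handler's wait only bounds the synchronous response, while the job's max runtime bounds the job itself.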

}

@Override
protected void executeInternal()
Contributor:

Since repair is async, you might want to override execute(Promise<Void> promise) instead, completing the parameter promise when repair is complete.
executeInternal() is meant to run blocking operations.

Contributor Author:

The base execute method was meant to abstract some of the common job lifecycle management, logging, error handling from job-specific implementations. Separating these also has the advantage of being able to test only the job-specific logic.

I have updated executeInternal's contract to return a Future addressing a prior comment. Lmk if that makes more sense.

@nvharikrishna (Contributor) left a comment

Reviewed it partially, will continue to review it.

catch (OperationalJobConflictException oje)
{
    String reason = oje.getMessage();
    logger.error("Conflicting job encountered. reason={}", reason);
Contributor:

Can you log a bit more detail, like keyspace name, table names, tokens, DC, etc.?

});
try
{
compositeFut.toCompletionStage().toCompletableFuture().get(); // wait
Contributor:

The periodic task will keep on running if the job has timed out. Does it need to be cancelled?

Contributor Author:

The changes made to address Yifan's comment link the PeriodicTask's promise so that it is handled when the internal future (returned from executeInternal) completes; that should address this concern. Let me know if that is what you are referring to.

Comment on lines 236 to 240
default:
    String message = String.format("Encountered unexpected repair status: %s Messages: %s", parentRepairStatus, String.join("\n", messages));
    LOGGER.error(message);
    promise.fail(message);
    break;
Contributor:

I do not think you can ever hit this case for a switch on an enum. Any unrecognized status would fail in ParentRepairStatus.valueOf first.

Contributor Author:

This is more of a catch-all behavior if/when a new enum value is introduced, so it returns a meaningful response.
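A small self-contained example of the point being discussed: Enum.valueOf rejects unrecognized names before the switch runs, so the default arm is only reachable if a new constant is added to the enum itself. The enum here is a trimmed stand-in, not the actual Sidecar class:

```java
// Trimmed stand-in for the real status enum
enum ParentRepairStatus { IN_PROGRESS, COMPLETED, FAILED }

public class EnumSwitchDemo
{
    static String classify(String rawStatus)
    {
        // Throws IllegalArgumentException for any name not declared above,
        // so the switch below never sees an "unknown" value
        ParentRepairStatus status = ParentRepairStatus.valueOf(rawStatus);
        switch (status)
        {
            case IN_PROGRESS: return "running";
            case COMPLETED:   return "done";
            case FAILED:      return "failed";
            default:          return "unexpected"; // only reachable if a new constant is added
        }
    }

    public static void main(String[] args)
    {
        System.out.println(classify("COMPLETED")); // prints "done"
        try
        {
            classify("NEW_STATUS");
        }
        catch (IllegalArgumentException e)
        {
            // reached before the switch is ever evaluated
            System.out.println("valueOf rejected unknown status");
        }
    }
}
```

So both positions in the thread hold: valueOf guards against unknown strings, while the default arm guards against constants added to the enum in a future version.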

Comment on lines 101 to 105
Map<String, String> options = generateRepairOptions(repairParams.requestpayload());
String keyspace = repairParams.keyspace().name();

LOGGER.info("Executing repair operation for keyspace {} jobId={} maxRuntime={}",
            keyspace, this.jobId(), config.maxRepairJobRuntime());
Contributor:

I do not find the code that validates the existence of the keyspace. It could create unnecessary load on the Cassandra process.

Please take a look at org.apache.cassandra.sidecar.handlers.validations.ValidateTableExistenceHandler. You can enhance it to only check for the keyspace, or add a new validator.

Contributor Author:

I do not believe keyspace-only existence validation exists. Makes sense to have its own validator, with some abstraction shared between the two.

Comment on lines 137 to 138
LOGGER.error("Timer ran out before the repair job completed. Repair took too long");
return Future.failedFuture("Repair job taking too long");
Contributor:

What is the behavior wanted here? If you want to let the repair on Cassandra to continue and ask client to query for the progress later, I think it is wrong to return a completed (failure) future here. It will complete the executionPromise in OperationalJob, which completes the job.

Contributor Author:

Yep, valid concern. The current implementation uses a timeout to prevent the sidecar from holding thread resources indefinitely but simply failing the future doesn't provide a good way for clients to continue tracking the repair operation that's still running in Cassandra.

How about a hybrid approach: keep the timeout mechanism to release sidecar resources after maxRepairJobRuntime, but change how we handle the timeout. Instead of marking the job as failed on timeout, transition it to a 'DETACHED' state that indicates 'no longer actively monitored by sidecar, but still running in Cassandra'.

Additionally, if we have access to Cassandra's repair session/command ID, it could be added to the job response, allowing clients to query/troubleshoot with Cassandra directly for status if needed.

Contributor Author (@arjunashok, Aug 6, 2025):

Discussed this offline with @yifan-c . We could potentially leverage the repair vtable to track repair progress instead of polling the async repair operation. Looking into this in more detail as to how it will interoperate with the job management/tracking flow.

Unfortunately, the repair vtable does not exist in 4.0

@yifan-c (Contributor) commented Aug 6, 2025

I have yet to take another look at the implementation of the async timeout. Just noticed that CHANGES.txt needs to be updated.

@JsonProperty(value = "max_repair_runtime", defaultValue = DEFAULT_MAX_REPAIR_RUNTIME_MILLIS + "") long maxRepairRuntimeMillis,
@JsonProperty(value = "repair_polling_interval", defaultValue = DEFAULT_REPAIR_POLLING_INTERVAL_MILLIS + "") long repairPollIntervalMillis)
{
this.maxRepairRuntimeMillis = maxRepairRuntimeMillis;
Contributor:

What are the valid values/range for these configs? Shall we add some validation?

Contributor Author:

Updated these configs in the latest revision. The new configs will have retry attempts, which is an int value, and a polling interval, which is a long value in milliseconds. As for validation, what is the concern you have in mind?

It is currently a balance of how defensive we'd like to be, consistent with other areas where we read configs - i.e. we do not explicitly validate ranges.
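If range validation were added, it could be as simple as rejecting non-positive values at construction time. This is a hypothetical sketch, not the actual RepairJobsConfigurationImpl:

```java
// Hypothetical sketch of the kind of range validation being discussed:
// reject non-positive retry counts and polling intervals up front, so a
// misconfigured value fails at startup rather than producing a busy-loop
// (interval 0) or a no-op poller (attempts 0) at runtime.
public class RepairJobsConfigSketch
{
    private final int statusMaxAttempts;
    private final long pollIntervalMillis;

    RepairJobsConfigSketch(int statusMaxAttempts, long pollIntervalMillis)
    {
        if (statusMaxAttempts <= 0)
        {
            throw new IllegalArgumentException("statusMaxAttempts must be positive: " + statusMaxAttempts);
        }
        if (pollIntervalMillis <= 0)
        {
            throw new IllegalArgumentException("pollIntervalMillis must be positive: " + pollIntervalMillis);
        }
        this.statusMaxAttempts = statusMaxAttempts;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    int statusMaxAttempts() { return statusMaxAttempts; }
    long pollIntervalMillis() { return pollIntervalMillis; }
}
```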

return Future.succeededFuture(keyspaceMetadata);
}
}, throwable -> {
context.fail(throwable);
Contributor:

Since it is in a utility class, I think it is better to leave failure handling to the caller. The caller can do additional things if needed (like logging or wrapping it in an HttpException, etc.).

@JeetKunDoug (Contributor):

@arjunashok can you please rebase this onto the current trunk branch?
Others who have reviewed previously - if you have time, please take a look and make sure everything is in order now? Would love to get this PR merged soon if possible!

@nvharikrishna (Contributor) left a comment

Reviewed 22/36 files. A few nits, otherwise looks good so far. Thanks for the patience. Will continue the review.

private final List<String> hosts;
private final String startToken;
private final String endToken;
private RepairType repairType;
Contributor:

Can this be final?

/**
* Constructs a new {@link RepairPayload}.
*/
public RepairPayload()
Contributor:

Is it needed? Okay to avoid this constructor so that only the builder can build this object?

public interface RepairJobsConfiguration
{
/**
* @return the max retry attempts for the repair job status to be valid
Contributor:

repair job status to be valid

What does valid mean here?

Contributor:

Looks unused.

Contributor Author:

Renamed to repairStatusMaxAttempts. This holds the number of retries when we fail to get a status, or receive a null status (while initializing), for the repair job.

*/
public class RepairJobsConfigurationImpl implements RepairJobsConfiguration
{
// 1 day in milliseconds
Contributor:

I guess this comment is irrelevant now, isn't it?

{
String reason = oje.getMessage();
logger.warn("Conflicting job encountered for keyspace {}. reason={}", repairRequestParam.keyspace(), reason);
context.response().setStatusCode(HttpResponseStatus.CONFLICT.code());
Contributor:

nit: Can’t the fluent API be used?

Contributor Author:

Leaving this as-is for consistency with other implementations/handlers

return;
}

job.asyncResult(executorPools.service(), config.operationalJobExecutionMaxWaitTime())
Contributor:

config.operationalJobExecutionMaxWaitTime() sounded like the total time to wait for the job, but it looks like the property name is operations_job_sync_response_timeout, which is clearer. Okay to rename config.operationalJobExecutionMaxWaitTime to reflect the sync response timeout? And can we have an upper limit for this value?

Contributor Author:

So, this is the "total time to wait for the job". I can see why operations_job_sync_response_timeout can be confusing, since it refers to a response. Will look into making it more consistent.

@nvharikrishna (Contributor) left a comment

A few minor comments

schema = @Schema(implementation = StreamStatsResponse.class)))
@ProvidesIntoMap
@KeyClassMapKey(VertxRouteMapKeys.CassandraRepairRouteKey.class)
VertxRoute cassandraRepairRoute(RouteBuilder.Factory factory,
Contributor:

I think it is misplaced (between @Path(ApiEndpointsV1.STREAM_STATS_ROUTE) and the cassandraStreamStatsRoute method).


try
{
commandId = storageOperations.repair(keyspace, options);
Contributor:

I think it would be better to run it using executorPools as it makes a JMX call.

Contributor Author:

This is being executed on a separate internal thread. The job-management framework does this when any job is being executed.

.compose(keyspaceMetadata -> {
if (keyspaceMetadata == null)
{
return Future.failedFuture("Keyspace " + keyspace + " was not found");
Contributor:

It can be more direct if the promise is failed with an exception (something like KeyspaceNotFoundException) instead of a string message checked against "not found". Not intending to insist on it though.

Contributor Author:

I agree. Your suggestion is cleaner and conforms to what we're doing with SchemaUnavailableException. Will update.

* @return a Future that completes with the KeyspaceMetadata if the keyspace exists,
* or fails with an error if the keyspace doesn't exist or an error occurs
*/
public static Future<KeyspaceMetadata> validateKeyspaceExists(InstanceMetadataFetcher metadataFetcher,
Contributor:

The validate prefix gives the impression that it returns success/failure. Does requireKeyspaceExists sound better?

public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
{
String operationMode = storageOperations.operationMode();
return "LEAVING".equals(operationMode) || "DECOMMISSIONED".equals(operationMode);
Contributor:

I think it should check whether any of the current node's replicas are getting decommissioned (probably for some other time).

Contributor Author:

I see what you mean. Will track this optimization in a separate ticket.

promise.tryComplete();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Complete job execution. jobId={} status={}", jobId, status());
Contributor:

Can this be an info message?

*/
public enum ParentRepairStatus
{
IN_PROGRESS, COMPLETED, FAILED, NEW_STATUS
Contributor:

NEW_STATUS - when is it used?

Contributor Author:

Unused. Removed.

@Override
protected Future<Void> executeInternal()
{
final Promise<Void> repairJobPromise = Promise.promise();
Contributor:

I think this promise can be created before starting the periodic task (just before it is needed)

options.put(RepairOptions.HOSTS.optionName(), String.join(",", hosts));
}

if (repairPayload.startToken() != null && repairPayload.endToken() != null)
Contributor:

What will happen if only one of them is specified?

Contributor Author:

Currently, it expects both tokens, with the intention of not having silent defaults, since repair is an expensive operation.

I see the nodetool command in Cassandra supports specifying one or the other, but it eventually fails for the Murmur3 and Random partitioners. For ByteOrdered and OrderPreserving, this should map to the min token. I'll validate and create a separate Cassandra ticket to investigate this.

{
try
{
long startToken = Long.parseLong(repairPayload.startToken());
Contributor:

#231 (comment) this comment is applicable here as well

Contributor Author:

I see there is a benefit to parsing it here for token range validation to fail-fast. Will switch it to BigInteger just for the validation.
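The fail-fast validation discussed here, switched to BigInteger so tokens from partitioners whose token values exceed the long range still parse, might look like this hypothetical sketch:

```java
import java.math.BigInteger;

public class TokenRangeValidator
{
    // Parses and validates a start/end token pair, failing fast on malformed
    // input before any repair is triggered. Both tokens are required, matching
    // the no-silent-defaults behavior described above.
    static void validateTokenRange(String startToken, String endToken)
    {
        if (startToken == null || endToken == null)
        {
            throw new IllegalArgumentException("Both startToken and endToken must be specified");
        }
        try
        {
            // BigInteger handles RandomPartitioner tokens (up to 2^127 - 1),
            // which overflow a long; the parsed values are discarded since
            // this is validation only.
            new BigInteger(startToken);
            new BigInteger(endToken);
        }
        catch (NumberFormatException e)
        {
            throw new IllegalArgumentException("Tokens must be numeric: " + startToken + ", " + endToken, e);
        }
    }
}
```

Note that NumberFormatException is itself an IllegalArgumentException, so the wrapping just adds a clearer message for the API response.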
